Fix return the earliest position when query position by timestamp
This commit is contained in:
parent
c1f21e9862
commit
a63291e2c9
132
0017-return-earliest-position.patch
Normal file
132
0017-return-earliest-position.patch
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
|
||||||
|
index 46ca0f1400..0653e40cb3 100644
|
||||||
|
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
|
||||||
|
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
|
||||||
|
@@ -532,6 +532,23 @@ public interface ManagedCursor {
|
||||||
|
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
|
||||||
|
FindEntryCallback callback, Object ctx);
|
||||||
|
|
||||||
|
+ /**
|
||||||
|
+ * Find the newest entry that matches the given predicate.
|
||||||
|
+ *
|
||||||
|
+ * @param constraint
|
||||||
|
+ * search only active entries or all entries
|
||||||
|
+ * @param condition
|
||||||
|
+ * predicate that reads an entry an applies a condition
|
||||||
|
+ * @param callback
|
||||||
|
+ * callback object returning the resultant position
|
||||||
|
+ * @param ctx
|
||||||
|
+ * opaque context
|
||||||
|
+ * @param isFindFromLedger
|
||||||
|
+ * find the newest entry from ledger
|
||||||
|
+ */
|
||||||
|
+ void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
|
||||||
|
+ FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
|
||||||
|
+
|
||||||
|
/**
|
||||||
|
* reset the cursor to specified position to enable replay of messages.
|
||||||
|
*
|
||||||
|
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
|
||||||
|
index 011d3df77f..6045412398 100644
|
||||||
|
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
|
||||||
|
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
|
||||||
|
@@ -1078,6 +1078,12 @@ public class ManagedCursorImpl implements ManagedCursor {
|
||||||
|
@Override
|
||||||
|
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
|
||||||
|
FindEntryCallback callback, Object ctx) {
|
||||||
|
+ asyncFindNewestMatching(constraint, condition, callback, ctx, false);
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ @Override
|
||||||
|
+ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
|
||||||
|
+ FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
|
||||||
|
OpFindNewest op;
|
||||||
|
PositionImpl startPosition = null;
|
||||||
|
long max = 0;
|
||||||
|
@@ -1099,7 +1105,11 @@ public class ManagedCursorImpl implements ManagedCursor {
|
||||||
|
Optional.empty(), ctx);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
- op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
|
||||||
|
+ if (isFindFromLedger) {
|
||||||
|
+ op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
|
||||||
|
+ } else {
|
||||||
|
+ op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
|
||||||
|
+ }
|
||||||
|
op.find();
|
||||||
|
}
|
||||||
|
|
||||||
|
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
|
||||||
|
index 18fe4dba31..675f28e2d2 100644
|
||||||
|
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
|
||||||
|
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
|
||||||
|
@@ -241,6 +241,11 @@ public class ManagedCursorContainerTest {
|
||||||
|
AsyncCallbacks.FindEntryCallback callback, Object ctx) {
|
||||||
|
}
|
||||||
|
|
||||||
|
+ @Override
|
||||||
|
+ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
|
||||||
|
+ AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
@Override
|
||||||
|
public void asyncResetCursor(final Position position, boolean forceReset,
|
||||||
|
AsyncCallbacks.ResetCursorCallback callback) {
|
||||||
|
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
|
||||||
|
index 825bc546f4..838771e6d3 100644
|
||||||
|
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
|
||||||
|
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
|
||||||
|
@@ -71,7 +71,7 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback
|
||||||
|
entry.release();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
- }, this, callback);
|
||||||
|
+ }, this, callback, true);
|
||||||
|
} else {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", topicName,
|
||||||
|
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
|
||||||
|
index e70d0dc2b5..da78a0411d 100644
|
||||||
|
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
|
||||||
|
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
|
||||||
|
@@ -243,6 +243,40 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
|
||||||
|
factory.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
+ @Test
|
||||||
|
+ void testPersistentMessageFinderWhenLastMessageDelete() throws Exception {
|
||||||
|
+ final String ledgerAndCursorName = "testPersistentMessageFinderWhenLastMessageDelete";
|
||||||
|
+
|
||||||
|
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
|
||||||
|
+ config.setRetentionSizeInMB(10);
|
||||||
|
+ config.setMaxEntriesPerLedger(10);
|
||||||
|
+ config.setRetentionTime(1, TimeUnit.HOURS);
|
||||||
|
+ ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
|
||||||
|
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
|
||||||
|
+
|
||||||
|
+ ledger.addEntry(createMessageWrittenToLedger("msg1"));
|
||||||
|
+ ledger.addEntry(createMessageWrittenToLedger("msg2"));
|
||||||
|
+ ledger.addEntry(createMessageWrittenToLedger("msg3"));
|
||||||
|
+ Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message"));
|
||||||
|
+
|
||||||
|
+ long endTimestamp = System.currentTimeMillis() + 1000;
|
||||||
|
+
|
||||||
|
+ Result result = new Result();
|
||||||
|
+ // delete last position message
|
||||||
|
+ cursor.delete(lastPosition);
|
||||||
|
+ CompletableFuture<Void> future = findMessage(result, cursor, endTimestamp);
|
||||||
|
+ future.get();
|
||||||
|
+ assertNull(result.exception);
|
||||||
|
+ assertNotEquals(result.position, null);
|
||||||
|
+ assertEquals(result.position, lastPosition);
|
||||||
|
+
|
||||||
|
+ result.reset();
|
||||||
|
+ cursor.close();
|
||||||
|
+ ledger.close();
|
||||||
|
+ factory.shutdown();
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+
|
||||||
|
@Test
|
||||||
|
void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception {
|
||||||
|
|
||||||
@ -1,6 +1,6 @@
|
|||||||
%define debug_package %{nil}
|
%define debug_package %{nil}
|
||||||
%define pulsar_ver 2.10.4
|
%define pulsar_ver 2.10.4
|
||||||
%define pkg_ver 16
|
%define pkg_ver 17
|
||||||
%define _prefix /opt/pulsar
|
%define _prefix /opt/pulsar
|
||||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||||
Name: pulsar
|
Name: pulsar
|
||||||
@ -26,6 +26,7 @@ Patch0013: 0013-fix-deadlock.patch
|
|||||||
Patch0014: 0014-CVE-2023-32732.patch
|
Patch0014: 0014-CVE-2023-32732.patch
|
||||||
Patch0015: 0015-fix-no-messages.patch
|
Patch0015: 0015-fix-no-messages.patch
|
||||||
Patch0016: 0016-handle-exception.patch
|
Patch0016: 0016-handle-exception.patch
|
||||||
|
Patch0017: 0017-return-earliest-position.patch
|
||||||
BuildRoot: /root/rpmbuild/BUILDROOT/
|
BuildRoot: /root/rpmbuild/BUILDROOT/
|
||||||
BuildRequires: java-1.8.0-openjdk-devel,maven,systemd
|
BuildRequires: java-1.8.0-openjdk-devel,maven,systemd
|
||||||
Requires: java-1.8.0-openjdk,systemd
|
Requires: java-1.8.0-openjdk,systemd
|
||||||
@ -54,6 +55,7 @@ Pulsar is a distributed pub-sub messaging platform with a very flexible messagin
|
|||||||
%patch0014 -p1
|
%patch0014 -p1
|
||||||
%patch0015 -p1
|
%patch0015 -p1
|
||||||
%patch0016 -p1
|
%patch0016 -p1
|
||||||
|
%patch0017 -p1
|
||||||
|
|
||||||
%build
|
%build
|
||||||
mvn clean install -Pcore-modules,-main -DskipTests
|
mvn clean install -Pcore-modules,-main -DskipTests
|
||||||
@ -79,6 +81,8 @@ getent passwd pulsar >/dev/null || useradd -r -g pulsar -d / -s /sbin/nologin pu
|
|||||||
exit 0
|
exit 0
|
||||||
|
|
||||||
%changelog
|
%changelog
|
||||||
|
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-17
|
||||||
|
- Fix return the earliest position when query position by timestamp.
|
||||||
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-16
|
* Fri Dec 8 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-16
|
||||||
- Only handle exception when there has
|
- Only handle exception when there has
|
||||||
* Thu Dec 7 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-15
|
* Thu Dec 7 2023 Dapeng Sun <sundapeng@cmss.chinamobile.com> - 2.10.4-15
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user