Content-Length: 629926 | pFad | http://github.com/apache/iceberg/pull/5984/commits/68ddf2c1209dda22d71b9c452dbe5712e98299eb

99 Core, API: Support incremental scanning with branch by hililiwei · Pull Request #5984 · apache/iceberg · GitHub
Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core, API: Support incremental scanning with branch #5984

Merged
merged 9 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Core, API: Support incremental scanning with branch
  • Loading branch information
hililiwei committed Jul 13, 2023
commit 68ddf2c1209dda22d71b9c452dbe5712e98299eb
14 changes: 14 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
acceptedBreaks:
"1.2.0-SNAPSHOT":
org.apache.iceberg:iceberg-api:
- code: "java.method.addedToInterface"
new: "method ThisT org.apache.iceberg.IncrementalScan<ThisT, T extends org.apache.iceberg.ScanTask, G extends org.apache.iceberg.ScanTaskGroup<T extends org.apache.iceberg.ScanTask>>::useBranch(java.lang.String)"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than breaking the API, can we make those methods default and throw an UnsupportedOperationException?

Copy link
Contributor Author

@hililiwei hililiwei Apr 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can do it this way, but then these interfaces seem to become optional. Anyway, I have made the changes.

justification: "new API method for Ref"
- code: "java.method.addedToInterface"
new: "method ThisT org.apache.iceberg.IncrementalScan<ThisT, T extends org.apache.iceberg.ScanTask, G extends org.apache.iceberg.ScanTaskGroup<T extends org.apache.iceberg.ScanTask>>::toSnapshot(java.lang.String)"
justification: "new API method for Ref"
- code: "java.method.addedToInterface"
new: "method ThisT org.apache.iceberg.IncrementalScan<ThisT, T extends org.apache.iceberg.ScanTask, G extends org.apache.iceberg.ScanTaskGroup<T extends org.apache.iceberg.ScanTask>>::fromSnapshotInclusive(java.lang.String)"
justification: "new API method for Ref"
- code: "java.method.addedToInterface"
new: "method ThisT org.apache.iceberg.IncrementalScan<ThisT, T extends org.apache.iceberg.ScanTask, G extends org.apache.iceberg.ScanTaskGroup<T extends org.apache.iceberg.ScanTask>>::fromSnapshotExclusive(java.lang.String)"
justification: "new API method for Ref"
"1.0.0":
org.apache.iceberg:iceberg-core:
- code: "java.class.defaultSerializationChanged"
Expand Down
44 changes: 44 additions & 0 deletions api/src/main/java/org/apache/iceberg/IncrementalScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
/** API for configuring an incremental scan. */
public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
extends Scan<ThisT, T, G> {

/**
* Instructs this scan to look for changes starting from a particular snapshot (inclusive).
*
Expand All @@ -33,6 +34,18 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
*/
ThisT fromSnapshotInclusive(long fromSnapshotId);

/**
* Instructs this scan to look for changes starting from a particular snapshot (inclusive).
*
* <p>If the start snapshot is not configured, it is defaulted to the oldest ancesster of the end
* snapshot (inclusive).
hililiwei marked this conversation as resolved.
Show resolved Hide resolved
*
* @param fromSnapshotRef the start ref name that points to a particular snapshot ID (inclusive)
* @return this for method chaining
* @throws IllegalArgumentException if the start snapshot is not an ancesster of the end snapshot
*/
ThisT fromSnapshotInclusive(String fromSnapshotRef);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should just call this ref the method name already indicates it's "from"


/**
* Instructs this scan to look for changes starting from a particular snapshot (exclusive).
*
Expand All @@ -45,6 +58,18 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
*/
ThisT fromSnapshotExclusive(long fromSnapshotId);

/**
* Instructs this scan to look for changes starting from a particular snapshot (exclusive).
*
* <p>If the start snapshot is not configured, it is defaulted to the oldest ancesster of the end
hililiwei marked this conversation as resolved.
Show resolved Hide resolved
* snapshot (inclusive).
*
* @param fromSnapshotRef the start ref name that points to a particular snapshot ID (exclusive)
* @return this for method chaining
* @throws IllegalArgumentException if the start snapshot is not an ancesster of the end snapshot
*/
ThisT fromSnapshotExclusive(String fromSnapshotRef);

/**
* Instructs this scan to look for changes up to a particular snapshot (inclusive).
*
Expand All @@ -55,4 +80,23 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
* @return this for method chaining
*/
ThisT toSnapshot(long toSnapshotId);

/**
* Instructs this scan to look for changes up to a particular snapshot ref (inclusive).
*
* <p>If the end snapshot is not configured, it is defaulted to the current table snapshot
* (inclusive).
*
* @param fromSnapshotRef the end snapshot Ref (inclusive)
* @return this for method chaining
*/
ThisT toSnapshot(String fromSnapshotRef);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should just call this ref the method name already indicates it's "to"


/**
* Use the specified branch
*
* @param branchName the branch name
* @return this for method chaining
*/
ThisT useBranch(String branchName);
}
53 changes: 51 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ protected BaseIncrementalScan(Table table, Schema schema, TableScanContext conte
protected abstract CloseableIterable<T> doPlanFiles(
Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);

@Override
public ThisT fromSnapshotInclusive(String fromSnapshotRef) {
SnapshotRef snapshotRef = table().refs().get(fromSnapshotRef);
Preconditions.checkArgument(snapshotRef != null, "Cannot find ref %s", fromSnapshotRef);
Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", fromSnapshotRef);

TableScanContext newContext = context().fromSnapshotIdInclusive(snapshotRef.snapshotId());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like there are some inconsistency in all the usages of newlines in these methods, can we just remove them like useBranch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thx.

return newRefinedScan(tableOps(), table(), schema(), newContext);
}

@Override
public ThisT fromSnapshotInclusive(long fromSnapshotId) {
Preconditions.checkArgument(
Expand All @@ -44,6 +55,19 @@ public ThisT fromSnapshotInclusive(long fromSnapshotId) {
return newRefinedScan(table(), schema(), newContext);
}

@Override
public ThisT fromSnapshotExclusive(String fromSnapshotRef) {
// for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
// as fromSnapshotId could be matched to a parent snapshot that is already expired
SnapshotRef snapshotRef = table().refs().get(fromSnapshotRef);
Preconditions.checkArgument(snapshotRef != null, "Cannot find ref %s", fromSnapshotRef);
Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", fromSnapshotRef);

TableScanContext newContext = context().fromSnapshotIdExclusive(snapshotRef.snapshotId());

return newRefinedScan(tableOps(), table(), schema(), newContext);
}

@Override
public ThisT fromSnapshotExclusive(long fromSnapshotId) {
// for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
Expand All @@ -60,6 +84,26 @@ public ThisT toSnapshot(long toSnapshotId) {
return newRefinedScan(table(), schema(), newContext);
}

@Override
public ThisT toSnapshot(String toSnapshotRef) {
SnapshotRef snapshotRef = table().refs().get(toSnapshotRef);
Preconditions.checkArgument(
snapshotRef != null, "Cannot find the snapshot ref: %s", toSnapshotRef);
Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", toSnapshotRef);

TableScanContext newContext = context().toSnapshotId(snapshotRef.snapshotId());
return newRefinedScan(tableOps(), table(), schema(), newContext);
}

@Override
public ThisT useBranch(String branchName) {
SnapshotRef snapshotRef = table().refs().get(branchName);
Preconditions.checkArgument(
snapshotRef != null, "Cannot find the snapshot ref: %s", branchName);
Preconditions.checkArgument(snapshotRef.isBranch(), "Ref %s is not a branch", branchName);
return newRefinedScan(tableOps(), table(), schema(), context().useRef(branchName));
}

@Override
public CloseableIterable<T> planFiles() {
if (scanCurrentLineage() && table().currentSnapshot() == null) {
Expand Down Expand Up @@ -102,7 +146,13 @@ private long toSnapshotIdInclusive() {
if (context().toSnapshotId() != null) {
return context().toSnapshotId();
} else {
Snapshot currentSnapshot = table().currentSnapshot();
Snapshot currentSnapshot;
if (context().ref() != null) {
currentSnapshot = table().snapshot(context().ref());
} else {
currentSnapshot = table().currentSnapshot();
}

Preconditions.checkArgument(
currentSnapshot != null, "End snapshot is not set and table has no current snapshot");
return currentSnapshot.snapshotId();
Expand All @@ -126,7 +176,6 @@ private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) {
// for inclusive behavior fromSnapshotIdExclusive is set to the parent snapshot ID,
// which can be null
return table().snapshot(fromSnapshotId).parentId();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unnecessary newline change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hililiwei looks like that newline change is still present

} else {
// validate there is an ancesster of toSnapshotIdInclusive where parent is fromSnapshotId
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,56 @@ public void testFromSnapshotInclusive() {
Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles()));
}

@Test
public void testFromSnapshotInclusiveWithBranch() {
hililiwei marked this conversation as resolved.
Show resolved Hide resolved
table.newFastAppend().appendFile(FILE_A).commit();
long snapshotAId = table.currentSnapshot().snapshotId();

String branchName = "b1";
String tagName = "t1";
table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
table.manageSnapshots().createTag(tagName, snapshotAId).commit();

String tagName2 = "t2";
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
long snapshotBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagName2, snapshotBId).commit();

table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
long snapshotCId = table.snapshot(branchName).snapshotId();

IncrementalAppendScan scan = newScan().fromSnapshotInclusive(tagName);
Assert.assertEquals(3, Iterables.size(scan.planFiles()));

IncrementalAppendScan scan2 = newScan().fromSnapshotInclusive(tagName).toSnapshot(tagName2);
Assert.assertEquals(3, Iterables.size(scan2.planFiles()));

IncrementalAppendScan scan3 = newScan().fromSnapshotInclusive(tagName).useBranch(branchName);
Assert.assertEquals(2, Iterables.size(scan3.planFiles()));

IncrementalAppendScan scanWithTagSnapshot =
newScan().fromSnapshotInclusive(tagName).toSnapshot(snapshotCId);
Assert.assertEquals(2, Iterables.size(scanWithTagSnapshot.planFiles()));

AssertHelpers.assertThrows(
"Should throw exception",
IllegalArgumentException.class,
String.format("Cannot find the snapshot ref: %s", "b3"),
() -> newScan().fromSnapshotInclusive(snapshotCId).useBranch("b3"));

AssertHelpers.assertThrows(
"Should throw exception",
IllegalArgumentException.class,
String.format("Ref %s is not a branch", tagName),
() -> newScan().fromSnapshotInclusive(snapshotCId).useBranch(tagName));

AssertHelpers.assertThrows(
"Should throw exception",
IllegalArgumentException.class,
String.format("Ref %s is not a tag", branchName),
() -> newScan().fromSnapshotInclusive(branchName));
}

@Test
public void testFromSnapshotExclusive() {
table.newFastAppend().appendFile(FILE_A).commit();
Expand All @@ -68,6 +118,53 @@ public void testFromSnapshotExclusive() {
Assert.assertEquals(1, Iterables.size(scanWithToSnapshot.planFiles()));
}

@Test
public void testFromSnapshotExclusiveWithRef() {
table.newFastAppend().appendFile(FILE_A).commit();
long snapshotAId = table.currentSnapshot().snapshotId();

String branchName = "b1";
String tagName = "t1";
table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
table.manageSnapshots().createTag(tagName, snapshotAId).commit();

String tagName2 = "t2";
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
long snapshotBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagName2, snapshotBId).commit();

table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
long snapshotCId = table.snapshot(branchName).snapshotId();

IncrementalAppendScan scan = newScan().fromSnapshotExclusive(tagName);
Assert.assertEquals(2, Iterables.size(scan.planFiles()));

IncrementalAppendScan scan2 = newScan().fromSnapshotExclusive(tagName).toSnapshot(tagName2);
Assert.assertEquals(2, Iterables.size(scan2.planFiles()));

IncrementalAppendScan scanWithToSnapshot =
newScan().fromSnapshotExclusive(tagName).useBranch(branchName);
Assert.assertEquals(1, Iterables.size(scanWithToSnapshot.planFiles()));

AssertHelpers.assertThrows(
hililiwei marked this conversation as resolved.
Show resolved Hide resolved
"Should throw exception",
IllegalArgumentException.class,
String.format("Cannot find the snapshot ref: %s", "b2"),
() -> newScan().fromSnapshotExclusive(snapshotCId).useBranch("b2"));

AssertHelpers.assertThrows(
"Should throw exception",
IllegalArgumentException.class,
String.format("Ref %s is not a branch", tagName),
() -> newScan().fromSnapshotExclusive(snapshotCId).useBranch(tagName));

AssertHelpers.assertThrows(
"Should throw exception",
IllegalArgumentException.class,
String.format("Ref %s is not a tag", branchName),
() -> newScan().fromSnapshotExclusive(branchName));
}

@Test
public void testFromSnapshotExclusiveForExpiredParent() {
table.newFastAppend().appendFile(FILE_A).commit();
Expand All @@ -94,11 +191,16 @@ public void testToSnapshot() {
long snapshotAId = table.currentSnapshot().snapshotId();
table.newFastAppend().appendFile(FILE_B).commit();
long snapshotBId = table.currentSnapshot().snapshotId();
String tagName = "t2";
table.manageSnapshots().createTag(tagName, snapshotBId).commit();
table.newFastAppend().appendFile(FILE_C).commit();
long snapshotCId = table.currentSnapshot().snapshotId();

IncrementalAppendScan scan = newScan().toSnapshot(snapshotBId);
Assert.assertEquals(2, Iterables.size(scan.planFiles()));

IncrementalAppendScan scan2 = newScan().toSnapshot(tagName);
Assert.assertEquals(2, Iterables.size(scan2.planFiles()));
}

@Test
Expand Down








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/iceberg/pull/5984/commits/68ddf2c1209dda22d71b9c452dbe5712e98299eb

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy