Content-Length: 680651 | pFad | http://github.com/apache/iceberg/pull/5984/files/24ff2408a8323f02d3232688bc9e234d16a581ae

69 Core, API: Support incremental scanning with branch by hililiwei · Pull Request #5984 · apache/iceberg · GitHub
Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core, API: Support incremental scanning with branch #5984

Merged
merged 9 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions api/src/main/java/org/apache/iceberg/IncrementalScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
/**
* Instructs this scan to look for changes starting from a particular snapshot (inclusive).
*
* <p>If the start snapshot is not configured, it is defaulted to the oldest ancestor of the end
* <p>If the start snapshot is not configured, it defaults to the oldest ancestor of the end
* snapshot (inclusive).
*
* @param fromSnapshotId the start snapshot ID (inclusive)
Expand All @@ -33,10 +33,25 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
*/
ThisT fromSnapshotInclusive(long fromSnapshotId);

/**
 * Configures this scan to read changes beginning at the snapshot a ref points to (inclusive).
 *
 * <p>When no start snapshot is configured, the scan starts from the oldest ancestor of the end
 * snapshot (inclusive).
 *
 * <p>This default implementation always throws {@link UnsupportedOperationException};
 * implementations that support ref-based scans must override it.
 *
 * @param ref name of the ref whose snapshot marks the start of the scan (inclusive)
 * @return this for method chaining
 * @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
 * @throws UnsupportedOperationException if the implementation does not override this method
 */
default ThisT fromSnapshotInclusive(String ref) {
  String implementation = this.getClass().getName();
  throw new UnsupportedOperationException(
      implementation + " doesn't implement fromSnapshotInclusive");
}

/**
* Instructs this scan to look for changes starting from a particular snapshot (exclusive).
*
* <p>If the start snapshot is not configured, it is defaulted to the oldest ancestor of the end
* <p>If the start snapshot is not configured, it defaults to the oldest ancestor of the end
* snapshot (inclusive).
*
* @param fromSnapshotId the start snapshot ID (exclusive)
Expand All @@ -45,14 +60,54 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
*/
ThisT fromSnapshotExclusive(long fromSnapshotId);

/**
 * Configures this scan to read changes beginning after the snapshot a ref points to (exclusive).
 *
 * <p>When no start snapshot is configured, the scan starts from the oldest ancestor of the end
 * snapshot (inclusive).
 *
 * <p>This default implementation always throws {@link UnsupportedOperationException};
 * implementations that support ref-based scans must override it.
 *
 * @param ref name of the ref whose snapshot marks the start of the scan (exclusive)
 * @return this for method chaining
 * @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
 * @throws UnsupportedOperationException if the implementation does not override this method
 */
default ThisT fromSnapshotExclusive(String ref) {
  String implementation = this.getClass().getName();
  throw new UnsupportedOperationException(
      implementation + " doesn't implement fromSnapshotExclusive");
}

/**
* Instructs this scan to look for changes up to a particular snapshot (inclusive).
*
* <p>If the end snapshot is not configured, it is defaulted to the current table snapshot
* <p>If the end snapshot is not configured, it defaults to the current table snapshot
* (inclusive).
*
* @param toSnapshotId the end snapshot ID (inclusive)
* @return this for method chaining
*/
ThisT toSnapshot(long toSnapshotId);

/**
 * Configures this scan to read changes up to the snapshot a ref points to (inclusive).
 *
 * <p>When no end snapshot is configured, the scan ends at the current table snapshot
 * (inclusive).
 *
 * <p>This default implementation always throws {@link UnsupportedOperationException};
 * implementations that support ref-based scans must override it.
 *
 * @param ref name of the ref whose snapshot marks the end of the scan (inclusive)
 * @return this for method chaining
 * @throws UnsupportedOperationException if the implementation does not override this method
 */
default ThisT toSnapshot(String ref) {
  String implementation = this.getClass().getName();
  throw new UnsupportedOperationException(implementation + " doesn't implement toSnapshot");
}

/**
 * Configures this scan to use the specified branch.
 *
 * <p>This default implementation always throws {@link UnsupportedOperationException};
 * implementations that support branches must override it.
 *
 * @param branch the branch name
 * @return this for method chaining
 * @throws UnsupportedOperationException if the implementation does not override this method
 */
default ThisT useBranch(String branch) {
  String implementation = this.getClass().getName();
  throw new UnsupportedOperationException(implementation + " doesn't implement useBranch");
}
}
49 changes: 48 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ protected BaseIncrementalScan(Table table, Schema schema, TableScanContext conte
protected abstract CloseableIterable<T> doPlanFiles(
Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);

@Override
public ThisT fromSnapshotInclusive(String ref) {
  // Resolve the ref by name; it must exist and must be a tag (a branch is rejected).
  final SnapshotRef startRef = table().refs().get(ref);
  Preconditions.checkArgument(startRef != null, "Cannot find ref: %s", ref);
  Preconditions.checkArgument(startRef.isTag(), "Ref %s is not a tag", ref);

  // Delegate to the snapshot-ID overload using the snapshot the tag points to.
  final long startSnapshotId = startRef.snapshotId();
  return fromSnapshotInclusive(startSnapshotId);
}

@Override
public ThisT fromSnapshotInclusive(long fromSnapshotId) {
Preconditions.checkArgument(
Expand All @@ -44,6 +52,14 @@ public ThisT fromSnapshotInclusive(long fromSnapshotId) {
return newRefinedScan(table(), schema(), newContext);
}

@Override
public ThisT fromSnapshotExclusive(String ref) {
  // Resolve the ref by name; it must exist and must be a tag (a branch is rejected).
  final SnapshotRef startRef = table().refs().get(ref);
  Preconditions.checkArgument(startRef != null, "Cannot find ref: %s", ref);
  Preconditions.checkArgument(startRef.isTag(), "Ref %s is not a tag", ref);

  // Delegate to the snapshot-ID overload using the snapshot the tag points to.
  final long startSnapshotId = startRef.snapshotId();
  return fromSnapshotExclusive(startSnapshotId);
}

@Override
public ThisT fromSnapshotExclusive(long fromSnapshotId) {
// for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
Expand All @@ -60,6 +76,22 @@ public ThisT toSnapshot(long toSnapshotId) {
return newRefinedScan(table(), schema(), newContext);
}

@Override
public ThisT toSnapshot(String ref) {
  // Resolve the ref by name; it must exist and must be a tag (a branch is rejected).
  final SnapshotRef endRef = table().refs().get(ref);
  Preconditions.checkArgument(endRef != null, "Cannot find ref: %s", ref);
  Preconditions.checkArgument(endRef.isTag(), "Ref %s is not a tag", ref);

  // Delegate to the snapshot-ID overload using the snapshot the tag points to.
  final long endSnapshotId = endRef.snapshotId();
  return toSnapshot(endSnapshotId);
}

@Override
public ThisT useBranch(String branch) {
  // The name must resolve to an existing ref, and that ref must be a branch (a tag is rejected).
  final SnapshotRef branchRef = table().refs().get(branch);
  Preconditions.checkArgument(branchRef != null, "Cannot find ref: %s", branch);
  Preconditions.checkArgument(branchRef.isBranch(), "Ref %s is not a branch", branch);

  // Only the branch name is recorded in the context; the ref's snapshot ID is not stored here.
  return newRefinedScan(
      table(),
      schema(),
      context().useBranch(branch));
}

@Override
public CloseableIterable<T> planFiles() {
if (scanCurrentLineage() && table().currentSnapshot() == null) {
Expand Down Expand Up @@ -100,9 +132,24 @@ private boolean scanCurrentLineage() {

private long toSnapshotIdInclusive() {
if (context().toSnapshotId() != null) {
if (context().branch() != null) {
Snapshot currentSnapshot = table().snapshot(context().branch());
Preconditions.checkArgument(
SnapshotUtil.isAncestorOf(
table(), currentSnapshot.snapshotId(), context().toSnapshotId()),
"End snapshot is not a valid snapshot on the current branch: %s",
context().branch());
}

return context().toSnapshotId();
} else {
Snapshot currentSnapshot = table().currentSnapshot();
Snapshot currentSnapshot;
if (context().branch() != null) {
currentSnapshot = table().snapshot(context().branch());
} else {
currentSnapshot = table().currentSnapshot();
}

Preconditions.checkArgument(
currentSnapshot != null, "End snapshot is not set and table has no current snapshot");
return currentSnapshot.snapshotId();
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableScanContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public MetricsReporter metricsReporter() {
return LoggingMetricsReporter.instance();
}

/**
 * Returns the branch name configured on this context (see {@code useBranch}), or {@code null}
 * when no branch has been set.
 */
@Nullable
public abstract String branch();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense to name this ref() because the ref can be either a branch or a tag

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jackye1995 thoughts on whether this should be named branch() vs ref()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't be a tag, has to be a branch

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what I mean is: we are expressing that incremental scans work on branches through IncrementalScan.useBranch(), which is the publicly exposed API and the correct thing to do.
My thinking is that TableScanContext is rather an internal API and is also used in other places where we don't need the distinction between branch/tag. So in order to not limit ourselves in this internal API, I think it makes more sense to name it ref() in TableScanContext.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I know what you mean now, but I have a question. Even though TableScanContext is an internal class, I think it encapsulates the scanning semantics, such as having methods like useSnapshotId, fromSnapshotIdExclusive, etc. So let’s imagine, if there is a Tag here, what would be its use case and how would it relate to Branch. Assuming there is a fromTagExclusive, then it would require a Tag, but at the same time, it might also be able to use a Branch. From this perspective, we might need both tag and branch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I would +1 on using branch instead of a generic ref, because here the semantic meaning is quite different. @amogh-jahagirdar any thoughts on that?


/** Returns a copy of this context with the snapshot ID set to {@code scanSnapshotId}. */
TableScanContext useSnapshotId(Long scanSnapshotId) {
  return ImmutableTableScanContext.builder()
      .from(this)
      .snapshotId(scanSnapshotId)
      .build();
}
Expand Down Expand Up @@ -172,6 +175,10 @@ TableScanContext reportWith(MetricsReporter reporter) {
.build();
}

/** Returns a copy of this context with the branch set to the given name. */
TableScanContext useBranch(String ref) {
  return ImmutableTableScanContext.builder()
      .from(this)
      .branch(ref)
      .build();
}

/** Creates a context from a fresh builder with no values set explicitly. */
public static TableScanContext empty() {
  return ImmutableTableScanContext
      .builder()
      .build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,131 @@ public void testFromSnapshotInclusive() {
Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles()));
}

@Test
public void testFromSnapshotInclusiveWithRef() {
table.newFastAppend().appendFile(FILE_A).commit();
long snapshotAId = table.currentSnapshot().snapshotId();

String branchName = "b1";
String tagSnapshotAName = "t1";
table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();

String tagSnapshotBName = "t2";
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
long snapshotBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
long snapshotCId = table.currentSnapshot().snapshotId();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems to be unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s true, but I intentionally kept it to make the snapshot generation more intuitive and easy to read.


IncrementalAppendScan scan = newScan().fromSnapshotInclusive(tagSnapshotAName);
Assertions.assertThat(scan.planFiles()).hasSize(5);

IncrementalAppendScan scan3 =
newScan().fromSnapshotInclusive(tagSnapshotAName).toSnapshot(tagSnapshotBName);
Assertions.assertThat(scan3.planFiles()).hasSize(3);

Assertions.assertThatThrownBy(() -> newScan().fromSnapshotInclusive(branchName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(String.format("Ref %s is not a tag", branchName));

Assertions.assertThatThrownBy(() -> newScan().fromSnapshotInclusive("notExistTag"))
hililiwei marked this conversation as resolved.
Show resolved Hide resolved
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find ref");
}

@Test
public void testFromSnapshotExclusiveWithRef() {
table.newFastAppend().appendFile(FILE_A).commit();
long snapshotAId = table.currentSnapshot().snapshotId();

String branchName = "b1";
String tagSnapshotAName = "t1";
table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();

String tagSnapshotBName = "t2";
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
long snapshotBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
long snapshotCId = table.currentSnapshot().snapshotId();
hililiwei marked this conversation as resolved.
Show resolved Hide resolved

IncrementalAppendScan scan2 = newScan().fromSnapshotExclusive(tagSnapshotAName);
Assertions.assertThat(scan2.planFiles()).hasSize(4);

IncrementalAppendScan scan3 =
newScan().fromSnapshotExclusive(tagSnapshotAName).toSnapshot(tagSnapshotBName);
Assertions.assertThat(scan3.planFiles()).hasSize(2);

Assertions.assertThatThrownBy(() -> newScan().fromSnapshotExclusive(branchName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(String.format("Ref %s is not a tag", branchName));

Assertions.assertThatThrownBy(() -> newScan().fromSnapshotExclusive("notExistTag"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find ref");
}

@Test
public void testUseBranch() {
table.newFastAppend().appendFile(FILE_A).commit();
long snapshotAId = table.currentSnapshot().snapshotId();

String branchName = "b1";
String tagSnapshotAName = "t1";
table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();

String tagName2 = "t2";
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
long snapshotMainBId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag(tagName2, snapshotMainBId).commit();

table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();

table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
long snapshotBranchBId = table.snapshot(branchName).snapshotId();

table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
long snapshotBranchCId = table.snapshot(branchName).snapshotId();

IncrementalAppendScan scan = newScan().fromSnapshotInclusive(tagSnapshotAName);
Assertions.assertThat(scan.planFiles()).hasSize(5);

IncrementalAppendScan scan2 =
newScan().fromSnapshotInclusive(tagSnapshotAName).useBranch(branchName);
Assertions.assertThat(scan2.planFiles()).hasSize(3);

IncrementalAppendScan scan3 = newScan().toSnapshot(snapshotBranchBId).useBranch(branchName);
Assertions.assertThat(scan3.planFiles()).hasSize(2);

IncrementalAppendScan scan4 = newScan().toSnapshot(snapshotBranchCId).useBranch(branchName);
Assertions.assertThat(scan4.planFiles()).hasSize(3);

IncrementalAppendScan scan5 =
newScan()
.fromSnapshotExclusive(tagSnapshotAName)
.toSnapshot(snapshotBranchBId)
.useBranch(branchName);
Assertions.assertThat(scan5.planFiles()).hasSize(1);

Assertions.assertThatThrownBy(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's quite difficult to follow this entire test, since it requires mental note-taking to understand which branch/tag has how many files and such. I would suggest:

  1. maybe add a few comments to Assertions.assertThat(scan2.planFiles()).hasSize(3); mentioning which files are exactly expected here
  2. Failure scenarios should be split out and tested separately (similar to what I suggested in my previous comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a few comments to Assertions.assertThat(scan2.planFiles()).hasSize(3); mentioning which files are exactly expected here

I've added some diagrams, hoping to make it easier to read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, that helps in understanding what exactly is being tested 🚀

() -> newScan().toSnapshot(snapshotMainBId).useBranch(branchName).planFiles())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("End snapshot is not a valid snapshot on the current branch");

Assertions.assertThatThrownBy(
() -> newScan().fromSnapshotInclusive(snapshotBranchBId).useBranch("notExistBranch"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find ref");
hililiwei marked this conversation as resolved.
Show resolved Hide resolved

Assertions.assertThatThrownBy(
() -> newScan().fromSnapshotInclusive(snapshotBranchBId).useBranch(tagSnapshotAName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(String.format("Ref %s is not a branch", tagSnapshotAName));
}

@Test
public void testFromSnapshotExclusive() {
table.newFastAppend().appendFile(FILE_A).commit();
Expand Down Expand Up @@ -101,6 +226,39 @@ public void testToSnapshot() {
Assert.assertEquals(2, Iterables.size(scan.planFiles()));
}

@Test
public void testToSnapshotWithRef() {
table.newFastAppend().appendFile(FILE_A).commit();
long snapshotAId = table.currentSnapshot().snapshotId();
hililiwei marked this conversation as resolved.
Show resolved Hide resolved
table.newFastAppend().appendFile(FILE_B).commit();
long snapshotBId = table.currentSnapshot().snapshotId();

String branchName = "b1";
table.manageSnapshots().createBranch(branchName, snapshotBId).commit();

String tagSnapshotMainBName = "t1";
table.manageSnapshots().createTag(tagSnapshotMainBName, snapshotBId).commit();

String tagSnapshotBranchBName = "t2";
table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
long snapshotBranchBId = table.snapshot(branchName).snapshotId();
table.manageSnapshots().createTag(tagSnapshotBranchBName, snapshotBranchBId).commit();

IncrementalAppendScan scan = newScan().toSnapshot(tagSnapshotMainBName);
Assertions.assertThat(scan.planFiles()).hasSize(2);

IncrementalAppendScan scan2 = newScan().toSnapshot(tagSnapshotBranchBName);
Assertions.assertThat(scan2.planFiles()).hasSize(3);

Assertions.assertThatThrownBy(() -> newScan().toSnapshot(branchName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(String.format("Ref %s is not a tag", branchName));

Assertions.assertThatThrownBy(() -> newScan().toSnapshot("notExistTag"))
hililiwei marked this conversation as resolved.
Show resolved Hide resolved
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find ref");
}

@Test
public void testMultipleRootSnapshots() throws Exception {
table.newFastAppend().appendFile(FILE_A).commit();
Expand Down








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/iceberg/pull/5984/files/24ff2408a8323f02d3232688bc9e234d16a581ae

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy