Core, API: Support incremental scanning with branch #5984
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,131 @@ public void testFromSnapshotInclusive() { | |
Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles())); | ||
} | ||
|
||
@Test | ||
public void testFromSnapshotInclusiveWithRef() { | ||
table.newFastAppend().appendFile(FILE_A).commit(); | ||
long snapshotAId = table.currentSnapshot().snapshotId(); | ||
|
||
String branchName = "b1"; | ||
String tagSnapshotAName = "t1"; | ||
table.manageSnapshots().createBranch(branchName, snapshotAId).commit(); | ||
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit(); | ||
|
||
String tagSnapshotBName = "t2"; | ||
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit(); | ||
long snapshotBId = table.currentSnapshot().snapshotId(); | ||
table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit(); | ||
table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit(); | ||
long snapshotCId = table.currentSnapshot().snapshotId(); | ||
Review comment: seems to be unused
Reply: That's true, but I intentionally kept it to make the snapshot generation more intuitive and easy to read. |
||
|
||
IncrementalAppendScan scan = newScan().fromSnapshotInclusive(tagSnapshotAName); | ||
Assertions.assertThat(scan.planFiles()).hasSize(5); | ||
|
||
IncrementalAppendScan scan3 = | ||
newScan().fromSnapshotInclusive(tagSnapshotAName).toSnapshot(tagSnapshotBName); | ||
Assertions.assertThat(scan3.planFiles()).hasSize(3); | ||
|
||
Assertions.assertThatThrownBy(() -> newScan().fromSnapshotInclusive(branchName)) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessage(String.format("Ref %s is not a tag", branchName)); | ||
|
||
Assertions.assertThatThrownBy(() -> newScan().fromSnapshotInclusive("notExistTag")) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessageContaining("Cannot find ref"); | ||
} | ||
|
||
@Test | ||
public void testFromSnapshotExclusiveWithRef() { | ||
table.newFastAppend().appendFile(FILE_A).commit(); | ||
long snapshotAId = table.currentSnapshot().snapshotId(); | ||
|
||
String branchName = "b1"; | ||
String tagSnapshotAName = "t1"; | ||
table.manageSnapshots().createBranch(branchName, snapshotAId).commit(); | ||
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit(); | ||
|
||
String tagSnapshotBName = "t2"; | ||
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit(); | ||
long snapshotBId = table.currentSnapshot().snapshotId(); | ||
table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit(); | ||
table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit(); | ||
long snapshotCId = table.currentSnapshot().snapshotId(); | ||
|
||
|
||
IncrementalAppendScan scan2 = newScan().fromSnapshotExclusive(tagSnapshotAName); | ||
Assertions.assertThat(scan2.planFiles()).hasSize(4); | ||
|
||
IncrementalAppendScan scan3 = | ||
newScan().fromSnapshotExclusive(tagSnapshotAName).toSnapshot(tagSnapshotBName); | ||
Assertions.assertThat(scan3.planFiles()).hasSize(2); | ||
|
||
Assertions.assertThatThrownBy(() -> newScan().fromSnapshotExclusive(branchName)) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessage(String.format("Ref %s is not a tag", branchName)); | ||
|
||
Assertions.assertThatThrownBy(() -> newScan().fromSnapshotExclusive("notExistTag")) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessageContaining("Cannot find ref"); | ||
} | ||
|
||
@Test | ||
public void testUseBranch() { | ||
table.newFastAppend().appendFile(FILE_A).commit(); | ||
long snapshotAId = table.currentSnapshot().snapshotId(); | ||
|
||
String branchName = "b1"; | ||
String tagSnapshotAName = "t1"; | ||
table.manageSnapshots().createBranch(branchName, snapshotAId).commit(); | ||
table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit(); | ||
|
||
String tagName2 = "t2"; | ||
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit(); | ||
long snapshotMainBId = table.currentSnapshot().snapshotId(); | ||
table.manageSnapshots().createTag(tagName2, snapshotMainBId).commit(); | ||
|
||
table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit(); | ||
|
||
table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit(); | ||
long snapshotBranchBId = table.snapshot(branchName).snapshotId(); | ||
|
||
table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit(); | ||
long snapshotBranchCId = table.snapshot(branchName).snapshotId(); | ||
|
||
IncrementalAppendScan scan = newScan().fromSnapshotInclusive(tagSnapshotAName); | ||
Assertions.assertThat(scan.planFiles()).hasSize(5); | ||
|
||
IncrementalAppendScan scan2 = | ||
newScan().fromSnapshotInclusive(tagSnapshotAName).useBranch(branchName); | ||
Assertions.assertThat(scan2.planFiles()).hasSize(3); | ||
|
||
IncrementalAppendScan scan3 = newScan().toSnapshot(snapshotBranchBId).useBranch(branchName); | ||
Assertions.assertThat(scan3.planFiles()).hasSize(2); | ||
|
||
IncrementalAppendScan scan4 = newScan().toSnapshot(snapshotBranchCId).useBranch(branchName); | ||
Assertions.assertThat(scan4.planFiles()).hasSize(3); | ||
|
||
IncrementalAppendScan scan5 = | ||
newScan() | ||
.fromSnapshotExclusive(tagSnapshotAName) | ||
.toSnapshot(snapshotBranchBId) | ||
.useBranch(branchName); | ||
Assertions.assertThat(scan5.planFiles()).hasSize(1); | ||
|
||
Assertions.assertThatThrownBy( | ||
Review comment: It's quite difficult to follow this entire test, since it requires mental note-taking to understand which branch/tag has how many files and such. I would suggest:
Reply: I've added some diagrams, hoping to make it easier to read.
Reply: Thanks, that helps in understanding what exactly is being tested 🚀 |
||
() -> newScan().toSnapshot(snapshotMainBId).useBranch(branchName).planFiles()) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessageContaining("End snapshot is not a valid snapshot on the current branch"); | ||
|
||
Assertions.assertThatThrownBy( | ||
() -> newScan().fromSnapshotInclusive(snapshotBranchBId).useBranch("notExistBranch")) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessageContaining("Cannot find ref"); | ||
|
||
|
||
Assertions.assertThatThrownBy( | ||
() -> newScan().fromSnapshotInclusive(snapshotBranchBId).useBranch(tagSnapshotAName)) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessage(String.format("Ref %s is not a branch", tagSnapshotAName)); | ||
} | ||
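Review note: the file counts asserted in `testUseBranch` can be checked against a toy lineage model. The sketch below is not Iceberg code; `LineageSketch`, `addSnapshot`, `isAncestorOrSelf`, and `countAppended` are hypothetical names, and the snapshot ids 1 to 5 are made up. It assumes an incremental append scan counts the files appended by each snapshot on the parent chain from the end snapshot back to the start snapshot, and that the end snapshot must be an ancestor of (or equal to) the branch head.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of snapshot lineage for reasoning about the test; not Iceberg's implementation. */
class LineageSketch {
  // Parent id per snapshot (null for the root) and number of files each snapshot appended.
  private final Map<Long, Long> parentOf = new HashMap<>();
  private final Map<Long, Integer> appendedFiles = new HashMap<>();

  void addSnapshot(long id, Long parentId, int files) {
    parentOf.put(id, parentId);
    appendedFiles.put(id, files);
  }

  /** Walks parent pointers from head and reports whether candidate is on that chain. */
  boolean isAncestorOrSelf(long candidate, long head) {
    for (Long cur = head; cur != null; cur = parentOf.get(cur)) {
      if (cur == candidate) {
        return true;
      }
    }
    return false;
  }

  /**
   * Counts files appended from fromId to toId along the chain ending at toId.
   * A null fromId means "start at the root". This sketch does not validate
   * that fromId is actually an ancestor of toId.
   */
  int countAppended(Long fromId, boolean inclusive, long toId, long branchHead) {
    if (!isAncestorOrSelf(toId, branchHead)) {
      throw new IllegalArgumentException(
          "End snapshot is not a valid snapshot on the current branch");
    }
    int total = 0;
    for (Long cur = toId; cur != null; cur = parentOf.get(cur)) {
      boolean isStart = fromId != null && cur.equals(fromId);
      if (isStart && !inclusive) {
        break;
      }
      total += appendedFiles.get(cur);
      if (isStart) {
        break;
      }
    }
    return total;
  }

  public static void main(String[] args) {
    LineageSketch t = new LineageSketch();
    t.addSnapshot(1L, null, 1); // snapshot A: FILE_A; branch b1 and tag t1 point here
    t.addSnapshot(2L, 1L, 2);   // main: two FILE_B appends; tag t2
    t.addSnapshot(3L, 2L, 2);   // main head: two more FILE_B appends
    t.addSnapshot(4L, 1L, 1);   // b1: first FILE_C append
    t.addSnapshot(5L, 4L, 1);   // b1 head: second FILE_C append

    System.out.println(t.countAppended(1L, true, 3L, 3L));    // scan:  5
    System.out.println(t.countAppended(1L, true, 5L, 5L));    // scan2: 3
    System.out.println(t.countAppended(null, false, 4L, 5L)); // scan3: 2
    System.out.println(t.countAppended(null, false, 5L, 5L)); // scan4: 3
    System.out.println(t.countAppended(1L, false, 4L, 5L));   // scan5: 1
  }
}
```

Under these assumptions the five printed values match the `hasSize(...)` expectations for `scan` through `scan5`, and asking for the main-line snapshot 2 as the end snapshot while the branch head is 5 is rejected, which mirrors the "End snapshot is not a valid snapshot on the current branch" assertion.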
|
||
@Test | ||
public void testFromSnapshotExclusive() { | ||
table.newFastAppend().appendFile(FILE_A).commit(); | ||
|
@@ -101,6 +226,39 @@ public void testToSnapshot() { | |
Assert.assertEquals(2, Iterables.size(scan.planFiles())); | ||
} | ||
|
||
@Test | ||
public void testToSnapshotWithRef() { | ||
table.newFastAppend().appendFile(FILE_A).commit(); | ||
long snapshotAId = table.currentSnapshot().snapshotId(); | ||
|
||
table.newFastAppend().appendFile(FILE_B).commit(); | ||
long snapshotBId = table.currentSnapshot().snapshotId(); | ||
|
||
String branchName = "b1"; | ||
table.manageSnapshots().createBranch(branchName, snapshotBId).commit(); | ||
|
||
String tagSnapshotMainBName = "t1"; | ||
table.manageSnapshots().createTag(tagSnapshotMainBName, snapshotBId).commit(); | ||
|
||
String tagSnapshotBranchBName = "t2"; | ||
table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit(); | ||
long snapshotBranchBId = table.snapshot(branchName).snapshotId(); | ||
table.manageSnapshots().createTag(tagSnapshotBranchBName, snapshotBranchBId).commit(); | ||
|
||
IncrementalAppendScan scan = newScan().toSnapshot(tagSnapshotMainBName); | ||
Assertions.assertThat(scan.planFiles()).hasSize(2); | ||
|
||
IncrementalAppendScan scan2 = newScan().toSnapshot(tagSnapshotBranchBName); | ||
Assertions.assertThat(scan2.planFiles()).hasSize(3); | ||
|
||
Assertions.assertThatThrownBy(() -> newScan().toSnapshot(branchName)) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessage(String.format("Ref %s is not a tag", branchName)); | ||
|
||
Assertions.assertThatThrownBy(() -> newScan().toSnapshot("notExistTag")) | ||
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessageContaining("Cannot find ref"); | ||
} | ||
|
||
@Test | ||
public void testMultipleRootSnapshots() throws Exception { | ||
table.newFastAppend().appendFile(FILE_A).commit(); | ||
|
Review comment: I think it makes more sense to name this `ref()` because the ref can be either a branch or a tag.

Reply: @jackye1995 thoughts on whether this should be named `branch()` vs `ref()`?

Reply: It can't be a tag, has to be a branch.

Reply: What I mean is: we are expressing that incremental scans work on branches through `IncrementalScan.useBranch()`, which is the publicly exposed API and the correct thing to do. My thinking is that `TableScanContext` is rather an internal API and is also used in other places where we don't need the distinction between branch/tag. So in order to not limit ourselves in this internal API, I think it makes more sense to name it `ref()` in `TableScanContext`.

Reply: Oh, I know what you mean now, but I have a question. Even though `TableScanContext` is an internal class, I think it encapsulates the scanning semantics, such as having methods like `useSnapshotId`, `fromSnapshotIdExclusive`, etc. So let's imagine, if there is a `Tag` here, what would be its use case and how would it relate to Branch. Assuming there is a `fromTagExclusive`, then it would require a Tag, but at the same time, it might also be able to use a Branch. From this perspective, we might need both `tag` and `branch`.

Reply: Yeah I would +1 on using `branch` instead of a generic `ref`, because here the semantic meaning is quite different. @amogh-jahagirdar any thoughts on that?
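
Review note: the two positions in this thread can be illustrated side by side. The sketch below is a toy model, not Iceberg code; `RefLookupSketch`, `Ref`, `ref`, `branchSnapshotId`, and `tagSnapshotId` are hypothetical names. It shows a type-agnostic internal lookup next to branch-only and tag-only accessors that produce the messages the tests in this PR assert on. The exact wording after "Cannot find ref" is an assumption, since the tests only check that the message contains that phrase.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy ref registry contrasting a generic ref lookup with branch-only and tag-only access. */
class RefLookupSketch {
  enum RefType { BRANCH, TAG }

  static final class Ref {
    final RefType type;
    final long snapshotId;

    Ref(RefType type, long snapshotId) {
      this.type = type;
      this.snapshotId = snapshotId;
    }
  }

  private final Map<String, Ref> refs = new HashMap<>();

  void createBranch(String name, long snapshotId) {
    refs.put(name, new Ref(RefType.BRANCH, snapshotId));
  }

  void createTag(String name, long snapshotId) {
    refs.put(name, new Ref(RefType.TAG, snapshotId));
  }

  /** Generic lookup: accepts either ref type (the "ref()" naming option). */
  Ref ref(String name) {
    Ref found = refs.get(name);
    if (found == null) {
      // Message suffix is an assumption; the tests only require "Cannot find ref".
      throw new IllegalArgumentException("Cannot find ref: " + name);
    }
    return found;
  }

  /** Branch-only lookup: the semantics a useBranch-style API needs. */
  long branchSnapshotId(String name) {
    Ref found = ref(name);
    if (found.type != RefType.BRANCH) {
      throw new IllegalArgumentException(String.format("Ref %s is not a branch", name));
    }
    return found.snapshotId;
  }

  /** Tag-only lookup: the semantics of the tag-based from/to snapshot overloads. */
  long tagSnapshotId(String name) {
    Ref found = ref(name);
    if (found.type != RefType.TAG) {
      throw new IllegalArgumentException(String.format("Ref %s is not a tag", name));
    }
    return found.snapshotId;
  }

  public static void main(String[] args) {
    RefLookupSketch table = new RefLookupSketch();
    table.createBranch("b1", 1L);
    table.createTag("t1", 1L);

    System.out.println(table.branchSnapshotId("b1"));
    try {
      table.branchSnapshotId("t1");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this model the generic `ref` keeps the internal lookup open to both ref types, while the type check lives in the narrower accessors. Whether the internal context should expose the generic or the branch-specific name is the design question the thread is debating; the sketch does not settle it.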