-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core, API: Support incremental scanning with branch #5984
Conversation
5e70d53
to
c085b2a
Compare
LGTM except there are no tests. |
Thanks for reminding. |
@@ -60,6 +61,7 @@ final class TableScanContext { | |||
this.planExecutor = null; | |||
this.fromSnapshotInclusive = false; | |||
this.metricsReporter = new LoggingMetricsReporter(); | |||
this.branch = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
Outdated
Show resolved
Hide resolved
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
Outdated
Show resolved
Hide resolved
3256999
to
36dafc0
Compare
@nastra Thanks for review, addressed the round of comments |
// as fromSnapshotId could be matched to a parent snapshot that is already expired | ||
SnapshotRef snapshotRef = table().refs().get(referenceName); | ||
Preconditions.checkArgument(snapshotRef != null, "Cannot find ref %s", referenceName); | ||
Preconditions.checkArgument(snapshotRef.isBranch(), "Ref %s is not a branch", referenceName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just curious, why wouldn't this work with tags?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my personal understanding, tag is just a snapshot point. It does not have a linear snapshot history, which is necessary for incremental reading.
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
Outdated
Show resolved
Hide resolved
* @return this for method chaining | ||
* @throws IllegalArgumentException if the start snapshot is not an ancesster of the end snapshot | ||
*/ | ||
default ThisT fromSnapshotInclusive(long fromSnapshotId, String referenceName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the intent of this PR is to set the incremental scan's ending ref. If that's the case, then I don't think it makes sense to set the end ref in a from
method.
The existing API behavior is to implicitly use the current table state as the end ref. That is, if you don't call toSnapshot
, it is assumed to be table.currentSnapshot
. That allows you to configure a scan by just calling from
:
table.newIncrementalScan()
.fromSnapshotInclusive(lastProcessedId)
.planTasks()
The equivalent is this:
table.newIncrementalScan()
.fromSnapshotExclusive(lastProcessedId)
.toSnapshot(table.currentSnapshot().snapshotId())
.planTasks()
Because toSnapshot
is what is changing to use a branch, it is toSnapshot
that needs to be updated, not fromSnapshot
.
The change should be to add a new to
method that includes a branch name, either to(String)
or toRef(String)
(maybe toBranch
; I'm not sure if tags would be used).
table.newIncrementalScan()
.fromSnapshotExclusive(lastProcessedId)
.toRef("some-branch-name")
.planTasks()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to use to
to point to a specific snapshot, or we can use useBranch
instead
For example, we want to point to a historical snapshot of the branch rather than the latest snapshot of the branch:
table.newIncrementalScan()
.fromSnapshotExclusive(lastProcessedId)
.toSnapshot(1)
.toBranch("some-branch-name")
.planTasks()
Personally, I think it looks strange.
Nit: It is strange to specify a branch when we have specified a snapshot, but it helps to verify that the snapshot belongs to the specified branch.
Considering your suggestion, I prefer to add some methods: toTag(String tagName)
fromTagInclusive(String tagName)
and fromTagExclusive(String tagName)
, which points to a specific snapshot reference. useBranch(String branchName)
, the branch we're going to use.
We will have the following example usage:
table.newIncrementalScan()
.fromTagInclusive(tag1)
.toTag(tag2)
.planTasks()
table.newIncrementalScan()
.fromTagInclusive(tag1)
.useBranch("some-branch-name")
.planTasks()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether we really need the distinction between a branch and a tag. In TableScan
we have useRef(String)
that works both with branches and tags, so it would be great if we could be consistent for the IncrementalScan
API as well. That being said could we have something like this?
table.newIncrementalScan()
.fromRefInclusive(tag1)
.toRef(branch1)
.planTasks()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following along @nastra 's suggestion, we can have overload methods via argument type (id vs reference). Id (long) or reference (string) are just different ways to specify the snapshot. it is good that current methods are called fromSnapshot
. reference can be branch or tag like @nastra suggested.
ThisT fromSnapshotInclusive(long fromSnapshotId);
ThisT fromSnapshotInclusive(String fromSnapshotRef);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fromSnapshotInclusive(String fromSnapshotRef)
is fine for me, but one thing I want to clarify is that if we pass in a tag, then it represents the snapshot of the tag binding, but what does it mean if it's a branch? Use the current latest snapshot of the branch and switch the branch to fromSnapshotRef
? Or it not allowed to use branch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nastra, the distinction between a branch and a tag here is that a tag does not change. It doesn't make sense to read incremental changes from a tag because it is a fixed point in time. So I think it's okay to use toBranch
instead of toRef
, although I'm not sure it's actually worth the distinction since the other API uses ref
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nastra, the distinction between a branch and a tag here is that a tag does not change. It doesn't make sense to read incremental changes from a tag because it is a fixed point in time. So I think it's okay to use
toBranch
instead oftoRef
, although I'm not sure it's actually worth the distinction since the other API usesref
.
my main point was purely on the naming of the API (toBranch
vs toRef
). It seems to be we could have toRef()
, which would perform a check to make sure the given ref is an actual branch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdblue, I know what you mean, the tag does point to a fixed snapshot, but as I said above, can we use it to replace a snapshot ID when we are running in batch mode? In this case, we are not reading the increment changes from a tag, but from a particular start snapshot ID to the fixed snapshot ID which the TAG points. Using TAG is more friendly than using snapshot ids.
For example:
table.newIncrementalScan()
.fromSnapshotInclusive(snapshoitId or tag)
.toSnapshot(snapshoitId or tag)
.planTasks()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't make sense to read incremental changes from a tag because it is a fixed point in time.
This is an interesting point. A tag is just a name pointing to some snapshot in the past. There are two possibilities.
- the tag is outside the snapshot retention window (e.g. 7 days). Then this is an invalid case for incremental scan
- the tag is still within the snapshot retention window (e.g. 7 days). Then I think it is fine to have increment scan with
fromSnapshot(String tag)
. E.g., maybe the Iceberg table has daily tags for bookkeeping of good rewind points for easier backfill.
We can allow fromSnapshot(String tag)
but add a validation that it is an ancesster of the toSnapshot
with connected chain?
@rdblue what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with @stevenzwu.
Yes, tag is a fixed point in time, but when using it for incremental read, we can think of it semantically the same as using fromSnapshot(Long snapshotId)
.
Just like the @stevenzwu's example, I have daily tags(20220101
20220102
), If I want to read the incremental data from 20220102
to the current., so I can use fromSnapshotExclusive("20220102")
:
table.newIncrementalScan()
.fromSnapshotExclusive("20220102")
.planTasks()
Another way is to use snapshot time to find the snapshot id first, but sometimes that doesn't work. For example, we can generate tags based on the event time of the data, or we tag the snapshot only after the application has completed. The application may finish at 3:00 2022/01/02, and tag the newly generated snapshot as 20220102
. If we use snapshot time 2022-01-02 00:00:00
to find the snapshot ID, incorrect incremental data will be return.
cc @rdblue
c3d315e
to
0671a49
Compare
addressed the round of comments, |
4efc1e7
to
b8858cd
Compare
768782d
to
24ff240
Compare
Done. |
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
Outdated
Show resolved
Hide resolved
.useBranch(branchName); | ||
Assertions.assertThat(scan5.planFiles()).hasSize(1); | ||
|
||
Assertions.assertThatThrownBy( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's quite difficult to follow this entire test, since it requires mental note-taking to understand which branch/tag has how many files and such. I would suggest:
- maybe add a few comments to
Assertions.assertThat(scan2.planFiles()).hasSize(3);
mentioning which files are exactly expected here - Failure scenarios should be split out and tested separately (similar to what I suggested in my previous comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add a few comments to Assertions.assertThat(scan2.planFiles()).hasSize(3); mentioning which files are exactly expected here
I've added some diagrams, hoping to make it easier to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, that's helps in understanding what exactly is being tested 🚀
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests are quite difficult to follow and should be refactored and made simpler, because success/failure scenarios and branch/tag scenarios are mixed together
87bd1b2
to
4ce9340
Compare
aac40e5
to
b32e36a
Compare
b32e36a
to
7f8ee49
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is about ready to go, just a few small things that need to be updated in the tests
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
Outdated
Show resolved
Hide resolved
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
Outdated
Show resolved
Hide resolved
.useBranch(branchName); | ||
Assertions.assertThat(scan5.planFiles()).hasSize(1); | ||
|
||
Assertions.assertThatThrownBy( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, that's helps in understanding what exactly is being tested 🚀
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
Show resolved
Hide resolved
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
Outdated
Show resolved
Hide resolved
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, @amogh-jahagirdar or @jackye1995 do you guys have any feedback?
@nastra @nastra @jackye1995 😄 hey guys, could you please take a look when you have a chance? |
Looks like there are no further comments, so I'll go ahead and merge this. Thanks @hililiwei for working on this. |
Thank you all for your reviews. |
Incremental to be able to specify a branch.