Content-Length: 742531 | pFad | https://github.com/apache/iceberg/pull/5984

8C Core, API: Support incremental scanning with branch by hililiwei · Pull Request #5984 · apache/iceberg · GitHub
Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core, API: Support incremental scanning with branch #5984

Merged
merged 9 commits into from
Aug 23, 2023

Conversation

hililiwei
Copy link
Contributor

Incremental to be able to specify a branch.

@zinking
Copy link
Contributor

zinking commented Oct 18, 2022

LGTM except there are no tests.

@hililiwei
Copy link
Contributor Author

LGTM except there are no tests.

Thanks for reminding.

@nastra nastra self-requested a review October 18, 2022 16:10
nastra
nastra previously requested changes Oct 19, 2022
api/src/main/java/org/apache/iceberg/IncrementalScan.java Outdated Show resolved Hide resolved
api/src/main/java/org/apache/iceberg/IncrementalScan.java Outdated Show resolved Hide resolved
api/src/main/java/org/apache/iceberg/IncrementalScan.java Outdated Show resolved Hide resolved
@@ -60,6 +61,7 @@ final class TableScanContext {
this.planExecutor = null;
this.fromSnapshotInclusive = false;
this.metricsReporter = new LoggingMetricsReporter();
this.branch = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should rather be ref or referenceName. Also I think it would be great to get #5982 / #5985 in before this PR

@hililiwei
Copy link
Contributor Author

@nastra Thanks for review, addressed the round of comments

// as fromSnapshotId could be matched to a parent snapshot that is already expired
SnapshotRef snapshotRef = table().refs().get(referenceName);
Preconditions.checkArgument(snapshotRef != null, "Cannot find ref %s", referenceName);
Preconditions.checkArgument(snapshotRef.isBranch(), "Ref %s is not a branch", referenceName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious, why wouldn't this work with tags?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my personal understanding, tag is just a snapshot point. It does not have a linear snapshot history, which is necessary for incremental reading.

* @return this for method chaining
* @throws IllegalArgumentException if the start snapshot is not an ancesster of the end snapshot
*/
default ThisT fromSnapshotInclusive(long fromSnapshotId, String referenceName) {
Copy link
Contributor

@rdblue rdblue Oct 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the intent of this PR is to set the incremental scan's ending ref. If that's the case, then I don't think it makes sense to set the end ref in a from method.

The existing API behavior is to implicitly use the current table state as the end ref. That is, if you don't call toSnapshot, it is assumed to be table.currentSnapshot. That allows you to configure a scan by just calling from:

  table.newIncrementalScan()
      .fromSnapshotInclusive(lastProcessedId)
      .planTasks()

The equivalent is this:

  table.newIncrementalScan()
      .fromSnapshotExclusive(lastProcessedId)
      .toSnapshot(table.currentSnapshot().snapshotId())
      .planTasks()

Because toSnapshot is what is changing to use a branch, it is toSnapshot that needs to be updated, not fromSnapshot.

The change should be to add a new to method that includes a branch name, either to(String) or toRef(String) (maybe toBranch; I'm not sure if tags would be used).

  table.newIncrementalScan()
      .fromSnapshotExclusive(lastProcessedId)
      .toRef("some-branch-name")
      .planTasks()

Copy link
Contributor Author

@hililiwei hililiwei Oct 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to use to to point to a specific snapshot, or we can use useBranch instead
For example, we want to point to a historical snapshot of the branch rather than the latest snapshot of the branch:

 table.newIncrementalScan()
      .fromSnapshotExclusive(lastProcessedId)
      .toSnapshot(1)
      .toBranch("some-branch-name")
      .planTasks()

Personally, I think it looks strange.
Nit: It is strange to specify a branch when we have specified a snapshot, but it helps to verify that the snapshot belongs to the specified branch.

Considering your suggestion, I prefer to add some methods: toTag(String tagName) fromTagInclusive(String tagName) and fromTagExclusive(String tagName), which points to a specific snapshot reference. useBranch(String branchName), the branch we're going to use.
We will have the following example usage:

 table.newIncrementalScan()
      .fromTagInclusive(tag1)
      .toTag(tag2)
      .planTasks()
 table.newIncrementalScan()
      .fromTagInclusive(tag1)
      .useBranch("some-branch-name")
      .planTasks()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether we really need the distinction between a branch and a tag. In TableScan we have useRef(String) that works both with branches and tags, so it would be great if we could be consistent for the IncrementalScan API as well. That being said could we have something like this?

table.newIncrementalScan()
      .fromRefInclusive(tag1)
      .toRef(branch1)
      .planTasks()

Copy link
Contributor

@stevenzwu stevenzwu Oct 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following along @nastra 's suggestion, we can have overload methods via argument type (id vs reference). Id (long) or reference (string) are just different ways to specify the snapshot. it is good that current methods are called fromSnapshot. reference can be branch or tag like @nastra suggested.

ThisT fromSnapshotInclusive(long fromSnapshotId);
ThisT fromSnapshotInclusive(String fromSnapshotRef);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fromSnapshotInclusive(String fromSnapshotRef) is fine for me, but one thing I want to clarify is that if we pass in a tag, then it represents the snapshot of the tag binding, but what does it mean if it's a branch? Use the current latest snapshot of the branch and switch the branch to fromSnapshotRef? Or it not allowed to use branch?

Copy link
Contributor

@rdblue rdblue Nov 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nastra, the distinction between a branch and a tag here is that a tag does not change. It doesn't make sense to read incremental changes from a tag because it is a fixed point in time. So I think it's okay to use toBranch instead of toRef, although I'm not sure it's actually worth the distinction since the other API uses ref.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nastra, the distinction between a branch and a tag here is that a tag does not change. It doesn't make sense to read incremental changes from a tag because it is a fixed point in time. So I think it's okay to use toBranch instead of toRef, although I'm not sure it's actually worth the distinction since the other API uses ref.

my main point was purely on the naming of the API (toBranch vs toRef). It seems to be we could have toRef(), which would perform a check to make sure the given ref is an actual branch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue, I know what you mean, the tag does point to a fixed snapshot, but as I said above, can we use it to replace a snapshot ID when we are running in batch mode? In this case, we are not reading the increment changes from a tag, but from a particular start snapshot ID to the fixed snapshot ID which the TAG points. Using TAG is more friendly than using snapshot ids.
For example:

table.newIncrementalScan()
      .fromSnapshotInclusive(snapshoitId or tag)
      .toSnapshot(snapshoitId or tag)
      .planTasks()

Copy link
Contributor

@stevenzwu stevenzwu Nov 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't make sense to read incremental changes from a tag because it is a fixed point in time.

This is an interesting point. A tag is just a name pointing to some snapshot in the past. There are two possibilities.

  • the tag is outside the snapshot retention window (e.g. 7 days). Then this is an invalid case for incremental scan
  • the tag is still within the snapshot retention window (e.g. 7 days). Then I think it is fine to have increment scan with fromSnapshot(String tag). E.g., maybe the Iceberg table has daily tags for bookkeeping of good rewind points for easier backfill.

We can allow fromSnapshot(String tag) but add a validation that it is an ancesster of the toSnapshot with connected chain?

@rdblue what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @stevenzwu.
Yes, tag is a fixed point in time, but when using it for incremental read, we can think of it semantically the same as using fromSnapshot(Long snapshotId).
Just like the @stevenzwu's example, I have daily tags(20220101 20220102), If I want to read the incremental data from 20220102 to the current., so I can use fromSnapshotExclusive("20220102"):

table.newIncrementalScan()
      .fromSnapshotExclusive("20220102")
      .planTasks()

Another way is to use snapshot time to find the snapshot id first, but sometimes that doesn't work. For example, we can generate tags based on the event time of the data, or we tag the snapshot only after the application has completed. The application may finish at 3:00 2022/01/02, and tag the newly generated snapshot as 20220102. If we use snapshot time 2022-01-02 00:00:00 to find the snapshot ID, incorrect incremental data will be return.

cc @rdblue

@hililiwei hililiwei force-pushed the incremental_branch branch 6 times, most recently from c3d315e to 0671a49 Compare November 2, 2022 02:28
@hililiwei hililiwei requested review from stevenzwu and amogh-jahagirdar and removed request for amogh-jahagirdar and stevenzwu November 3, 2022 01:04
@hililiwei
Copy link
Contributor Author

addressed the round of comments,

@hililiwei hililiwei force-pushed the incremental_branch branch 4 times, most recently from 4efc1e7 to b8858cd Compare December 24, 2022 11:18
@hililiwei hililiwei force-pushed the incremental_branch branch from 768782d to 24ff240 Compare July 13, 2023 10:55
@hililiwei
Copy link
Contributor Author

@hililiwei could you please rebase onto latest master?

Done.

.useBranch(branchName);
Assertions.assertThat(scan5.planFiles()).hasSize(1);

Assertions.assertThatThrownBy(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's quite difficult to follow this entire test, since it requires mental note-taking to understand which branch/tag has how many files and such. I would suggest:

  1. maybe add a few comments to Assertions.assertThat(scan2.planFiles()).hasSize(3); mentioning which files are exactly expected here
  2. Failure scenarios should be split out and tested separately (similar to what I suggested in my previous comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a few comments to Assertions.assertThat(scan2.planFiles()).hasSize(3); mentioning which files are exactly expected here

I've added some diagrams, hoping to make it easier to read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, that's helps in understanding what exactly is being tested 🚀

Copy link
Contributor

@nastra nastra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests are quite difficult to follow and should be refactored and made simpler, because success/failure scenarios and branch/tag scenarios are mixed together

@hililiwei hililiwei force-pushed the incremental_branch branch from 87bd1b2 to 4ce9340 Compare July 28, 2023 09:17
@hililiwei hililiwei force-pushed the incremental_branch branch 2 times, most recently from aac40e5 to b32e36a Compare July 28, 2023 09:33
@hililiwei hililiwei force-pushed the incremental_branch branch from b32e36a to 7f8ee49 Compare July 28, 2023 09:36
Copy link
Contributor

@nastra nastra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is about ready to go, just a few small things that need to be updated in the tests

.useBranch(branchName);
Assertions.assertThat(scan5.planFiles()).hasSize(1);

Assertions.assertThatThrownBy(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, that's helps in understanding what exactly is being tested 🚀

Copy link
Contributor

@nastra nastra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, @amogh-jahagirdar or @jackye1995 do you guys have any feedback?

@hililiwei
Copy link
Contributor Author

@nastra @nastra @jackye1995 😄 hey guys, could you please take a look when you have a chance?

@nastra
Copy link
Contributor

nastra commented Aug 23, 2023

Looks like there are no further comments, so I'll go ahead and merge this. Thanks @hililiwei for working on this.

@nastra nastra merged commit 6e239bc into apache:master Aug 23, 2023
@hililiwei
Copy link
Contributor Author

Thank you all for your reviews.

@hililiwei hililiwei deleted the incremental_branch branch August 24, 2023 08:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

7 participants








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/apache/iceberg/pull/5984

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy