Flink: Prevent setting endTag/endSnapshotId for streaming source #10207

Merged · 5 commits into apache:main · Apr 26, 2024

Conversation

pvary (Contributor) commented Apr 23, 2024:

Tried to hack around issues with setting endSnapshotId for a streaming job. It turns out the code silently overwrites the values set in the context.

Since it doesn't make sense to set an end for the streaming case, I propose adding validation to prevent these cases.
The Preconditions are copied from StreamingMonitorFunction, used by the old FlinkSource implementation.

github-actions bot added the flink label Apr 23, 2024
      Preconditions.checkArgument(
          snapshotId == null, "Cannot set snapshot-id option for streaming reader");
      Preconditions.checkArgument(
          asOfTimestamp == null, "Cannot set as-of-timestamp option for streaming reader");
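As a minimal illustration of how Guava-style Preconditions checks like these fire (this is a standalone sketch, not Iceberg code — the helper below imitates Guava's Preconditions.checkArgument):

```java
public class PreconditionsSketch {
  // Imitates com.google.common.base.Preconditions.checkArgument:
  // throws IllegalArgumentException when the expression is false.
  static void checkArgument(boolean expression, String message) {
    if (!expression) {
      throw new IllegalArgumentException(message);
    }
  }

  // Mirrors the streaming-mode validation quoted above: a streaming read
  // configured with snapshot-id or as-of-timestamp fails fast.
  static void validateStreaming(Long snapshotId, Long asOfTimestamp) {
    checkArgument(snapshotId == null, "Cannot set snapshot-id option for streaming reader");
    checkArgument(
        asOfTimestamp == null, "Cannot set as-of-timestamp option for streaming reader");
  }

  public static void main(String[] args) {
    validateStreaming(null, null); // valid streaming config: no exception
    try {
      validateStreaming(42L, null); // invalid: snapshot-id set for streaming
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The point of failing fast here is that the misconfiguration surfaces at job construction time instead of being silently overwritten later.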
stevenzwu (Contributor):
Where was the end snapshot (id or tag) silently overridden? I thought it might be a valid scenario. E.g., during backfill, a job might run in streaming mode and discover splits snapshot by snapshot, with an end snapshot.

pvary (Contributor, author):
Here is the code:

      Snapshot toSnapshotInclusive =
          toSnapshotInclusive(
              lastConsumedSnapshotId, currentSnapshot, scanContext.maxPlanningSnapshotCount());
      IcebergEnumeratorPosition newPosition =
          IcebergEnumeratorPosition.of(
              toSnapshotInclusive.snapshotId(), toSnapshotInclusive.timestampMillis());
      ScanContext incrementalScan =
          scanContext.copyWithAppendsBetween(
              lastPosition.snapshotId(), toSnapshotInclusive.snapshotId());

The toSnapshotInclusive helper reads up to the current snapshot. Its only role is to prevent reading more snapshots than maxPlanningSnapshotCount, and we set its result as the last snapshot for the scan.
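A hypothetical sketch of this capping behavior (names are simplified stand-ins for the enumerator code, not the real Iceberg API): each discovery cycle advances at most maxPlanningSnapshotCount snapshots, so the "end" snapshot of one incremental scan is an internal planning detail, not a user-facing bounded end position.

```java
import java.util.List;

public class PlanningCapSketch {
  // Pick the inclusive end snapshot for one incremental scan: walk the
  // snapshots that appeared since the last discovery, but stop after the cap.
  static long toSnapshotIdInclusive(
      List<Long> snapshotIdsAfterLastConsumed, int maxPlanningSnapshotCount) {
    int steps = Math.min(snapshotIdsAfterLastConsumed.size(), maxPlanningSnapshotCount);
    return snapshotIdsAfterLastConsumed.get(steps - 1);
  }

  public static void main(String[] args) {
    // Five new snapshots appeared since the last discovery, cap is 3:
    List<Long> newSnapshots = List.of(101L, 102L, 103L, 104L, 105L);
    System.out.println(toSnapshotIdInclusive(newSnapshots, 3)); // 103
    // Fewer new snapshots than the cap: read up to the current snapshot.
    System.out.println(toSnapshotIdInclusive(newSnapshots, 10)); // 105
  }
}
```

Because the cap is re-applied every cycle, the stream eventually catches up to the table head; nothing in this path honors a user-supplied endSnapshotId.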

stevenzwu (Contributor):

This is to set the end snapshot per incremental scan/discovery. The source doesn't check/support a user-facing end snapshot (like the Kafka source's bounded end position).

pvary (Contributor, author):

Yeah, I got it. This is why we need the new validations to prevent wrong parameterization of the source.

pvary force-pushed the prevent_end_on_stream branch from 76cdc62 to c2815f6 on April 24, 2024 11:14
…d add timeout to the previously failing test
@@ -130,7 +131,9 @@ private ScanContext(
     this.watermarkColumn = watermarkColumn;
     this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;
 
-    validate();
+    if (!skipValidate) {
stevenzwu (Contributor):

is this only for the endSnapshotId? if yes, should we remove the validation on endSnapshotId?

pvary (Contributor, author):

The TestStreamScanSql test was stuck because we set snapshotId too.
So this is for both snapshotId and endSnapshotId.

On the philosophical level, the question is:

  • Do we want to make sure that the created ScanContext is always valid?

If yes, then we change copyWithAppendsBetween and copyWithSnapshotId to remove the streaming flag, as those are no longer streaming scans.

If no, then we accept that programmatically created ScanContext objects don't need validation.

I opted for the first solution, as I'm not sure where we depend on the streaming settings, and it was the least disruptive.

stevenzwu (Contributor) commented Apr 25, 2024:

> If yes, then we change copyWithAppendsBetween and copyWithSnapshotId to remove the streaming flag, as those are no longer streaming scans.

That is also not correct, because a copy should mean a copy. The main problem is that ScanContext is used for both user intention (via the source builder) and internal incremental scans. I agree that the internal incremental scan shouldn't have the streaming setting.

Note that ScanContext is not a public class; users can't construct this object directly. Maybe the validate() method shouldn't be called by the constructor, and only by the ScanContext#Builder#build() method? Or move some more intrinsic validation to the IcebergSource builder?

pvary (Contributor, author):

Went with your suggestion: removed the generic validation and called the validation explicitly from the source.
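A hypothetical sketch of the agreed resolution, assuming simplified names (the ScanContext and buildValidated below are stand-ins, not the real Iceberg classes): the constructor no longer validates, so internal incremental copies stay legal, while the user-facing entry point calls validate() explicitly.

```java
public class ValidationSketch {
  static class ScanContext {
    final boolean streaming;
    final Long endSnapshotId;

    ScanContext(boolean streaming, Long endSnapshotId) {
      this.streaming = streaming;
      this.endSnapshotId = endSnapshotId;
      // No validate() call here: internal incremental copies
      // (e.g. copyWithAppendsBetween) may carry streaming=true
      // together with an end snapshot for one planning cycle.
    }

    void validate() {
      if (streaming && endSnapshotId != null) {
        throw new IllegalArgumentException(
            "Cannot set end-snapshot-id option for streaming reader");
      }
    }
  }

  // Stand-in for the source builder: validates the user's intent once,
  // at construction time, instead of on every internal copy.
  static ScanContext buildValidated(boolean streaming, Long endSnapshotId) {
    ScanContext ctx = new ScanContext(streaming, endSnapshotId);
    ctx.validate();
    return ctx;
  }
}
```

This keeps "copy means copy" intact: only the path that reflects user configuration pays the validation cost.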

pvary force-pushed the prevent_end_on_stream branch from cef9372 to 789191a on April 26, 2024 11:16
pvary merged commit 646440a into apache:main Apr 26, 2024
13 checks passed
pvary deleted the prevent_end_on_stream branch April 26, 2024 17:19
pvary (Contributor, author) commented Apr 26, 2024:
Merged to main.
Thanks for the review @stevenzwu!

pvary pushed a commit to pvary/iceberg that referenced this pull request Apr 27, 2024
stevenzwu pushed a commit that referenced this pull request Apr 27, 2024
Co-authored-by: Peter Vary <peter_vary4@apple.com>
sasankpagolu pushed a commit to sasankpagolu/iceberg that referenced this pull request Oct 27, 2024
sasankpagolu pushed a commit to sasankpagolu/iceberg that referenced this pull request Oct 27, 2024
Co-authored-by: Peter Vary <peter_vary4@apple.com>
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
Co-authored-by: Peter Vary <peter_vary4@apple.com>
2 participants
Fetched URL: https://github.com/apache/iceberg/pull/10207