Flink: Prevent setting endTag/endSnapshotId for streaming source #10207
Conversation
```java
Preconditions.checkArgument(
    snapshotId == null, "Cannot set snapshot-id option for streaming reader");
Preconditions.checkArgument(
    asOfTimestamp == null, "Cannot set as-of-timestamp option for streaming reader");
```
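For context, a hedged sketch of what the full streaming-mode validation could look like as a standalone method: the first two checks mirror the diff above, and the `end-snapshot-id`/`end-tag` checks are the new ones this PR is after. The method name and the exact error strings for the end position are assumptions, not the merged code.

```java
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Sketch only: verify that no start/end position options were set for a
// streaming read. Method name and the end-position message strings are
// assumptions; the snapshot-id/as-of-timestamp checks follow the diff above.
private static void validateStreamingOptions(
    Long snapshotId, Long asOfTimestamp, Long endSnapshotId, String endTag) {
  Preconditions.checkArgument(
      snapshotId == null, "Cannot set snapshot-id option for streaming reader");
  Preconditions.checkArgument(
      asOfTimestamp == null, "Cannot set as-of-timestamp option for streaming reader");
  Preconditions.checkArgument(
      endSnapshotId == null, "Cannot set end-snapshot-id option for streaming reader");
  Preconditions.checkArgument(
      endTag == null, "Cannot set end-tag option for streaming reader");
}
```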
Where was the end snapshot (id or tag) silently overridden? I thought it might be a valid scenario. E.g., during backfill, it could run in streaming mode and discover splits snapshot by snapshot with an end snapshot.
Line 131 in fbcd142: `ScanContext incrementalScan =`
Here is the code:

```java
Snapshot toSnapshotInclusive =
    toSnapshotInclusive(
        lastConsumedSnapshotId, currentSnapshot, scanContext.maxPlanningSnapshotCount());
IcebergEnumeratorPosition newPosition =
    IcebergEnumeratorPosition.of(
        toSnapshotInclusive.snapshotId(), toSnapshotInclusive.timestampMillis());
ScanContext incrementalScan =
    scanContext.copyWithAppendsBetween(
        lastPosition.snapshotId(), toSnapshotInclusive.snapshotId());
```

The `toSnapshotInclusive` reads until the current snapshot. Its only role is to prevent reading more snapshots than `maxPlanningSnapshotCount`, and we set this as the last snapshot for the scan.
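For illustration, a minimal sketch of that capping behavior, assuming a helper that lists the snapshot ids between two snapshots oldest-first. The helper name and overall shape are illustrative, not the actual enumerator internals:

```java
import java.util.List;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Sketch only: cap a single incremental discovery at maxPlanningSnapshotCount
// snapshots. "appendSnapshotIdsBetween" is an assumed helper returning the
// snapshot ids after lastConsumedSnapshotId up to current, oldest first.
static Snapshot toSnapshotInclusive(
    Table table, long lastConsumedSnapshotId, Snapshot current, int maxPlanningSnapshotCount) {
  List<Long> ids = appendSnapshotIdsBetween(table, lastConsumedSnapshotId, current.snapshotId());
  if (ids.size() <= maxPlanningSnapshotCount) {
    // Within the cap: the scan can read up to the current snapshot.
    return current;
  }
  // Over the cap: stop maxPlanningSnapshotCount snapshots past the last consumed one.
  return table.snapshot(ids.get(maxPlanningSnapshotCount - 1));
}
```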
This is to set the end snapshot per incremental scan/discovery. The source doesn't check/support an end snapshot (like the Kafka source's bounded end position).
Yeah, I got it. This is why we need the new validations, to prevent wrong parameterization of the source.
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestScanContext.java
Force-pushed from 76cdc62 to c2815f6: …d add timeout to the previously failing test
```diff
@@ -130,7 +131,9 @@ private ScanContext(
     this.watermarkColumn = watermarkColumn;
     this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;

-    validate();
+    if (!skipValidate) {
+      validate();
+    }
```
Is this only for the `endSnapshotId`? If yes, should we remove the validation on `endSnapshotId`?
The `TestStreamScanSql` was stuck because we set `snapshotId` too. So this is for `snapshotId` and for `endSnapshotId`.

On the philosophical level, the question is:
- Do we want to make sure that the created `ScanContext` is always valid?

If yes, then we change the `copyWithAppendsBetween` and the `copyWithSnapshotId` to remove the `streaming` flag, as those are not streaming scans anymore.
If no, then we accept that programmatically created `ScanContext` objects don't need validation.

I opted for the first solution, as I'm not sure where we depend on the streaming settings, and it was the least disruptive.
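For concreteness, a rough sketch of what that first option could look like; the builder method names here are assumptions, not the real `ScanContext` API:

```java
// Sketch only: an incremental copy that clears the streaming flag, since the
// resulting context describes a bounded incremental scan, not a stream.
// Builder method names ("fromContext" in particular) are assumed.
ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
  return ScanContext.builder()
      .fromContext(this)               // assumed: copies all current fields
      .startSnapshotId(newStartSnapshotId)
      .endSnapshotId(newEndSnapshotId)
      .streaming(false)                // not a streaming scan anymore
      .build();
}
```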
> If yes, then we change the `copyWithAppendsBetween` and the `copyWithSnapshotId` to remove the streaming flag, as those are not streaming scans anymore.

That is also not correct, because `copy` should mean copy. The main problem is that `ScanContext` is used both for the user intention (via the source builder) and for the internal incremental scan. I agree that the internal incremental scan shouldn't have the streaming setting.

Note that `ScanContext` is not a public class; users can't construct this object directly. Maybe the `validate()` method shouldn't be called by the constructor and only be called by the `ScanContext#Builder#build()` method? Or move some more intrinsic validation to the `IcebergSource` builder?
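For illustration, a minimal sketch of the `ScanContext#Builder#build()` variant of that suggestion; the builder shape is assumed, not the actual Iceberg code:

```java
// Sketch only: validation runs on the builder's build path, so internal
// copies (copyWithAppendsBetween etc.) can construct contexts directly
// without re-validation. Constructor shape is assumed.
public static class Builder {
  // ... field setters elided ...

  public ScanContext build() {
    ScanContext context = new ScanContext(this);
    context.validate(); // only builder-built (user-facing) contexts validate
    return context;
  }
}
```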
Went with your suggestion: removed the generic validation and called the validation explicitly from the source.
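A rough sketch of how that can look from the source side, with an assumed build method and constructor (not the actual `IcebergSource` code):

```java
// Sketch only: the source builder validates the user-supplied ScanContext
// explicitly, so internally constructed copies skip validation entirely.
public IcebergSource<T> build() {
  ScanContext context = contextBuilder.build();
  context.validate(); // fails fast on e.g. streaming + end-snapshot-id
  return new IcebergSource<>(tableLoader, context, readerFunction);
}
```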
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
Force-pushed from cef9372 to 789191a
Merged to main.
Co-authored-by: Peter Vary <peter_vary4@apple.com>
Tried to hack around issues with setting `endSnapshotId` for a streaming job. It turns out the code is silently overwriting the values set in the context. Since it doesn't make sense to set an end for the streaming case, I propose to add validation to prevent these cases. The `Preconditions` are copied from `StreamingMonitorFunction`, used by the old FlinkSource implementation.