Content-Length: 394568 | pFad | http://github.com/apache/iceberg/pull/10207/commits/6d03fa601fc24e1f98a3a26d213e8492bd00e2b3

9F Flink: Prevent setting endTag/endSnapshotId for streaming source by pvary · Pull Request #10207 · apache/iceberg · GitHub
Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: Prevent setting endTag/endSnapshotId for streaming source #10207

Merged
merged 5 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Skip validation for ScanContexts created by the monitor functions, an…
…d add timeout to the previously failing test
  • Loading branch information
Peter Vary committed Apr 24, 2024
commit 6d03fa601fc24e1f98a3a26d213e8492bd00e2b3
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ private ScanContext(
String branch,
String tag,
String startTag,
String endTag) {
String endTag,
boolean skipValidate) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
this.tag = tag;
Expand Down Expand Up @@ -130,7 +131,9 @@ private ScanContext(
this.watermarkColumn = watermarkColumn;
this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;

validate();
if (!skipValidate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this only for the endSnapshotId? if yes, should we remove the validation on endSnapshotId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TestStreamScanSql was stuck because we set snapshotId too.
So this is for snapshotId and for endSnapshotId.

On the philosophical level, the question is:

  • Do we want to make sure that the created ScanContext is always valid?

If yes, then we change the copyWithAppendsBetween and the copyWithSnapshotId to remove the streaming flag, as those are not a streaming scans anymore.

If no, then we accept that the programmatically creates ScanContext objects don't need validation.

I opted for the first solution as I'm not sure where we depend on the streaming settings, and it was the least disruptive.

Copy link
Contributor

@stevenzwu stevenzwu Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If yes, then we change the copyWithAppendsBetween and the copyWithSnapshotId to remove the streaming flag, as those are not a streaming scans anymore.

that is also not correct, because copy should meant copy. The main problem is that ScanContext is used for both user intention (via source builder) and internal incremental scan. I agree that internal incremental scan shouldn't have the streaming setting.

note that ScanContext is not a public class. Users can't construct this object directly. Maybe the validate() method shouldn't be called by the constructor and only be called by the ScanContext#Builder#build() method? or move some more intrinsic validation to IcebergSource builder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went with your suggestion, and removed the generic validation, and called the validation explicitly from the source.

validate();
}
}

private void validate() {
Expand Down Expand Up @@ -317,6 +320,7 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.skipValidate()
.build();
}

Expand Down Expand Up @@ -348,6 +352,7 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkColumnTimeUnit(watermarkColumnTimeUnit)
.skipValidate()
.build();
}

Expand Down Expand Up @@ -391,6 +396,7 @@ public static class Builder {
private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
private TimeUnit watermarkColumnTimeUnit =
FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue();
private boolean skipValidate = false;

private Builder() {}

Expand Down Expand Up @@ -534,6 +540,11 @@ public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) {
return this;
}

public Builder skipValidate() {
this.skipValidate = true;
return this;
}

public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
Expand Down Expand Up @@ -593,7 +604,8 @@ public ScanContext build() {
branch,
tag,
startTag,
endTag);
endTag,
skipValidate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;

@Timeout(60)
stevenzwu marked this conversation as resolved.
Show resolved Hide resolved
public class TestStreamScanSql extends CatalogTestBase {
private static final String TABLE = "test_table";
private static final FileFormat FORMAT = FileFormat.PARQUET;
Expand Down








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/iceberg/pull/10207/commits/6d03fa601fc24e1f98a3a26d213e8492bd00e2b3

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy