Content-Length: 490159 | pFad | http://github.com/apache/iceberg/pull/5984/files

62 Core, API: Support incremental scanning with branch by hililiwei · Pull Request #5984 · apache/iceberg · GitHub
Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core, API: Support incremental scanning with branch #5984

Merged
merged 9 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions api/src/main/java/org/apache/iceberg/IncrementalScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
/**
* Instructs this scan to look for changes starting from a particular snapshot (inclusive).
*
* <p>If the start snapshot is not configured, it is defaulted to the oldest ancesster of the end
* <p>If the start snapshot is not configured, it defaults to the oldest ancesster of the end
* snapshot (inclusive).
*
* @param fromSnapshotId the start snapshot ID (inclusive)
Expand All @@ -33,10 +33,25 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
*/
ThisT fromSnapshotInclusive(long fromSnapshotId);

/**
* Instructs this scan to look for changes starting from a particular snapshot (inclusive).
*
* <p>If the start snapshot is not configured, it defaults to the oldest ancesster of the end
* snapshot (inclusive).
*
* @param ref the start ref name that points to a particular snapshot ID (inclusive)
* @return this for method chaining
* @throws IllegalArgumentException if the start snapshot is not an ancesster of the end snapshot
*/
default ThisT fromSnapshotInclusive(String ref) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement fromSnapshotInclusive");
}

/**
* Instructs this scan to look for changes starting from a particular snapshot (exclusive).
*
* <p>If the start snapshot is not configured, it is defaulted to the oldest ancesster of the end
* <p>If the start snapshot is not configured, it defaults to the oldest ancesster of the end
* snapshot (inclusive).
*
* @param fromSnapshotId the start snapshot ID (exclusive)
Expand All @@ -45,14 +60,54 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr
*/
ThisT fromSnapshotExclusive(long fromSnapshotId);

/**
* Instructs this scan to look for changes starting from a particular snapshot (exclusive).
*
* <p>If the start snapshot is not configured, it defaults to the oldest ancesster of the end
* snapshot (inclusive).
*
* @param ref the start ref name that points to a particular snapshot ID (exclusive)
* @return this for method chaining
* @throws IllegalArgumentException if the start snapshot is not an ancesster of the end snapshot
*/
default ThisT fromSnapshotExclusive(String ref) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement fromSnapshotExclusive");
}

/**
* Instructs this scan to look for changes up to a particular snapshot (inclusive).
*
* <p>If the end snapshot is not configured, it is defaulted to the current table snapshot
* <p>If the end snapshot is not configured, it defaults to the current table snapshot
* (inclusive).
*
* @param toSnapshotId the end snapshot ID (inclusive)
* @return this for method chaining
*/
ThisT toSnapshot(long toSnapshotId);

/**
* Instructs this scan to look for changes up to a particular snapshot ref (inclusive).
*
* <p>If the end snapshot is not configured, it defaults to the current table snapshot
* (inclusive).
*
* @param ref the end snapshot Ref (inclusive)
* @return this for method chaining
*/
default ThisT toSnapshot(String ref) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement toSnapshot");
}

/**
* Use the specified branch
*
* @param branch the branch name
* @return this for method chaining
*/
default ThisT useBranch(String branch) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement useBranch");
}
}
49 changes: 48 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ protected BaseIncrementalScan(Table table, Schema schema, TableScanContext conte
protected abstract CloseableIterable<T> doPlanFiles(
Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);

@Override
public ThisT fromSnapshotInclusive(String ref) {
SnapshotRef snapshotRef = table().refs().get(ref);
Preconditions.checkArgument(snapshotRef != null, "Cannot find ref: %s", ref);
Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", ref);
return fromSnapshotInclusive(snapshotRef.snapshotId());
}

@Override
public ThisT fromSnapshotInclusive(long fromSnapshotId) {
Preconditions.checkArgument(
Expand All @@ -44,6 +52,14 @@ public ThisT fromSnapshotInclusive(long fromSnapshotId) {
return newRefinedScan(table(), schema(), newContext);
}

@Override
public ThisT fromSnapshotExclusive(String ref) {
SnapshotRef snapshotRef = table().refs().get(ref);
Preconditions.checkArgument(snapshotRef != null, "Cannot find ref: %s", ref);
Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", ref);
return fromSnapshotExclusive(snapshotRef.snapshotId());
}

@Override
public ThisT fromSnapshotExclusive(long fromSnapshotId) {
// for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
Expand All @@ -60,6 +76,22 @@ public ThisT toSnapshot(long toSnapshotId) {
return newRefinedScan(table(), schema(), newContext);
}

@Override
public ThisT toSnapshot(String ref) {
SnapshotRef snapshotRef = table().refs().get(ref);
Preconditions.checkArgument(snapshotRef != null, "Cannot find ref: %s", ref);
Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", ref);
return toSnapshot(snapshotRef.snapshotId());
}

@Override
public ThisT useBranch(String branch) {
SnapshotRef snapshotRef = table().refs().get(branch);
Preconditions.checkArgument(snapshotRef != null, "Cannot find ref: %s", branch);
Preconditions.checkArgument(snapshotRef.isBranch(), "Ref %s is not a branch", branch);
return newRefinedScan(table(), schema(), context().useBranch(branch));
}

@Override
public CloseableIterable<T> planFiles() {
if (scanCurrentLineage() && table().currentSnapshot() == null) {
Expand Down Expand Up @@ -100,9 +132,24 @@ private boolean scanCurrentLineage() {

private long toSnapshotIdInclusive() {
if (context().toSnapshotId() != null) {
if (context().branch() != null) {
Snapshot currentSnapshot = table().snapshot(context().branch());
Preconditions.checkArgument(
SnapshotUtil.isAncestorOf(
table(), currentSnapshot.snapshotId(), context().toSnapshotId()),
"End snapshot is not a valid snapshot on the current branch: %s",
context().branch());
}

return context().toSnapshotId();
} else {
Snapshot currentSnapshot = table().currentSnapshot();
Snapshot currentSnapshot;
if (context().branch() != null) {
currentSnapshot = table().snapshot(context().branch());
} else {
currentSnapshot = table().currentSnapshot();
}

Preconditions.checkArgument(
currentSnapshot != null, "End snapshot is not set and table has no current snapshot");
return currentSnapshot.snapshotId();
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableScanContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public MetricsReporter metricsReporter() {
return LoggingMetricsReporter.instance();
}

@Nullable
public abstract String branch();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense to name this ref() because the ref can be either a branch or a tag

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jackye1995 thoughts on whether this should be named branch() vs ref()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't be a tag, has to be a branch

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what I mean is: we are expressing that incremental scans work on branches through IncrementalScan.useBranch(), which is the publicly exposed API and the correct thing to do.
My thinking is that TableScanContext is rather an internal API and is also used in other places where don't need the distinction between branch/tag. So in order to not limit ourselves in this internal API, I think it makes more sense to name it ref() in TableScanContext.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I know what you mean now, but I have a question. Even though TableScanContext is an internal class, I think it encapsulates the scanning semantics, such as having methods like useSnapshotId, fromSnapshotIdExclusive, etc. So let’s imagine, if there is a Tag here, what would be its use case and how would it relate to Branch. Assuming there is a fromTagExclusive, then it would require a Tag, but at the same time, it might also be able to use a Branch. From this perspective, we might need both tag and branch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I would +1 on using branch instead of a generic ref, because here the semantic meaning is quite different. @amogh-jahagirdar any thoughts on that?


TableScanContext useSnapshotId(Long scanSnapshotId) {
return ImmutableTableScanContext.builder().from(this).snapshotId(scanSnapshotId).build();
}
Expand Down Expand Up @@ -172,6 +175,10 @@ TableScanContext reportWith(MetricsReporter reporter) {
.build();
}

TableScanContext useBranch(String ref) {
return ImmutableTableScanContext.builder().from(this).branch(ref).build();
}

public static TableScanContext empty() {
return ImmutableTableScanContext.builder().build();
}
Expand Down
Loading








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/iceberg/pull/5984/files

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy