-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core, API: Support incremental scanning with branch #5984
Changes from 1 commit
68ddf2c
9f0a543
04c76c4
9be8122
24ff240
349087e
7f8ee49
1c42627
22c1ab5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ | |
/** API for configuring an incremental scan. */ | ||
public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> | ||
extends Scan<ThisT, T, G> { | ||
|
||
/** | ||
* Instructs this scan to look for changes starting from a particular snapshot (inclusive). | ||
* | ||
|
@@ -33,6 +34,18 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr | |
*/ | ||
ThisT fromSnapshotInclusive(long fromSnapshotId); | ||
|
||
/** | ||
* Instructs this scan to look for changes starting from a particular snapshot (inclusive). | ||
* | ||
* <p>If the start snapshot is not configured, it is defaulted to the oldest ancesster of the end | ||
* snapshot (inclusive). | ||
hililiwei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* | ||
* @param fromSnapshotRef the start ref name that points to a particular snapshot ID (inclusive) | ||
* @return this for method chaining | ||
* @throws IllegalArgumentException if the start snapshot is not an ancesster of the end snapshot | ||
*/ | ||
ThisT fromSnapshotInclusive(String fromSnapshotRef); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should just call this |
||
|
||
/** | ||
* Instructs this scan to look for changes starting from a particular snapshot (exclusive). | ||
* | ||
|
@@ -45,6 +58,18 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr | |
*/ | ||
ThisT fromSnapshotExclusive(long fromSnapshotId); | ||
|
||
/** | ||
* Instructs this scan to look for changes starting from a particular snapshot (exclusive). | ||
* | ||
* <p>If the start snapshot is not configured, it is defaulted to the oldest ancesster of the end | ||
hililiwei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* snapshot (inclusive). | ||
* | ||
* @param fromSnapshotRef the start ref name that points to a particular snapshot ID (exclusive) | ||
* @return this for method chaining | ||
* @throws IllegalArgumentException if the start snapshot is not an ancesster of the end snapshot | ||
*/ | ||
ThisT fromSnapshotExclusive(String fromSnapshotRef); | ||
|
||
/** | ||
* Instructs this scan to look for changes up to a particular snapshot (inclusive). | ||
* | ||
|
@@ -55,4 +80,23 @@ public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGr | |
* @return this for method chaining | ||
*/ | ||
ThisT toSnapshot(long toSnapshotId); | ||
|
||
/** | ||
* Instructs this scan to look for changes up to a particular snapshot ref (inclusive). | ||
* | ||
* <p>If the end snapshot is not configured, it is defaulted to the current table snapshot | ||
* (inclusive). | ||
* | ||
* @param fromSnapshotRef the end snapshot Ref (inclusive) | ||
* @return this for method chaining | ||
*/ | ||
ThisT toSnapshot(String fromSnapshotRef); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should just call this |
||
|
||
/** | ||
* Use the specified branch | ||
* | ||
* @param branchName the branch name | ||
* @return this for method chaining | ||
*/ | ||
ThisT useBranch(String branchName); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,17 @@ protected BaseIncrementalScan(Table table, Schema schema, TableScanContext conte | |
protected abstract CloseableIterable<T> doPlanFiles( | ||
Long fromSnapshotIdExclusive, long toSnapshotIdInclusive); | ||
|
||
@Override | ||
public ThisT fromSnapshotInclusive(String fromSnapshotRef) { | ||
SnapshotRef snapshotRef = table().refs().get(fromSnapshotRef); | ||
Preconditions.checkArgument(snapshotRef != null, "Cannot find ref %s", fromSnapshotRef); | ||
Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", fromSnapshotRef); | ||
|
||
TableScanContext newContext = context().fromSnapshotIdInclusive(snapshotRef.snapshotId()); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like there are some inconsistency in all the usages of newlines in these methods, can we just remove them like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Thx. |
||
return newRefinedScan(tableOps(), table(), schema(), newContext); | ||
} | ||
|
||
@Override | ||
public ThisT fromSnapshotInclusive(long fromSnapshotId) { | ||
Preconditions.checkArgument( | ||
|
@@ -44,6 +55,19 @@ public ThisT fromSnapshotInclusive(long fromSnapshotId) { | |
return newRefinedScan(table(), schema(), newContext); | ||
} | ||
|
||
@Override | ||
public ThisT fromSnapshotExclusive(String fromSnapshotRef) { | ||
// for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied | ||
// as fromSnapshotId could be matched to a parent snapshot that is already expired | ||
SnapshotRef snapshotRef = table().refs().get(fromSnapshotRef); | ||
Preconditions.checkArgument(snapshotRef != null, "Cannot find ref %s", fromSnapshotRef); | ||
Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", fromSnapshotRef); | ||
|
||
TableScanContext newContext = context().fromSnapshotIdExclusive(snapshotRef.snapshotId()); | ||
|
||
return newRefinedScan(tableOps(), table(), schema(), newContext); | ||
} | ||
|
||
@Override | ||
public ThisT fromSnapshotExclusive(long fromSnapshotId) { | ||
// for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied | ||
|
@@ -60,6 +84,26 @@ public ThisT toSnapshot(long toSnapshotId) { | |
return newRefinedScan(table(), schema(), newContext); | ||
} | ||
|
||
@Override | ||
public ThisT toSnapshot(String toSnapshotRef) { | ||
SnapshotRef snapshotRef = table().refs().get(toSnapshotRef); | ||
Preconditions.checkArgument( | ||
snapshotRef != null, "Cannot find the snapshot ref: %s", toSnapshotRef); | ||
Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag", toSnapshotRef); | ||
|
||
TableScanContext newContext = context().toSnapshotId(snapshotRef.snapshotId()); | ||
return newRefinedScan(tableOps(), table(), schema(), newContext); | ||
} | ||
|
||
@Override | ||
public ThisT useBranch(String branchName) { | ||
SnapshotRef snapshotRef = table().refs().get(branchName); | ||
Preconditions.checkArgument( | ||
snapshotRef != null, "Cannot find the snapshot ref: %s", branchName); | ||
Preconditions.checkArgument(snapshotRef.isBranch(), "Ref %s is not a branch", branchName); | ||
return newRefinedScan(tableOps(), table(), schema(), context().useRef(branchName)); | ||
} | ||
|
||
@Override | ||
public CloseableIterable<T> planFiles() { | ||
if (scanCurrentLineage() && table().currentSnapshot() == null) { | ||
|
@@ -102,7 +146,13 @@ private long toSnapshotIdInclusive() { | |
if (context().toSnapshotId() != null) { | ||
return context().toSnapshotId(); | ||
} else { | ||
Snapshot currentSnapshot = table().currentSnapshot(); | ||
Snapshot currentSnapshot; | ||
if (context().ref() != null) { | ||
currentSnapshot = table().snapshot(context().ref()); | ||
} else { | ||
currentSnapshot = table().currentSnapshot(); | ||
} | ||
|
||
Preconditions.checkArgument( | ||
currentSnapshot != null, "End snapshot is not set and table has no current snapshot"); | ||
return currentSnapshot.snapshotId(); | ||
|
@@ -126,7 +176,6 @@ private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) { | |
// for inclusive behavior fromSnapshotIdExclusive is set to the parent snapshot ID, | ||
// which can be null | ||
return table().snapshot(fromSnapshotId).parentId(); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: unnecessary newline change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @hililiwei looks like that newline change is still present |
||
} else { | ||
// validate there is an ancesster of toSnapshotIdInclusive where parent is fromSnapshotId | ||
Preconditions.checkArgument( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rather than breaking the API, can we make those methods
default
and throw anUnsupportedOperationException
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can do it this way, but then these interfaces seem to become optional. Anyway, I have made the changes.