Spark 3.3: Support write to branch through table identifier #6965
Conversation
@amogh-jahagirdar @namrathamyske @rdblue @aokolnychyi I am still adding all the tests, but want to publish the implementation first in case there is any early feedback.
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
I can get to this PR on Monday.
...sions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@Test
public void testMergeIntoEmptyTargetInsertOnlyMatchingRows() {
  Assume.assumeTrue("Branch can only be used for non-empty table", branch == null);
Looking into these test assumptions made me think more about what should be valid.
Currently, all of the test cases use a branch for all reads and writes, so there isn't anything testing implicit branching from the current table state. That is, when writing, we automatically create a branch from the current table state if it does not already exist. That was intended for convenience, so you can set an environment branch and auto-create it on write.
For commands like MERGE that use the current table state, I'm wondering if that makes any sense because the command needs to create a read for the branch's state, which doesn't yet exist.
I'm leaning toward not allowing that for MERGE, UPDATE, or DELETE because it is so strange. Does that mean we shouldn't allow implicit branch creation for INSERT either? On one hand, that's what these tests rely on to write from the start into a branch. But on the other, there could be an expectation that writing to a branch that doesn't exist actually creates it from an empty state.
So...
- I think we need tests for all of the operations that validate what happens when (a) the table is empty and an operation uses a branch that doesn't exist, and (b) the table is not empty and an operation uses a branch that doesn't exist
- I think that MERGE, UPDATE, and DELETE should fail right away with a "Branch does not exist: %s" error (currently this is an NPE in job planning)
- I think INSERT behavior should be clearly defined for branches, whether it is based on the current table state or an empty table state.
@aokolnychyi, @jackye1995, @danielcweeks, @amogh-jahagirdar, @namrathamyske, what do you think?
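To make (b) concrete, a rough sketch of such a test, assuming the existing test-base helpers (createAndInitTable, sql, tableName) and AssertJ; the branch name and expected message are illustrative, not settled behavior:

// Hypothetical test: a row-level command against a missing branch should fail fast
// with a clear "Branch does not exist" style error rather than an NPE during planning.
@Test
public void testDeleteFromNonExistingBranchFails() {
  createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }");

  Assertions.assertThatThrownBy(
          () -> sql("DELETE FROM %s.branch_not_exist WHERE id = 1", tableName))
      .hasMessageContaining("does not exist");
}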
Sorry, I had some errands and was not able to push everything over the weekend.
Yes, this is a point I also want to discuss. I used Assume to make this user experience explicit; glad we are paying the same attention to this issue here. I left the NPE there because this requires further discussion anyway. I can fix it once we settle.
when writing, we will automatically create a branch from the current table state if it does not already exist. That was intended for convenience, so you can set an environment branch and auto-create on write.
I'm leaning toward not allowing that for MERGE, UPDATE, or DELETE because it is so strange
I also agree it's strange to create a new branch for operations like UPDATE and DELETE. However, if I think from the perspective of branching being an improved version of WAP, then it feels less strange because we are just saying the result of a write is staged in the specific branch until published.
The difference is that, in the WAP case, you won't explicitly say DELETE FROM table.branch_xxx, and the branch is supposed to be hidden as an environment setting. So it feels like this goes back to the argument about using an identifier vs using an environment branch. If we consider them to be equivalent, then my logic would be: because it makes sense to auto-create the branch in the WAP case, we need to match that experience in the identifier case, even though it feels a bit awkward from a SQL language perspective.
This means we also support cases like testMergeIntoEmptyTarget for all branches. Extended further, following this logic, we would probably have to define a non-existing branch to always be at the current table state. So if a user does SELECT * FROM table.branch_not_exist, it will just read the current table instead of saying the branch is not found.
I think what Ryan is saying follows the opposite side of the logic: the WAP case and the identifier case are not equivalent. My only concern is that it would reduce a lot of the usability of branches, especially in the current situation where we consider the environment-setting-based approach difficult to achieve. At the same time, I am not sure we really gain more consistency and clarity with this approach, because there are still a lot of rules people need to understand, and they might just get equally confused.
If we are saying special identifiers like table.branch_, table.snapshot_id_, etc. are hacks to work around query engine limitations, then it feels to me that we can accept some awkwardness to make them work for more use cases.
Would really like to know what others think.
I also agree it's strange to create a new branch for operations like UPDATE and DELETE. However, if I think from the perspective of branching being an improved version of WAP, then it feels less strange because we are just saying the result of a write is staged in the specific branch until published.
I think we do want to use this for an improved version of WAP. But do we necessarily need to have implicit branch creation everywhere? I don't think that's required, and if we decide to add it we can always do it later.
The difference is that, in the WAP case, it won't explicitly say DELETE FROM table.branch_xxx and the branch is supposed to be hidden as an environment setting
That's how WAP currently works, but we don't necessarily need branches to behave that way. We could create a spark.wap.branch setting that adds implicit branching and also ensures that all reads use that branch if it exists. That's well-defined behavior for an environment setting.
I'm more skeptical of implicit branch creation when performing normal branch writes using the table.branch_x naming. If you're specifically writing to a branch by name, then it seems like it should exist before that point. If we require it to exist, then we avoid needing a behavior decision about how to initialize the state of an implicitly created branch.
So if user does SELECT * from table.branch_not_exist it will just read the current table instead of saying branch not found.
I see your logic and I think that this follows, but I also don't think it is a good idea to define the state of a non-existent branch to be the current table state. Again, I think we could introduce an environment setting that makes this the behavior specifically for WAP, but that shouldn't be the default for all branches and doesn't need to be added right now.
The most natural consequence of reading a branch that doesn't exist is to get an error. Otherwise, we get confusing behavior. For example, if you tell me you'll create a branch for me to look at in a table but I look at it before you prepare the data, I still get results. It also opens up problems when a name is slightly off; say I want to read tag q4_22_audit but I actually read q4_2022_audit and get the current table state because the branch/tag doesn't exist. That would be really confusing for users.
I'm all for having ways to make WAP better using branches, but when we follow this line of reasoning it leads to confusing behavior in the more common cases. So I think we should make branches work in simple cases and then add more complex features on top.
I don't think this severely limits the utility of branches, since we can go through and add WAP on top as a next step.
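As a rough illustration of the environment-setting idea, assuming a SparkSession named spark (spark.wap.branch is only a proposal in this thread; the property name and semantics are not implemented in this PR):

// Sketch only: with an environment branch configured, plain DML would implicitly
// target (and possibly auto-create) the branch...
spark.conf().set("spark.wap.branch", "audit_branch");
spark.sql("INSERT INTO db.tbl VALUES (1, 'hr')");

// ...and reads would use the branch if it exists, falling back to the main table state otherwise.
spark.sql("SELECT * FROM db.tbl").show();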
...park-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -252,7 +259,7 @@ protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
   private final InternalRowWrapper asStructLike;

   SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter counter) {
-    super(filePath, deletes, table.schema(), expectedSchema, counter);
+    super(filePath, deletes, SnapshotUtil.snapshotSchema(table, branch), expectedSchema, counter);
I did a bit of exploration to find out if we could avoid passing branch all the way through the many classes to get the schema here. We could pass the table schema instead, but that doesn't seem worth the trouble since we still have to modify so many classes.
Another option is to add any missing columns to the read schema, so that it doesn't need to happen in each task. The main issue with that is that the columns need to be based on the delete files, so we would need to plan the scan before we could return the read schema.
I think that what this already does is actually the cleanest solution, but I thought I'd mention that I looked into it in case other people were wondering about all the changes to pass branch.
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteOperation.java
public SparkTable(Table icebergTable, String branch, boolean refreshEagerly) {
  this(icebergTable, refreshEagerly);
  this.branch = branch;
  ValidationException.check(
Reads and writes to a non-existing branch are directly blocked here.
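Since the arguments are truncated in the diff above, this is only a sketch of what the check might look like; the exact condition and message may differ:

// Hypothetical shape of the validation: reject a branch name that has no snapshot,
// except for null (no branch requested). The main branch on an empty table may need
// special handling.
ValidationException.check(
    branch == null || icebergTable.snapshot(branch) != null,
    "Cannot use branch (does not exist): %s", branch);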
/**
 * Return the schema of the snapshot at a given branch.
 *
 * <p>If branch does not exist, the table schema is returned because it will be the schema when
The logic here for both finding the schema and the latest snapshot allows the branch to be non-existing. The reason I decided to do it this way is that the core library will still allow auto-creation of branches, so it makes more sense to support that case in these util methods. We only block writing to a non-existing branch through the table identifier in the Spark module, but we will support other cases, like a WAP branch, that will leverage the core feature.
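As a rough sketch of that fallback (assumed shape, not the exact util code in this PR):

// If the branch ref does not exist, fall back to the current table schema, since that is
// what the branch would start from if the core library auto-creates it on write.
public static Schema snapshotSchema(Table table, String branch) {
  SnapshotRef ref = table.refs().get(branch);
  if (ref == null) {
    return table.schema();
  }

  Integer schemaId = table.snapshot(ref.snapshotId()).schemaId();
  return schemaId != null ? table.schemas().get(schemaId) : table.schema();
}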
I agree. We should block at the write, not in the helper methods.
Makes sense to me too.
@@ -74,14 +75,19 @@ public void testCommitUnknownException() {

   // write unpartitioned files
   append(tableName, "{ \"id\": 1, \"dep\": \"hr\", \"category\": \"c1\"}");
   createBranch();
Given that we do not allow auto-creation of branches during writes, all the tests are updated so that the first write goes to the table, and then createBranch is called to make the branch from main.
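For reference, the setup helper might be shaped roughly like this (assumed; the real helper, branch name, and catalog fields may differ):

// Fork the test branch from the table's current (main) state after the first write.
protected void createBranch() {
  Table table = validationCatalog.loadTable(tableIdent);
  table.manageSnapshots().createBranch(branch, table.currentSnapshot().snapshotId()).commit();
}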
@Test
public void testMergeRefreshesRelationCache() {
  createAndInitTable("id INT, name STRING", "{ \"id\": 1, \"name\": \"n1\" }");
  createOrReplaceView("source", "{ \"id\": 1, \"name\": \"n2\" }");

-  Dataset<Row> query = spark.sql("SELECT name FROM " + tableName);
+  Dataset<Row> query = spark.sql("SELECT name FROM " + commitTarget());
All the cache refresh tests use commitTarget for reads instead of selectTarget, because I noticed that when using AS OF syntax, Spark does not consider the relation to be the same and will not refresh the cache unless REFRESH TABLE is called explicitly. I will continue to look into this issue for a fix, but I think that can be a separate PR.
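For context, the two helpers might look roughly like this (assumed shapes, for illustration only; the real implementations may differ):

// Writes address the branch through the table identifier...
protected String commitTarget() {
  return branch == null ? tableName : String.format("%s.branch_%s", tableName, branch);
}

// ...while reads can use AS OF syntax, which Spark treats as a different relation,
// so cached plans are not refreshed without an explicit REFRESH TABLE.
protected String selectTarget() {
  return branch == null ? tableName : String.format("%s VERSION AS OF '%s'", tableName, branch);
}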
assertEquals("Should correctly add the non-matching rows", expectedRows, result); | ||
} | ||
|
||
@Test | ||
public void testMergeEmptyTable() { | ||
Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(branch)); |
We will not test a custom branch for the empty-table cases, but main will work as expected.
We can always add this later. I'm good with not allowing branches for now.
@aokolnychyi thanks for the quick and detailed review! I replied to the 2 questions you had, let me know what you think!
@@ -46,12 +47,14 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
     Types.StructType groupingKeyType,
     ScanTaskGroup<?> taskGroup,
     Broadcast<Table> tableBroadcast,
     String branch,
The input partition could also take the table schema instead of the branch, but that would cause additional serialization overhead, so I kept it passing the branch.
I went through the changes one more time. Everything seems to be in order. It is a big change, so it is easy to miss something. We can fix things as they are discovered, but I feel this is the right approach overall.
@rdblue, do you want to take another look?
Overall this looks good to me, thanks for carrying this forward @jackye1995 !
   Preconditions.checkArgument(
       snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId());
   Snapshot snapshot = table().snapshot(name);
   Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name);
-  return newRefinedScan(table(), tableSchema(), context().useSnapshotId(snapshot.snapshotId()));
+  TableScanContext newContext = context().useSnapshotId(snapshot.snapshotId());
+  return newRefinedScan(table(), SnapshotUtil.schemaFor(table(), name), newContext);
Good catch with the schema.
...park-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
...spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
...park-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
...park-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
@Test
public void testDeleteWithDynamicFileFiltering() throws NoSuchTableException {
  createAndInitPartitionedTable();

-  append(new Employee(1, "hr"), new Employee(3, "hr"));
+  append(tableName, new Employee(1, "hr"), new Employee(3, "hr"));
+  createBranch();
Is it worth supporting createBranch when there's no table data? We could create an empty snapshot to make it work. Then createBranch could be included in createAndInit.
Also, since this doesn't necessarily create a branch, I think createBranchIfNeeded() would be slightly better.
Updated the method name. Regarding creating an empty snapshot, I remember we had this discussion before and ended up not doing that. I cannot recall the reason anymore; maybe we can hash this out in a different thread. Let me open an issue.
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@jackye1995, this looks great. It's a huge PR so there were quite a few minor things, but overall I don't see any blockers for getting this in. If you want to merge now and follow up with changes, go for it.
I will go ahead and merge this PR since it touches many files and can easily conflict with other PRs. We can address the remaining comments in #7050. Thanks everyone for the review! @rdblue @aokolnychyi @amogh-jahagirdar @namrathamyske
Continues based on the discussions in #6651.
Co-authored-by: @namrathamyske
Co-authored-by: @amogh-jahagirdar