
Spark 3.3: Support write to branch through table identifier by jackye1995 · Pull Request #6965 · apache/iceberg · GitHub

Spark 3.3: Support write to branch through table identifier #6965

Merged
merged 9 commits on Mar 11, 2023

Conversation

jackye1995
Contributor

@jackye1995 jackye1995 commented Mar 1, 2023

Continuing based on the discussion in #6651.

Co-authored-by: @namrathamyske
Co-authored-by: @amogh-jahagirdar

@github-actions github-actions bot added the spark label Mar 1, 2023
@jackye1995 jackye1995 requested review from aokolnychyi and rdblue March 1, 2023 07:43
@jackye1995
Contributor Author

@amogh-jahagirdar @namrathamyske @rdblue @aokolnychyi

I am still adding all the tests, but I wanted to publish the implementation first in case there is any early feedback.

@aokolnychyi
Contributor

I can get to this PR on Monday.

@jackye1995 jackye1995 force-pushed the branch-write branch 2 times, most recently from 39968a3 to 7abb619 Compare March 3, 2023 16:39
}

@Test
public void testMergeIntoEmptyTargetInsertOnlyMatchingRows() {
Assume.assumeTrue("Branch can only be used for non-empty table", branch == null);
Contributor

Looking into these test assumptions made me think more about what should be valid.

Currently, all of the test cases use a branch for all reads and writes, so there isn't anything testing implicit branching from the current table state. That is, when writing, we automatically create a branch from the current table state if it does not already exist. That was intended for convenience, so you can set an environment branch and have it auto-created on write.

For commands like MERGE that use the current table state, I'm wondering if that makes any sense, because the command needs to read the branch's state, which doesn't yet exist.

I'm leaning toward not allowing that for MERGE, UPDATE, or DELETE because it is so strange. Does that mean we shouldn't allow implicit branch creation for INSERT either? On one hand, that's what these tests rely on to write from the start into a branch. But on the other, there could be an expectation that writing to a branch that doesn't exist actually creates it from an empty state.

So...

  1. I think we need tests for all of the operations that validate what happens when (a) the table is empty and an operation uses a branch that doesn't exist and (b) when the table is not empty and an operation uses a branch that doesn't exist
  2. I think that MERGE, UPDATE, and DELETE should fail right away with a "Branch does not exist: %s" error (currently this is an NPE in job planning)
  3. I think INSERT behavior should be clearly defined for branches, whether it is based on the current table state or an empty table state.

@aokolnychyi, @jackye1995, @danielcweeks, @amogh-jahagirdar, @namrathamyske, what do you think?

Contributor Author

Sorry, I had some errands and was not able to push everything during the weekend.

Yes, this is a point I also want to discuss. I used Assume to make this user experience explicit; glad we are paying the same attention to this issue here. I left the NPE in place because this requires further discussion anyway; I can fix it once we settle.

when writing, we will automatically create a branch from the current table state if it does not already exist. That was intended for convenience, so you can set an environment branch and auto-create on write.

I'm leaning toward not allowing that for MERGE, UPDATE, or DELETE because it is so strange

I also agree it's strange to create a new branch for operations like UPDATE and DELETE. However, if I think from the perspective of branching being an improved version of WAP, then it feels less strange because we are just saying the result of a write is staged in the specific branch until published.

The difference is that, in the WAP case, the user won't explicitly say DELETE FROM table.branch_xxx; the branch is supposed to be hidden as an environment setting. So it feels like this goes back to the argument about using an identifier vs. using an environment branch. If we consider them to be equivalent, then my logic would be: because it makes sense to auto-create the branch for the WAP case, we need to match that experience for the identifier case, even though it feels a bit awkward from a SQL language perspective.

This means we also support cases like testMergeIntoEmptyTarget for all branches. Extended further, following this logic, we would probably have to define a non-existing branch to always be at the current table state. So if a user does SELECT * FROM table.branch_not_exist, it will just read the current table instead of saying the branch was not found.

I think what Ryan is saying follows the opposite side of the logic: the WAP case and the identifier case are not equivalent. My only concern is that this would reduce a lot of the usability of branches, especially given the current situation where we consider the environment-setting-based approach difficult to achieve. At the same time, I am not sure we really gain more consistency and clarity with this approach, because there are still a lot of rules people need to understand, and they might just get equally confused.

If we are saying special identifiers like table.branch_, table.snapshot_id_, etc. are hacks to work around query engine limitations, then it feels to me that we can accept some awkwardness to make it work for more use cases.

Would really like to know what others think.

Contributor

I also agree it's strange to create a new branch for operations like UPDATE and DELETE. However, if I think from the perspective of branching being an improved version of WAP, then it feels less strange because we are just saying the result of a write is staged in the specific branch until published.

I think we do want to use this for an improved version of WAP. But do we necessarily need to have implicit branch creation everywhere? I don't think that's required, and if we decide to add it we can always do it later.

The difference is that, in the WAP case, it won't explicitly say DELETE FROM table.branch_xxx and the branch is supposed to be hidden as an environment setting

That's how WAP currently works, but we don't necessarily need branches to behave that way. We could create a spark.wap.branch setting that adds implicit branching and also ensures that all reads use that branch if it exists. That's well-defined behavior for an environment setting.

I'm more skeptical of implicit branch creation when performing normal branch writes using the table.branch_x naming. If you're specifically writing to a branch by name then it seems like it should exist before that point. If we require it to exist, then we avoid needing a behavior decision about how to initialize the state of an implicitly created branch.

So if user does SELECT * from table.branch_not_exist it will just read the current table instead of saying branch not found.

I see your logic and I think that this follows, but I also don't think it is a good idea to define the state of a non-existent branch to be the current table state. Again, I think we could introduce an environment state that makes this the behavior specifically for WAP, but that shouldn't be the default for all branches and doesn't need to be added right now.

The most natural consequence of reading a branch that doesn't exist is to get an error. Otherwise, we get confusing behavior. For example, if you tell me you'll create a branch for me to look at in a table but I look at it before you prepare the data, I still get results. It also opens up problems when a name is slightly off; say I want to read tag q4_22_audit but I actually read q4_2022_audit and get the current table state because the branch/tag doesn't exist. That would be really confusing for users.

I'm all for having ways to make WAP better using branches, but when we follow this line of reasoning it leads to confusing behavior in the more common cases. So I think we should make branches work in simple cases and then add more complex features on top.

I don't think this severely limits the utility of branches, since we can go through and add WAP on top as a next step.
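As a minimal sketch of the environment-based behavior described above (a spark.wap.branch setting that routes reads to the WAP branch only when it exists, and otherwise falls back to main) — all names here are illustrative stand-ins, not the actual Iceberg or Spark API:

```java
import java.util.Set;

// Hypothetical sketch: resolve which branch a read should use under a
// WAP-style environment setting. If the configured WAP branch exists,
// reads see the staged state; otherwise they fall back to main.
public class WapBranchResolver {
  public static String resolveReadBranch(String wapBranch, Set<String> existingBranches) {
    if (wapBranch != null && existingBranches.contains(wapBranch)) {
      return wapBranch; // read the staged WAP state
    }
    return "main"; // fall back to the current table state
  }
}
```

The key property is that a mistyped or not-yet-created WAP branch degrades to reading main only in this explicitly opted-in environment mode, rather than being the default for all branch identifiers.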

@@ -252,7 +259,7 @@ protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;

SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter counter) {
super(filePath, deletes, table.schema(), expectedSchema, counter);
super(filePath, deletes, SnapshotUtil.snapshotSchema(table, branch), expectedSchema, counter);
Contributor

I did a bit of exploration to find out if we could avoid passing branch all the way through the many classes to get the schema here. We could pass the table schema instead, but that doesn't seem worth the trouble since we still have to modify so many classes.

Another option is to add any missing columns to the read schema, so that it doesn't need to happen in each task. The main issue with that is that the columns need to be based on the delete files, so we would need to plan the scan before we could return the read schema.

I think that what this already does is actually the cleanest solution, but I thought I'd mention that I looked into it in case other people were wondering about all the changes to pass branch.

public SparkTable(Table icebergTable, String branch, boolean refreshEagerly) {
this(icebergTable, refreshEagerly);
this.branch = branch;
ValidationException.check(
Contributor Author

Reads and writes to a non-existing branch are directly blocked here.
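A minimal sketch of that constructor-time guard — the real check lives in SparkTable and uses Iceberg's ValidationException; the class and message here are stand-ins:

```java
import java.util.Set;

// Illustrative: reject both reads and writes through a branch identifier
// when the branch does not exist, at construction time rather than deep
// inside job planning.
public class BranchedTable {
  private final String branch;

  public BranchedTable(String branch, Set<String> existingRefs) {
    if (branch != null && !existingRefs.contains(branch)) {
      throw new IllegalArgumentException("Cannot use non-existing branch: " + branch);
    }
    this.branch = branch; // null means the main branch
  }

  public String branch() {
    return branch;
  }
}
```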

/**
* Return the schema of the snapshot at a given branch.
*
* <p>If branch does not exist, the table schema is returned because it will be the schema when
Contributor Author

The logic here for both finding the schema and the latest snapshot allows the branch to be non-existing. I decided to do it this way because the core library still allows auto-creation of branches, so it makes more sense to support that case in these util methods. We only block writing to a non-existing branch through a table identifier in the Spark module, but we will still support other cases, like a WAP branch, that leverage the core feature.
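A sketch of that fallback, with schemas modeled as schema ids to keep the example self-contained — illustrative only, the real logic lives in SnapshotUtil:

```java
import java.util.Map;

// Illustrative: if the branch ref exists, use the schema of its snapshot;
// otherwise fall back to the current table schema, which is what an
// auto-created branch would start from.
public class SnapshotSchemaSketch {
  public static int schemaIdFor(
      Map<String, Integer> branchSchemaIds, int currentSchemaId, String branch) {
    Integer schemaId = branchSchemaIds.get(branch);
    return schemaId != null ? schemaId : currentSchemaId;
  }
}
```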

Contributor

I agree. We should block at the write, not in the helper methods.

Contributor

Makes sense to me too.

@@ -74,14 +75,19 @@ public void testCommitUnknownException() {

// write unpartitioned files
append(tableName, "{ \"id\": 1, \"dep\": \"hr\", \"category\": \"c1\"}");
createBranch();
Contributor Author

@jackye1995 jackye1995 Mar 8, 2023

Given that we do not allow auto-creation of branches during a write, all the tests are updated so that the first write goes to the table, and then createBranch is called to make the branch from main.

}

@Test
public void testMergeRefreshesRelationCache() {
createAndInitTable("id INT, name STRING", "{ \"id\": 1, \"name\": \"n1\" }");
createOrReplaceView("source", "{ \"id\": 1, \"name\": \"n2\" }");

Dataset<Row> query = spark.sql("SELECT name FROM " + tableName);
Dataset<Row> query = spark.sql("SELECT name FROM " + commitTarget());
Contributor Author

All the cache refresh tests use commitTarget for reads instead of selectTarget, because I noticed that when using the AS OF syntax, Spark does not consider the relation to be the same and will not refresh the cache unless REFRESH TABLE is called explicitly. I will continue to look into this issue for a fix, but I think that can be a separate PR.

assertEquals("Should correctly add the non-matching rows", expectedRows, result);
}

@Test
public void testMergeEmptyTable() {
Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(branch));
Contributor Author

We will not test custom branches for the empty-table cases, but main will work as expected.

Contributor

We can always add this later. I'm good with not allowing branches for now.

@jackye1995
Contributor Author

@aokolnychyi thanks for the quick and detailed review! I replied to the two questions you had; let me know what you think!

@@ -46,12 +47,14 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
Types.StructType groupingKeyType,
ScanTaskGroup<?> taskGroup,
Broadcast<Table> tableBroadcast,
String branch,
Contributor Author

The input partition could also take the table schema instead of the branch, but that would add serialization overhead, so I kept it passing the branch.
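A small sketch of that tradeoff — input partitions are serialized and shipped to executors, so carrying a short branch string is cheaper than embedding a full schema object in every partition. The class name is a stand-in, not Spark's actual InputPartition:

```java
import java.io.Serializable;

// Illustrative: the partition carries only the branch name; the executor
// side resolves it back to a schema against the broadcast table.
public class BranchedInputPartition implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String branch;

  public BranchedInputPartition(String branch) {
    this.branch = branch;
  }

  public String branch() {
    return branch;
  }
}
```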

Contributor

@aokolnychyi aokolnychyi left a comment

I went through the changes one more time. Everything seems to be in order. It is a big change, so it is easy to miss something. We can fix issues as they are discovered, but I feel this is the right approach overall.

@aokolnychyi
Contributor

@rdblue, do you want to take another look?

Contributor

@amogh-jahagirdar amogh-jahagirdar left a comment

Overall this looks good to me, thanks for carrying this forward @jackye1995 !

Preconditions.checkArgument(
snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId());
Snapshot snapshot = table().snapshot(name);
Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name);
return newRefinedScan(table(), tableSchema(), context().useSnapshotId(snapshot.snapshotId()));
TableScanContext newContext = context().useSnapshotId(snapshot.snapshotId());
return newRefinedScan(table(), SnapshotUtil.schemaFor(table(), name), newContext);
Contributor

Good catch with the schema.

}

@Test
public void testDeleteWithDynamicFileFiltering() throws NoSuchTableException {
createAndInitPartitionedTable();

append(new Employee(1, "hr"), new Employee(3, "hr"));
append(tableName, new Employee(1, "hr"), new Employee(3, "hr"));
createBranch();
Contributor

@rdblue rdblue Mar 11, 2023

Is it worth supporting createBranch when there's no table data? We could create an empty snapshot to make it work. Then createBranch could be included in createAndInit.

Also, since this doesn't necessarily create a branch, I think createBranchIfNeeded() would be slightly better.

Contributor Author

Updated the method name. Regarding creating an empty snapshot, I remember we had this discussion before and ended up not doing that. I cannot recall the reason anymore; maybe we can hash this out in a different thread. Let me open an issue.
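A sketch of the createBranchIfNeeded() test helper discussed above: a null branch means the test writes to main and nothing happens; otherwise the branch is created from the current snapshot if it does not already exist. Purely illustrative; the real helper would use Iceberg's snapshot management APIs:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative test helper: create the parameterized branch from the
// current table state, but only when a branch is in play and it is
// missing. Refs are modeled as a name -> snapshot-id map.
public class BranchTestHelper {
  final Map<String, Long> refs = new HashMap<>();
  long currentSnapshotId;

  public void createBranchIfNeeded(String branch) {
    if (branch != null && !refs.containsKey(branch)) {
      refs.put(branch, currentSnapshotId); // branch from the current table state
    }
  }
}
```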

@rdblue
Contributor

rdblue commented Mar 11, 2023

@jackye1995, this looks great. It's a huge PR so there were quite a few minor things, but overall I don't see any blockers for getting this in. If you want to merge now and follow up with changes, go for it.

@jackye1995
Contributor Author

I will go ahead and merge this PR since it touches many files and can easily conflict with other PRs. We can address the remaining comments in #7050. Thanks everyone for the review! @rdblue @aokolnychyi @amogh-jahagirdar @namrathamyske

@jackye1995 jackye1995 merged commit ba2a215 into apache:master Mar 11, 2023
krvikash pushed a commit to krvikash/iceberg that referenced this pull request Mar 16, 2023

Co-authored-by: Namratha Mysore Keshavaprakash <nmk344@gmail.com>
Co-authored-by: Amogh Jahagirdar <jahamogh@amazon.com>
5 participants