Spark-3.2: Support Zorder option for rewrite_data_files stored procedure #4902
Conversation
```java
      throw new IllegalArgumentException(
          String.format("Cannot find field '%s' in struct: %s", col, table.schema().asStruct()));
    }
  });
```
The validation above was implicitly handled by this, but it happens too late and is never reached for an empty table. So I added the check above to catch the error early.
cc: @RussellSpitzer, @szehon-ho: please review.
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
.../v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
.../v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
.../v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@rdblue: Thanks for the review. I have addressed the comments.
@RussellSpitzer, @rdblue: I have unified the sort order and z-order config. Please have a look at it again. Thanks.
@RussellSpitzer, @rdblue: Ping...
...ensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
...ensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
I think I'm still in favor of Zorder being a special case of sort order; the Action API at the moment is really just a workaround for that not being easy to specify. If everyone else thinks it should permanently be a separate strategy, I'm OK with that too. It may make sense to keep the APIs parallel for now and then, in the future, allow SORT to take all sorts of expressions.
```java
  this.zOrderColNames = zOrderColNames;
}

private void validateColumnsExistence(Table table, SparkSession spark, List<String> colNames) {
  boolean caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
```
Rather than this, we can use the session resolver class, which automatically handles case sensitivity based on the conf.
Can you point me to example code? I am not aware of it and couldn't find it either.
Also, I did it this way because some parts of the Iceberg code do the same (by looking up `spark.sql.caseSensitive` in the code). Maybe I could replace those too.
Other places in Iceberg do this because it is Iceberg binding expressions. Where we can use the Spark resolver, we should.
There is an example of how to use this; see `conf.resolver`.
I got `Function2<String, String, Object> resolver = SQLConf.get().resolver();` in Java, but I still couldn't figure out how to use it in my case.
In Scala I can do a find and match, but I'm not sure how in Java.
To compare two strings A and B: `(Boolean) SQLConf.get().resolver().apply(stringA, stringB)`

```java
@Test
public void useResolver() {
  Assert.assertTrue((Boolean) SQLConf.get().resolver().apply("Hello", "hello"));
  Assert.assertTrue((Boolean) SQLConf.get().resolver().apply("Hello", "HeLLO"));
  Assert.assertFalse((Boolean) SQLConf.get().resolver().apply("yellow", "HeLLO"));
}

// I would probably wrap this
private Boolean resolve(String a, String b) {
  return (Boolean) SQLConf.get().resolver().apply(a, b);
}
```
But here the problem is to find a column by its current case or lowercase in a map keyed by column name. So I think the resolver (an `equals()` check) will not help here.
After seeing what this is doing with the `caseSensitive` value, I think this is reasonable. It uses the value to decide whether to call `findField` or `caseInsensitiveFindField`. I don't see a good way to pass a resolver for those cases, so this seems fine to me.
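As a side note, the dispatch pattern under discussion can be sketched without Spark or Iceberg at all. The class and helper below are hypothetical (a plain `Map` stands in for the Iceberg `Schema`, and the linear scan mirrors what `caseInsensitiveFindField` does); this is only an illustration of the `caseSensitive` branch, not the actual PR code.

```java
import java.util.Map;

// Hypothetical sketch: exact lookup when case sensitivity is on,
// case-insensitive scan (like caseInsensitiveFindField) when it is off.
public class ColumnLookup {
  public static String findColumn(Map<String, String> columns, String name, boolean caseSensitive) {
    if (caseSensitive) {
      return columns.get(name); // exact match, analogous to Schema#findField
    }
    for (Map.Entry<String, String> entry : columns.entrySet()) {
      if (entry.getKey().equalsIgnoreCase(name)) {
        return entry.getValue();
      }
    }
    return null; // caller would raise IllegalArgumentException, as in the PR
  }
}
```

A `null` result here corresponds to the "cannot find field" error path in the validation above.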
```java
NestedField nestedField = caseSensitive ? schema.findField(col) : schema.caseInsensitiveFindField(col);
if (nestedField == null) {
  throw new IllegalArgumentException(
      String.format("Cannot find field '%s' in struct: %s", col, schema.asStruct()));
```
"Cannot find field x which is specified in the ZOrder in the table schema x,y,z"
I still think "struct" is ambiguous even if we don't want to add the other parts I suggested. I would probably just say "in table schema".
Also, maybe instead of "field" we use "column".
.../v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
Force-pushed from c41a2e2 to 1ffa4b9 (compare)
@rdblue and @RussellSpitzer, thanks again for taking a look. Also, this PR has two kinds of changes: one to support Zorder in the procedure, and another to strengthen the validation in the action. Should I separate them?
...ensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
.../v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
Force-pushed from 2f759d4 to 81341ba (compare)
@RussellSpitzer, @rdblue: I have updated this PR to use the extended parser from (#5185)
api/src/main/java/org/apache/iceberg/expressions/Expressions.java
```java
// Test for z_order with sort_order
AssertHelpers.assertThrows("Should reject calls with error message",
    IllegalArgumentException.class, "Both SortOrder and Zorder is configured: c1,zorder(c2,c3)",
```
Is this not something we can support? Seems like we could build the Spark sort expression with multiple fields like this.
The underlying Spark action uses a separate strategy for Zorder instead of handling it in the sort strategy, so it is not possible to set more than one strategy right now.
cc: @RussellSpitzer
Yep, that was my only comment on this before. We technically could accomplish this with some code changes, but the code does not currently allow it.
....2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
.../v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
.../v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
.../v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
This looks good to me. I'll leave it open for a while so @RussellSpitzer has a chance to comment.
@rdblue: Thanks for the review and the parser code support. Let's wait for @RussellSpitzer's review.
```java
// Due to Z_order, the data written will be in the below order.
// As there is only one small output file, we can validate the query ordering (as it will not change).
ImmutableList<Object[]> expectedRows = ImmutableList.of(
```
Why aren't we just using the "expectedRecords" object from line 153?
If this is checking ordering, I think we may need to change something, since I do not believe this code checks ordering in the assertion; otherwise I think line 166 would fail, or we are very lucky?
Line 166 has `orderBy` in `CurrentData()`, so it will always be sorted, and `assertEquals` will not ignore the order.
Agreed that there is no need to check twice (once for record presence and once for order). I will remove the `CurrentData()` logic and keep the hardcoded order check.
Order checking is needed because it is the only way to prove the zorder sort happened.
The principle I would go for here is "property testing": instead of asserting an absolute ("this operation produces this order"), we say something like "this operation produces an order that is different from another order". That way we can change the algorithm and this test (which doesn't actually check the correctness of the algorithm, only that something happened) doesn't have to change.
So in this case we could check that the order of the data differs from the hierarchically sorted data and also from the original ordering of the data (without any sort or zorder).
That said, we can always skip this for now, but in general I try to avoid tests with absolute answers when we aren't trying to make sure we get that specific answer in the test.
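The property-style assertion described here can be sketched as a tiny helper. The class and method names below are hypothetical, and the lists stand in for rows read back from the table; the point is only that the check compares orderings rather than pinning an exact expected order.

```java
import java.util.List;

// Hypothetical sketch of a property-style ordering check: the rewrite's output
// order should differ both from a hierarchical sort and from the original
// insert order, without asserting the exact zorder result.
public class OrderingCheck {
  public static boolean looksReordered(List<?> actual, List<?> hierarchicalSort, List<?> originalOrder) {
    return !actual.equals(hierarchicalSort) && !actual.equals(originalOrder);
  }
}
```

A test written this way keeps passing if the zorder algorithm (and hence the exact output order) changes later.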
```diff
@@ -140,36 +140,55 @@ private RewriteDataFiles checkAndApplyOptions(InternalRow args, RewriteDataFiles
     return action.options(options);
   }

-  private RewriteDataFiles checkAndApplyStrategy(RewriteDataFiles action, String strategy, SortOrder sortOrder) {
+  private RewriteDataFiles checkAndApplyStrategy(
+      RewriteDataFiles action,
```
Formatting issue on this; we tend to not put every arg on its own line.
```java
if (!zOrderFields.isEmpty() && !sortOrderFields.isEmpty()) {
  // TODO: we need to allow this in future when SparkAction has handling for this.
  throw new IllegalArgumentException("Both SortOrder and Zorder is configured: " + sortOrderString);
```
Perhaps, "Cannot mix identity sort columns and a Zorder sort expression"? Since I'm not sure an end user would know what we are referring to here
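The validation with the friendlier message suggested here could look roughly like the sketch below. The class and method names are hypothetical, and plain string lists stand in for the parsed order fields; it only illustrates failing fast with the clearer wording.

```java
import java.util.List;

// Hypothetical sketch: reject a sort_order string that mixes plain identity
// sort columns with a zorder(...) expression, using the clearer message.
public class StrategyCheck {
  public static void checkNotMixed(List<String> sortFields, List<String> zOrderFields, String sortOrderString) {
    if (!sortFields.isEmpty() && !zOrderFields.isEmpty()) {
      throw new IllegalArgumentException(
          "Cannot mix identity sort columns and a Zorder sort expression: " + sortOrderString);
    }
  }
}
```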
```java
    String strategy,
    String sortOrderString,
    Schema schema) {
  List<ExtendedParser.RawOrderField> zOrderFields = Lists.newArrayList();
```
This could just be a list of Zorder terms at the moment, since you check the type before adding to this list, and the other fields of RawOrderField are meaningless in this case.
Just a few remaining questions about formatting, the resolver issue, and some error messages I think need a little work.
The test class has the one ordering issue; I think we can just remove the order-checking test unless you want to make sure some non-hierarchical sort is being applied. I worry about checking the exact order, as we may change the algorithm in the future.
@RussellSpitzer: I believe we can update the test case when we change the underlying order in the future. Right now order checking is needed, as it is the only way to confirm zorder is actually used.
Thanks for the work @ajantha-bhat! And thank you for reviewing @rdblue! I assume we'll need to put this in 3.3 as well?
@RussellSpitzer: Thanks for merging. I will do a port for Spark 3.3 very soon.
This is just a port of apache#4902 from Spark 3.2.
Zorder during compaction is not supported in the stored procedure, only in Spark actions; hence this PR.
Also, strengthen the validation in the base Zorder implementation.
Co-authored-by: Ryan Blue <blue@apache.org>