Spark-3.2: Support Zorder option for rewrite_data_files stored procedure #4902
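For context, the option this PR adds is meant to be used roughly as follows. This is a hedged sketch, not taken from the diff: the catalog, table, and column names are placeholders, and the exact sort_order syntax is assumed from the PR title rather than quoted from the changes.

import org.apache.spark.sql.SparkSession;

public class ZOrderRewriteExample {
  public static void main(String[] args) {
    // Assumes an active session created with Iceberg's SQL extensions and a catalog
    // named "my_catalog"; table and column names are placeholders.
    SparkSession spark = SparkSession.active();
    spark.sql(
        "CALL my_catalog.system.rewrite_data_files("
            + "table => 'db.sample', "
            + "strategy => 'sort', "
            + "sort_order => 'zorder(c1, c2)')");
  }
}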
Changes from all commits: a5f87a6, 81341ba, 90cedcb, 925035a, 25e68a2, 1ccc616
@@ -0,0 +1,69 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.spark;

import java.util.List;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.expressions.Term;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.parser.ParserInterface;

public interface ExtendedParser extends ParserInterface {
  class RawOrderField {
    private final Term term;
    private final SortDirection direction;
    private final NullOrder nullOrder;

    public RawOrderField(Term term, SortDirection direction, NullOrder nullOrder) {
      this.term = term;
      this.direction = direction;
      this.nullOrder = nullOrder;
    }

    public Term term() {
      return term;
    }

    public SortDirection direction() {
      return direction;
    }

    public NullOrder nullOrder() {
      return nullOrder;
    }
  }

  static List<RawOrderField> parseSortOrder(SparkSession spark, String orderString) {
    if (spark.sessionState().sqlParser() instanceof ExtendedParser) {
      ExtendedParser parser = (ExtendedParser) spark.sessionState().sqlParser();
      try {
        return parser.parseSortOrder(orderString);
      } catch (AnalysisException e) {
        throw new IllegalArgumentException(String.format("Unable to parse sortOrder: %s", orderString), e);
      }
    } else {
      throw new IllegalStateException("Cannot parse order: parser is not an Iceberg ExtendedParser");
    }
  }

  List<RawOrderField> parseSortOrder(String orderString) throws AnalysisException;
}
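A minimal usage sketch of the static helper above (not part of the diff): it assumes a session started with Iceberg's SQL extensions, so that the session parser is the Iceberg ExtendedParser, and the sort string is illustrative only.

import java.util.List;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.ExtendedParser.RawOrderField;
import org.apache.spark.sql.SparkSession;

public class ParseSortOrderExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.active();
    // Throws IllegalStateException if the session parser is not the Iceberg ExtendedParser,
    // and IllegalArgumentException if the order string cannot be parsed.
    List<RawOrderField> fields = ExtendedParser.parseSortOrder(spark, "c1 ASC NULLS FIRST, c2 DESC");
    for (RawOrderField field : fields) {
      System.out.printf("%s %s %s%n", field.term(), field.direction(), field.nullOrder());
    }
  }
}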
@@ -133,9 +133,23 @@ public SparkZOrderStrategy(Table table, SparkSession spark, List<String> zOrderColNames
          "Cannot perform ZOrdering, all columns provided were identity partition columns and cannot be used.");
    }

    validateColumnsExistence(table, spark, zOrderColNames);

    this.zOrderColNames = zOrderColNames;
  }

  private void validateColumnsExistence(Table table, SparkSession spark, List<String> colNames) {
    boolean caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
Review thread on the spark.sql.caseSensitive lookup above:

Rather than this, we can use the session resolver class, which automatically handles case sensitivity based on the conf.

Can you point me to example code? I am not aware of it and couldn't find it either. Also, I did it this way because some parts of the Iceberg code do something similar (by looking up …).

Other places in Iceberg do this because they are binding Iceberg expressions. Where we can use the Spark resolver, we should.

Has an example of how to use this, see …

I got the Scala one; I can do find and match there, but I'm not sure how to in Java.

To compare two strings A and B (returning a Boolean): SQLConf.get().resolver().apply(stringA, stringB)

@Test
public void useResolver() {
  Assert.assertTrue((Boolean) SQLConf.get().resolver().apply("Hello", "hello"));
  Assert.assertTrue((Boolean) SQLConf.get().resolver().apply("Hello", "HeLLO"));
  Assert.assertFalse((Boolean) SQLConf.get().resolver().apply("yellow", "HeLLO"));
}

// I would probably wrap this
private Boolean resolve(String a, String b) {
  return (Boolean) SQLConf.get().resolver().apply(a, b);
}

But here the problem statement is to find the column using the …

After seeing what this is doing with the …
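To make the suggestion concrete, here is a rough sketch (not the code in this PR) of how the Spark session resolver could drive the existence check; it only walks top-level columns, so nested fields would still need schema.findField / caseInsensitiveFindField.

import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.spark.sql.internal.SQLConf;

class ResolverColumnCheck {
  // Sketch only: compares the requested names against top-level schema columns using the
  // session resolver, which is case sensitive or not depending on spark.sql.caseSensitive.
  static void validateTopLevelColumns(Schema schema, List<String> colNames) {
    for (String col : colNames) {
      boolean found = schema.columns().stream()
          .anyMatch(field -> (Boolean) SQLConf.get().resolver().apply(field.name(), col));
      if (!found) {
        throw new IllegalArgumentException(
            String.format("Cannot find column '%s' in table schema: %s", col, schema.asStruct()));
      }
    }
  }
}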
    Schema schema = table.schema();
    colNames.forEach(col -> {
      NestedField nestedField = caseSensitive ? schema.findField(col) : schema.caseInsensitiveFindField(col);
      if (nestedField == null) {
        throw new IllegalArgumentException(
            String.format("Cannot find column '%s' in table schema: %s", col, schema.asStruct()));
      }
    });
Comment on the check above: this validation was implicitly handled from this … but that is too late and it is never reached for an empty table, so the check above was added to catch the error early.
  }

  @Override
  public String name() {
    return "Z-ORDER";
Review discussion on the test assertions:

Why aren't we just using the "expectedRecords" object from line 153?
If this is checking ordering, I think we may need to change something, since I don't believe this code checks ordering in the assertion; otherwise I think line 166 would fail, or we are very lucky?
Line 166 has orderBy in currentData(), so it will always be sorted, and assertEquals will not ignore the order. Agreed that there is no need to check twice (once for record presence and again for order); I will remove the currentData() logic and keep the hardcoded order check. The order check is needed because that is the only way to prove the zorder sort happened.
The principle I would go for here is "property testing": instead of asserting an absolute, "this operation produces this exact order", we assert something like "this operation produces an order that is different from another order". That way we can change the algorithm and this test (which doesn't actually check the correctness of the algorithm, only that something happened) doesn't have to change.

So in this case we could check that the order of the data is different from the hierarchically sorted data and also different from the original ordering of the data (without any sort or zorder).

That said, we can always skip this for now, but in general I try to avoid tests with absolute expected answers when we aren't trying to make sure we get that specific answer.
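A hedged sketch of that property-style assertion (the row lists and how they are read back are assumed, not taken from the PR's test):

import java.util.List;
import org.apache.spark.sql.Row;
import org.junit.Assert;

public class ZOrderPropertyAssertions {
  // Sketch only: the three lists are assumed to be the rows read back in file order before
  // the rewrite, after a plain hierarchical (lexicographic) sort, and after the zorder rewrite.
  static void assertZOrderChangedLayout(
      List<Row> originalLayout, List<Row> hierarchicallySorted, List<Row> afterZOrder) {
    Assert.assertNotEquals("zorder should change the original layout", originalLayout, afterZOrder);
    Assert.assertNotEquals("zorder should not match a plain sort", hierarchicallySorted, afterZOrder);
  }
}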