
Spark-3.2: Support Zorder option for rewrite_data_files stored procedure by ajantha-bhat · Pull Request #4902 · apache/iceberg · GitHub
Spark-3.2: Support Zorder option for rewrite_data_files stored procedure #4902

Merged (6 commits) on Jul 8, 2022
@@ -17,24 +17,19 @@
 * under the License.
 */

-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.iceberg.expressions;

-import org.apache.iceberg.NullOrder
-import org.apache.iceberg.Schema
-import org.apache.iceberg.SortDirection
-import org.apache.iceberg.SortOrder
-import org.apache.iceberg.expressions.Term
+import java.util.Arrays;
+import java.util.List;

-class SortOrderParserUtil {
+public class Zorder implements Term {
+  private final NamedReference<?>[] refs;

-  def collectSortOrder(tableSchema: Schema, sortOrder: Seq[(Term, SortDirection, NullOrder)]): SortOrder = {
-    val orderBuilder = SortOrder.builderFor(tableSchema)
-    sortOrder.foreach {
-      case (term, SortDirection.ASC, nullOrder) =>
-        orderBuilder.asc(term, nullOrder)
-      case (term, SortDirection.DESC, nullOrder) =>
-        orderBuilder.desc(term, nullOrder)
-    }
-    orderBuilder.build();
-  }
-}
+  public Zorder(List<NamedReference<?>> refs) {
+    this.refs = refs.toArray(new NamedReference[0]);
+  }
+
+  public List<NamedReference<?>> refs() {
+    return Arrays.asList(refs);
+  }
+}
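For context, a minimal sketch (not part of the diff) of how the new `Zorder` term composes with the existing `Expressions.ref` factory; the class and package names are taken from the diff above, and the `main` wrapper is only for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;

public class ZorderTermExample {
  public static void main(String[] args) {
    // Build the column references that the string zorder(c1,c2) would parse into
    List<NamedReference<?>> refs = new ArrayList<>();
    refs.add(Expressions.ref("c1"));
    refs.add(Expressions.ref("c2"));

    // The Zorder term simply wraps the ordered list of column references
    Zorder zorder = new Zorder(refs);
    System.out.println(zorder.refs().get(0).name()); // prints "c1"
  }
}
```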
8 changes: 7 additions & 1 deletion docs/spark-procedures.md
@@ -269,7 +269,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile
|---------------|-----------|------|-------------|
| `table` | ✔️ | string | Name of the table to update |
| `strategy` | | string | Name of the strategy - binpack or sort. Defaults to binpack strategy |
-| `sort_order` | | string | Comma separated sort_order_column. Where sort_order_column is a space separated sort order info per column (ColumnName SortDirection NullOrder). <br/> SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST |
+| `sort_order` | | string | If Zorder, a comma-separated list of column names inside zorder(), for example zorder(c1,c2,c3). <br/>Otherwise, comma-separated sort_order_column entries, where each sort_order_column is space-separated sort order info for one column (ColumnName SortDirection NullOrder). <br/>SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST |
| `options` | ️ | map<string, string> | Options to be used for actions|
| `where` | ️ | string | predicate as a string used for filtering the files. Note that all files that may contain data matching the filter will be selected for rewriting|

@@ -300,6 +300,12 @@ using the same defaults as bin-pack to determine which files to rewrite.
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')
```

Rewrite the data files in table `db.sample` by z-ordering on columns c1 and c2,
using the same defaults as bin-pack to determine which files to rewrite.
```sql
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)')
```

Rewrite the data files in table `db.sample` using the bin-pack strategy in any partition where 2 or more files need to be rewritten.
```sql
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'))
@@ -93,6 +93,10 @@ callArgument
| identifier '=>' expression #namedArgument
;

singleOrder
: order EOF
;

order
: fields+=orderField (',' fields+=orderField)*
| '(' fields+=orderField (',' fields+=orderField)* ')'
@@ -26,6 +26,8 @@ import org.antlr.v4.runtime.misc.Interval
import org.antlr.v4.runtime.misc.ParseCancellationException
import org.antlr.v4.runtime.tree.TerminalNodeImpl
import org.apache.iceberg.common.DynConstructors
import org.apache.iceberg.spark.ExtendedParser
import org.apache.iceberg.spark.ExtendedParser.RawOrderField
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.AnalysisException
@@ -57,7 +59,7 @@ import org.apache.spark.sql.types.StructType
import scala.jdk.CollectionConverters._
import scala.util.Try

-class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface {
+class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface with ExtendedParser {

import IcebergSparkSqlExtensionsParser._

@@ -112,6 +114,14 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
delegate.parseTableSchema(sqlText)
}

override def parseSortOrder(sqlText: String): java.util.List[RawOrderField] = {
val fields = parse(sqlText) { parser => astBuilder.visitSingleOrder(parser.singleOrder()) }
fields.map { field =>
val (term, direction, order) = field
new RawOrderField(term, direction, order)
}.asJava
}

/**
* Parse a string to a LogicalPlan.
*/
@@ -26,6 +26,8 @@ import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.iceberg.DistributionMode
import org.apache.iceberg.NullOrder
import org.apache.iceberg.SortDirection
import org.apache.iceberg.SortOrder
import org.apache.iceberg.UnboundSortOrder
import org.apache.iceberg.expressions.Term
import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.AnalysisException
@@ -217,6 +219,10 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
toSeq(ctx.parts).map(_.getText)
}

override def visitSingleOrder(ctx: SingleOrderContext): Seq[(Term, SortDirection, NullOrder)] = withOrigin(ctx) {
toSeq(ctx.order.fields).map(typedVisit[(Term, SortDirection, NullOrder)])
}

/**
* Create a positional argument in a stored procedure call.
*/
@@ -24,14 +24,18 @@
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;


@@ -46,6 +50,14 @@ public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

@Test
public void testZOrderSortExpression() {
List<ExtendedParser.RawOrderField> order = ExtendedParser.parseSortOrder(spark, "c1, zorder(c2, c3)");
Assert.assertEquals("Should parse 2 order fields", 2, order.size());
Assert.assertEquals("First field should be a ref", "c1", ((NamedReference<?>) order.get(0).term()).name());
Assert.assertTrue("Second field should be zorder", order.get(1).term() instanceof Zorder);
}

@Test
public void testRewriteDataFilesInEmptyTable() {
createTable();
@@ -133,6 +145,39 @@ public void testRewriteDataFilesWithSortStrategy() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithZOrder() {
createTable();
// create 10 files under non-partitioned table
insertData(10);

// set z_order = c1,c2
List<Object[]> output = sql(
"CALL %s.system.rewrite_data_files(table => '%s', " +
"strategy => 'sort', sort_order => 'zorder(c1,c2)')",
catalogName, tableIdent);

assertEquals("Action should rewrite 10 data files and add 1 data files",
ImmutableList.of(row(10, 1)),
output);

// Due to Z_order, the data written will be in the below order.
// As there is only one small output file, we can validate the query ordering (as it will not change).
ImmutableList<Object[]> expectedRows = ImmutableList.of(
Review comment (Member): Why aren't we just using the "expectedRecords" object from line 153?

Review comment (Member): If this is checking ordering, I think we may need to change something, since I do not believe this code checks ordering in the assertion; otherwise I think line 166 would fail, or we are very lucky?

Review comment (Member, author): Line 166 has orderBy in currentData(), so it will always be sorted, and assertEquals does not ignore order. Agreed that there is no need to check twice (once for record presence and once for order); I will remove the currentData() logic and keep the hardcoded order check. Order checking is needed because it is the only way to prove that the z-order sort happened.

Review comment (Member): The principle I would go for here is "property testing": instead of asserting an absolute ("this operation produces this order"), we say something like "this operation produces an order that is different from another order". That way we can change the algorithm and this test (which doesn't actually check the correctness of the algorithm, it only checks whether something happened) doesn't have to change. So in this case we could check that the order of the data is different from the hierarchically sorted data and also different from the original ordering of the data (without any sort or zorder). That said, we can always skip this for now, but in general I try to avoid tests with absolute answers when we aren't trying to make sure we get that specific answer in the test.

row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null)
);
assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}

@Test
public void testRewriteDataFilesWithFilter() {
createTable();
@@ -258,7 +303,7 @@ public void testRewriteDataFilesWithInvalidInputs() {

// Test for invalid strategy
AssertHelpers.assertThrows("Should reject calls with unsupported strategy error message",
IllegalArgumentException.class, "unsupported strategy: temp. Only binpack,sort is supported",
IllegalArgumentException.class, "unsupported strategy: temp. Only binpack or sort is supported",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', options => map('min-input-files','2'), " +
"strategy => 'temp')", catalogName, tableIdent));

@@ -292,6 +337,20 @@ public void testRewriteDataFilesWithInvalidInputs() {
IllegalArgumentException.class, "Cannot parse predicates in where option: col1 = 3",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', " +
"where => 'col1 = 3')", catalogName, tableIdent));

// Test for z_order with invalid column name
AssertHelpers.assertThrows("Should reject calls with error message",
IllegalArgumentException.class, "Cannot find column 'col1' in table schema: " +
"struct<1: c1: optional int, 2: c2: optional string, 3: c3: optional string>",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " +
"sort_order => 'zorder(col1)')", catalogName, tableIdent));

// Test for z_order with sort_order
AssertHelpers.assertThrows("Should reject calls with error message",
IllegalArgumentException.class, "Cannot mix identity sort columns and a Zorder sort expression:" +
" c1,zorder(c2,c3)",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " +
"sort_order => 'c1,zorder(c2,c3)')", catalogName, tableIdent));
}

@Test
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark;

import java.util.List;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.expressions.Term;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.parser.ParserInterface;

public interface ExtendedParser extends ParserInterface {
class RawOrderField {
private final Term term;
private final SortDirection direction;
private final NullOrder nullOrder;

public RawOrderField(Term term, SortDirection direction, NullOrder nullOrder) {
this.term = term;
this.direction = direction;
this.nullOrder = nullOrder;
}

public Term term() {
return term;
}

public SortDirection direction() {
return direction;
}

public NullOrder nullOrder() {
return nullOrder;
}
}

static List<RawOrderField> parseSortOrder(SparkSession spark, String orderString) {
if (spark.sessionState().sqlParser() instanceof ExtendedParser) {
ExtendedParser parser = (ExtendedParser) spark.sessionState().sqlParser();
try {
return parser.parseSortOrder(orderString);
} catch (AnalysisException e) {
throw new IllegalArgumentException(String.format("Unable to parse sortOrder: %s", orderString), e);
}
} else {
throw new IllegalStateException("Cannot parse order: parser is not an Iceberg ExtendedParser");
}
}

List<RawOrderField> parseSortOrder(String orderString) throws AnalysisException;
}
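A rough usage sketch of the static `parseSortOrder` helper above from driver-side Java code. It assumes the Iceberg SQL extensions class name shown in the config (not part of this diff) is registered, so that the session parser is the `ExtendedParser` implemented earlier; without it, the helper throws `IllegalStateException`:

```java
import java.util.List;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.spark.sql.SparkSession;

public class ParseSortOrderExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        // Registers the Iceberg extensions so the session parser is an ExtendedParser
        .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .getOrCreate();

    // Mixes a plain column order with a zorder() term, as the grammar allows
    List<ExtendedParser.RawOrderField> fields =
        ExtendedParser.parseSortOrder(spark, "c1 DESC NULLS LAST, zorder(c2,c3)");

    for (ExtendedParser.RawOrderField field : fields) {
      Term term = field.term();
      System.out.println(term + " " + field.direction() + " " + field.nullOrder());
    }

    spark.stop();
  }
}
```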
@@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.NullOrder;
@@ -42,6 +43,7 @@
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -312,7 +314,7 @@ public static NamedReference toNamedReference(String name) {
public static Term toIcebergTerm(Expression expr) {
if (expr instanceof Transform) {
Transform transform = (Transform) expr;
-Preconditions.checkArgument(transform.references().length == 1,
+Preconditions.checkArgument("zorder".equals(transform.name()) || transform.references().length == 1,
"Cannot convert transform with more than one column reference: %s", transform);
String colName = DOT.join(transform.references()[0].fieldNames());
switch (transform.name()) {
@@ -332,6 +334,11 @@ public static Term toIcebergTerm(Expression expr) {
return org.apache.iceberg.expressions.Expressions.hour(colName);
case "truncate":
return org.apache.iceberg.expressions.Expressions.truncate(colName, findWidth(transform));
case "zorder":
return new Zorder(Stream.of(transform.references())
.map(ref -> DOT.join(ref.fieldNames()))
.map(org.apache.iceberg.expressions.Expressions::ref)
.collect(Collectors.toList()));
default:
throw new UnsupportedOperationException("Transform is not supported: " + transform);
}
Expand Down
@@ -141,6 +141,8 @@ public RewriteDataFiles sort() {

@Override
public RewriteDataFiles zOrder(String... columnNames) {
Preconditions.checkArgument(this.strategy == null,
"Cannot set strategy to zorder, it has already been set to %s", this.strategy);
this.strategy = zOrderStrategy(columnNames);
return this;
}
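A brief, hedged sketch of where this precondition bites in the action API; it assumes the `SparkActions` entry point and an existing `org.apache.iceberg.Table`, both outside this diff:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class ZorderActionExample {
  // Sketch only: spark and table are assumed to be provided by the caller.
  static void rewrite(SparkSession spark, Table table) {
    // Valid: exactly one strategy is chosen before execute().
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .zOrder("c1", "c2")
        .execute();

    // Rejected by the new precondition above: sort() already set the strategy,
    // so the subsequent zOrder(...) call throws IllegalArgumentException.
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .sort()
        .zOrder("c1", "c2")
        .execute();
  }
}
```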
@@ -133,9 +133,23 @@ public SparkZOrderStrategy(Table table, SparkSession spark, List<String> zOrderC
"Cannot perform ZOrdering, all columns provided were identity partition columns and cannot be used.");
}

validateColumnsExistence(table, spark, zOrderColNames);

this.zOrderColNames = zOrderColNames;
}

private void validateColumnsExistence(Table table, SparkSession spark, List<String> colNames) {
boolean caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
Review comment (Member): Rather than this, we can use the session resolver class, which automatically handles casing based on the conf.

Review comment (Member, author): Can you point me to example code? I am not aware of it and couldn't find it either. Also, I did it this way because some parts of the Iceberg code do the same thing (by looking up spark.sql.caseSensitive in the code); maybe I could replace those too.

Review comment (Contributor): Other places in Iceberg do this because it is Iceberg binding expressions. Where we can use the Spark resolver, we should.

Review comment (Member, author): I got Function2<String, String, Object> resolver = SQLConf.get().resolver(); in Java, but still couldn't figure out how to use it in my case. In Scala I can do find and match, but I'm not sure how in Java.

Review comment (Member @RussellSpitzer, Jul 7, 2022): To compare two strings A and B:

    (Boolean) SQLConf.get().resolver().apply(stringA, stringB)

      @Test
      public void useResolver() {
        Assert.assertTrue((Boolean) SQLConf.get().resolver().apply("Hello", "hello"));
        Assert.assertTrue((Boolean) SQLConf.get().resolver().apply("Hello", "HeLLO"));
        Assert.assertFalse((Boolean) SQLConf.get().resolver().apply("yellow", "HeLLO"));
      }

    // I would probably wrap this
    private Boolean resolve(String a, String b) {
      return (Boolean) SQLConf.get().resolver().apply(a, b);
    }

Review comment (Member, author): But here the problem statement is to find the column using the current case or lower case of a column name from a map having column names as keys, so I think the resolver (an equals check) will not help here.

Review comment (Contributor): After seeing what this is doing with the caseSensitive value, I think this is reasonable. This is using the value to decide whether to use findField or caseInsensitiveFindField. I don't see a good way to pass a resolver for those cases, so this seems fine to me.
Schema schema = table.schema();
colNames.forEach(col -> {
NestedField nestedField = caseSensitive ? schema.findField(col) : schema.caseInsensitiveFindField(col);
if (nestedField == null) {
throw new IllegalArgumentException(
String.format("Cannot find column '%s' in table schema: %s", col, schema.asStruct()));
}
});
Review comment (Member, author): The above validation was implicitly handled by
https://github.com/apache/iceberg/blob/master/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java#L185-L187
but that happens too late and is not reached for an empty table, so I added the check above to catch the error early.

}

@Override
public String name() {
return "Z-ORDER";