
Spark-3.2: Support Zorder option for rewrite_data_files stored procedure (#4902)


Co-authored-by: Ryan Blue <blue@apache.org>
ajantha-bhat and rdblue authored Jul 8, 2022
1 parent d131c35 commit c0ccb00
Showing 11 changed files with 236 additions and 47 deletions.
@@ -17,24 +17,19 @@
  * under the License.
  */
 
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.iceberg.expressions;
 
-import org.apache.iceberg.NullOrder
-import org.apache.iceberg.Schema
-import org.apache.iceberg.SortDirection
-import org.apache.iceberg.SortOrder
-import org.apache.iceberg.expressions.Term
+import java.util.Arrays;
+import java.util.List;
 
-class SortOrderParserUtil {
+public class Zorder implements Term {
+  private final NamedReference<?>[] refs;
 
-  def collectSortOrder(tableSchema:Schema, sortOrder: Seq[(Term, SortDirection, NullOrder)]): SortOrder = {
-    val orderBuilder = SortOrder.builderFor(tableSchema)
-    sortOrder.foreach {
-      case (term, SortDirection.ASC, nullOrder) =>
-        orderBuilder.asc(term, nullOrder)
-      case (term, SortDirection.DESC, nullOrder) =>
-        orderBuilder.desc(term, nullOrder)
-    }
-    orderBuilder.build();
+  public Zorder(List<NamedReference<?>> refs) {
+    this.refs = refs.toArray(new NamedReference[0]);
   }
+
+  public List<NamedReference<?>> refs() {
+    return Arrays.asList(refs);
+  }
 }
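
For orientation, the new `Zorder` term just wraps a list of column references; a minimal sketch of constructing one (using `Expressions.ref`, the same helper the `Spark3Util` change later in this commit uses to build these references):

```java
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;

public class ZorderTermExample {
  public static void main(String[] args) {
    // Each z-order column becomes a NamedReference; Zorder carries the list.
    List<NamedReference<?>> refs =
        Arrays.asList(Expressions.ref("c1"), Expressions.ref("c2"));
    Zorder zorder = new Zorder(refs);

    // refs() round-trips the references the sort strategy will interleave.
    zorder.refs().forEach(ref -> System.out.println(ref.name()));
  }
}
```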
8 changes: 7 additions & 1 deletion docs/spark-procedures.md
@@ -269,7 +269,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFiles`
|---------------|-----------|------|-------------|
| `table` | ✔️ | string | Name of the table to update |
| `strategy` | | string | Name of the strategy - binpack or sort. Defaults to binpack strategy |
-| `sort_order` | | string | Comma separated sort_order_column. Where sort_order_column is a space separated sort order info per column (ColumnName SortDirection NullOrder). <br/> SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST |
+| `sort_order` | | string | For Zorder, a comma-separated list of column names within zorder(). Example: zorder(c1,c2,c3). <br/>Otherwise, a comma-separated list of sort_order_column entries, where each sort_order_column is space-separated sort order info for one column (ColumnName SortDirection NullOrder). <br/> SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST |
| `options` || map<string, string> | Options to be used for actions |
| `where` || string | Predicate as a string used for filtering the files. Note that all files that may contain data matching the filter will be selected for rewriting |

@@ -300,6 +300,12 @@ using the same defaults as bin-pack to determine which files to rewrite.
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')
```

Rewrite the data files in table `db.sample` by z-ordering on columns c1 and c2,
using the same defaults as bin-pack to determine which files to rewrite.
```sql
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)')
```

Rewrite the data files in table `db.sample` using the bin-pack strategy in any partition where 2 or more files need to be rewritten.
```sql
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'))
```
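
The procedure's zorder option maps to the `RewriteDataFiles` action API, whose `zOrder` method is extended later in this commit. A sketch of the equivalent programmatic call, assuming `table` is an already-loaded `org.apache.iceberg.Table` (the `SparkActions` entry point is standard Iceberg-on-Spark, not part of this diff):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;

public class ZOrderRewriteExample {
  public static RewriteDataFiles.Result rewrite(Table table) {
    // Equivalent to: CALL catalog.system.rewrite_data_files(
    //   table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)')
    return SparkActions.get()
        .rewriteDataFiles(table)
        .zOrder("c1", "c2")
        .execute();
  }
}
```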
@@ -93,6 +93,10 @@ callArgument
| identifier '=>' expression #namedArgument
;

singleOrder
: order EOF
;

order
: fields+=orderField (',' fields+=orderField)*
| '(' fields+=orderField (',' fields+=orderField)* ')'
@@ -26,6 +26,8 @@ import org.antlr.v4.runtime.misc.Interval
import org.antlr.v4.runtime.misc.ParseCancellationException
import org.antlr.v4.runtime.tree.TerminalNodeImpl
import org.apache.iceberg.common.DynConstructors
import org.apache.iceberg.spark.ExtendedParser
import org.apache.iceberg.spark.ExtendedParser.RawOrderField
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.AnalysisException
@@ -57,7 +59,7 @@ import org.apache.spark.sql.types.StructType
import scala.jdk.CollectionConverters._
import scala.util.Try

-class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface {
+class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface with ExtendedParser {

import IcebergSparkSqlExtensionsParser._

@@ -112,6 +114,14 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
delegate.parseTableSchema(sqlText)
}

override def parseSortOrder(sqlText: String): java.util.List[RawOrderField] = {
val fields = parse(sqlText) { parser => astBuilder.visitSingleOrder(parser.singleOrder()) }
fields.map { field =>
val (term, direction, order) = field
new RawOrderField(term, direction, order)
}.asJava
}

/**
* Parse a string to a LogicalPlan.
*/
@@ -26,6 +26,8 @@ import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.iceberg.DistributionMode
import org.apache.iceberg.NullOrder
import org.apache.iceberg.SortDirection
import org.apache.iceberg.SortOrder
import org.apache.iceberg.UnboundSortOrder
import org.apache.iceberg.expressions.Term
import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.AnalysisException
@@ -217,6 +219,10 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
toSeq(ctx.parts).map(_.getText)
}

override def visitSingleOrder(ctx: SingleOrderContext): Seq[(Term, SortDirection, NullOrder)] = withOrigin(ctx) {
toSeq(ctx.order.fields).map(typedVisit[(Term, SortDirection, NullOrder)])
}

/**
* Create a positional argument in a stored procedure call.
*/
@@ -24,14 +24,18 @@
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;


@@ -46,6 +50,14 @@ public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

@Test
public void testZOrderSortExpression() {
List<ExtendedParser.RawOrderField> order = ExtendedParser.parseSortOrder(spark, "c1, zorder(c2, c3)");
Assert.assertEquals("Should parse 2 order fields", 2, order.size());
Assert.assertEquals("First field should be a ref", "c1", ((NamedReference<?>) order.get(0).term()).name());
Assert.assertTrue("Second field should be zorder", order.get(1).term() instanceof Zorder);
}

@Test
public void testRewriteDataFilesInEmptyTable() {
createTable();
@@ -133,6 +145,39 @@ public void testRewriteDataFilesWithSortStrategy() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithZOrder() {
createTable();
// create 10 files under non-partitioned table
insertData(10);

// set z_order = c1,c2
List<Object[]> output = sql(
"CALL %s.system.rewrite_data_files(table => '%s', " +
"strategy => 'sort', sort_order => 'zorder(c1,c2)')",
catalogName, tableIdent);

assertEquals("Action should rewrite 10 data files and add 1 data files",
ImmutableList.of(row(10, 1)),
output);

// Due to z-ordering, the data will be written in the order below.
// As there is only one small output file, we can validate the query ordering (it will not change).
ImmutableList<Object[]> expectedRows = ImmutableList.of(
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null)
);
assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}

@Test
public void testRewriteDataFilesWithFilter() {
createTable();
@@ -258,7 +303,7 @@ public void testRewriteDataFilesWithInvalidInputs() {

// Test for invalid strategy
AssertHelpers.assertThrows("Should reject calls with unsupported strategy error message",
-IllegalArgumentException.class, "unsupported strategy: temp. Only binpack,sort is supported",
+IllegalArgumentException.class, "unsupported strategy: temp. Only binpack or sort is supported",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', options => map('min-input-files','2'), " +
"strategy => 'temp')", catalogName, tableIdent));

@@ -292,6 +337,20 @@ public void testRewriteDataFilesWithInvalidInputs() {
IllegalArgumentException.class, "Cannot parse predicates in where option: col1 = 3",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', " +
"where => 'col1 = 3')", catalogName, tableIdent));

// Test for z_order with invalid column name
AssertHelpers.assertThrows("Should reject calls with error message",
IllegalArgumentException.class, "Cannot find column 'col1' in table schema: " +
"struct<1: c1: optional int, 2: c2: optional string, 3: c3: optional string>",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " +
"sort_order => 'zorder(col1)')", catalogName, tableIdent));

// Test for z_order with sort_order
AssertHelpers.assertThrows("Should reject calls with error message",
IllegalArgumentException.class, "Cannot mix identity sort columns and a Zorder sort expression:" +
" c1,zorder(c2,c3)",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " +
"sort_order => 'c1,zorder(c2,c3)')", catalogName, tableIdent));
}

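
For intuition about the row ordering asserted in `testRewriteDataFilesWithZOrder` above: z-ordering interleaves the bits of the participating column values, so rows that are close in all columns end up close in the sort. A toy sketch of interleaving two ints follows; Iceberg's actual implementation operates on normalized byte arrays, so treat this purely as an illustration of the idea:

```java
public class ZValueSketch {
  // Interleave the bits of x and y: x occupies even bit positions, y odd ones.
  static long interleave(int x, int y) {
    long z = 0;
    for (int i = 0; i < 32; i++) {
      z |= ((long) ((x >>> i) & 1)) << (2 * i);      // bit i of x -> bit 2i of z
      z |= ((long) ((y >>> i) & 1)) << (2 * i + 1);  // bit i of y -> bit 2i+1 of z
    }
    return z;
  }

  public static void main(String[] args) {
    System.out.println(interleave(1, 2)); // 9: bits 01 and 10 interleave to 1001
  }
}
```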
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark;

import java.util.List;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.expressions.Term;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.parser.ParserInterface;

public interface ExtendedParser extends ParserInterface {
class RawOrderField {
private final Term term;
private final SortDirection direction;
private final NullOrder nullOrder;

public RawOrderField(Term term, SortDirection direction, NullOrder nullOrder) {
this.term = term;
this.direction = direction;
this.nullOrder = nullOrder;
}

public Term term() {
return term;
}

public SortDirection direction() {
return direction;
}

public NullOrder nullOrder() {
return nullOrder;
}
}

static List<RawOrderField> parseSortOrder(SparkSession spark, String orderString) {
if (spark.sessionState().sqlParser() instanceof ExtendedParser) {
ExtendedParser parser = (ExtendedParser) spark.sessionState().sqlParser();
try {
return parser.parseSortOrder(orderString);
} catch (AnalysisException e) {
throw new IllegalArgumentException(String.format("Unable to parse sortOrder: %s", orderString), e);
}
} else {
throw new IllegalStateException("Cannot parse order: parser is not an Iceberg ExtendedParser");
}
}

List<RawOrderField> parseSortOrder(String orderString) throws AnalysisException;
}
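
A small sketch of driving the static helper above from Java. It assumes the session was built with Iceberg's SQL extensions so that `sqlParser()` is the Iceberg parser; otherwise the `IllegalStateException` branch fires:

```java
import java.util.List;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.spark.sql.SparkSession;

public class ParseOrderExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .getOrCreate();

    // Mixed plain and z-order fields parse into raw (term, direction, null order) triples.
    List<ExtendedParser.RawOrderField> fields =
        ExtendedParser.parseSortOrder(spark, "c1 DESC NULLS LAST, zorder(c2, c3)");
    fields.forEach(f ->
        System.out.println(f.term() + " " + f.direction() + " " + f.nullOrder()));
  }
}
```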
@@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.NullOrder;
@@ -42,6 +43,7 @@
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -312,7 +314,7 @@ public static NamedReference toNamedReference(String name) {
public static Term toIcebergTerm(Expression expr) {
if (expr instanceof Transform) {
Transform transform = (Transform) expr;
-Preconditions.checkArgument(transform.references().length == 1,
+Preconditions.checkArgument("zorder".equals(transform.name()) || transform.references().length == 1,
"Cannot convert transform with more than one column reference: %s", transform);
String colName = DOT.join(transform.references()[0].fieldNames());
switch (transform.name()) {
@@ -332,6 +334,11 @@ public static Term toIcebergTerm(Expression expr) {
return org.apache.iceberg.expressions.Expressions.hour(colName);
case "truncate":
return org.apache.iceberg.expressions.Expressions.truncate(colName, findWidth(transform));
case "zorder":
return new Zorder(Stream.of(transform.references())
.map(ref -> DOT.join(ref.fieldNames()))
.map(org.apache.iceberg.expressions.Expressions::ref)
.collect(Collectors.toList()));
default:
throw new UnsupportedOperationException("Transform is not supported: " + transform);
}
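
A sketch of exercising the new `zorder` branch, assuming Spark's `org.apache.spark.sql.connector.expressions.Expressions` factory (a standard Spark 3.x API, not part of this diff) to build an ad-hoc transform:

```java
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;

public class ToTermExample {
  public static void main(String[] args) {
    // A multi-column "zorder" transform now passes the relaxed precondition.
    Transform transform = Expressions.apply(
        "zorder", Expressions.column("c1"), Expressions.column("c2"));
    Term term = Spark3Util.toIcebergTerm(transform);
    System.out.println(term instanceof Zorder); // true
  }
}
```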
@@ -141,6 +141,8 @@ public RewriteDataFiles sort() {

@Override
public RewriteDataFiles zOrder(String... columnNames) {
Preconditions.checkArgument(this.strategy == null,
"Cannot set strategy to zorder, it has already been set to %s", this.strategy);
this.strategy = zOrderStrategy(columnNames);
return this;
}
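
The new precondition makes conflicting strategy configuration fail fast; a hypothetical sequence that now throws, assuming a loaded `table`:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;

public class ConflictingStrategyExample {
  public static void sketch(Table table) {
    SparkActions.get()
        .rewriteDataFiles(table)
        .sort()             // records the sort strategy
        .zOrder("c1", "c2") // throws IllegalArgumentException:
                            // "Cannot set strategy to zorder, it has already been set to ..."
        .execute();
  }
}
```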
@@ -133,9 +133,23 @@ public SparkZOrderStrategy(Table table, SparkSession spark, List<String> zOrderC
"Cannot perform ZOrdering, all columns provided were identity partition columns and cannot be used.");
}

validateColumnsExistence(table, spark, zOrderColNames);

this.zOrderColNames = zOrderColNames;
}

private void validateColumnsExistence(Table table, SparkSession spark, List<String> colNames) {
boolean caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
Schema schema = table.schema();
colNames.forEach(col -> {
NestedField nestedField = caseSensitive ? schema.findField(col) : schema.caseInsensitiveFindField(col);
if (nestedField == null) {
throw new IllegalArgumentException(
String.format("Cannot find column '%s' in table schema: %s", col, schema.asStruct()));
}
});
}

@Override
public String name() {
return "Z-ORDER";
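
Note that the column check honors Spark's `spark.sql.caseSensitive` flag. A hypothetical illustration against a table whose schema has a lower-case column `c1` (the `SparkActions` usage is an assumption, not part of this diff):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class CaseSensitivityExample {
  public static void sketch(SparkSession spark, Table table) {
    // Default (case-insensitive): 'C1' resolves to c1 via caseInsensitiveFindField.
    spark.conf().set("spark.sql.caseSensitive", "false");
    SparkActions.get().rewriteDataFiles(table).zOrder("C1").execute();

    // Case-sensitive: findField("C1") returns null, so the zOrder call throws
    // IllegalArgumentException("Cannot find column 'C1' in table schema: ...").
    spark.conf().set("spark.sql.caseSensitive", "true");
    SparkActions.get().rewriteDataFiles(table).zOrder("C1").execute();
  }
}
```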