Content-Length: 436448 | pFad | http://github.com/apache/iceberg/commit/7986c283eed15523e575eb9c1ca27be488d1a004

B4 New review comment fixes · apache/iceberg@7986c28 · GitHub
Skip to content

Commit

Permalink
New review comment fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Jun 27, 2022
1 parent a5f87a6 commit 7986c28
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 35 deletions.
20 changes: 13 additions & 7 deletions docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,13 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile

#### Usage

| Argument Name | Required? | Type | Description |
|---------------|-----------|------|-------------|
| `table` | ✔️ | string | Name of the table to update |
| `strategy` | | string | Name of the strategy - binpack, sort or zorder. Defaults to binpack strategy |
| `sort_order` | | string | Comma separated sort_order_column. Where sort_order_column is a space separated sort order info per column (ColumnName SortDirection NullOrder). <br/> SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST |
| `options` || map<string, string> | Options to be used for actions|
| `where` || string | predicate as a string used for filtering the files. Note that all files that may contain data matching the filter will be selected for rewriting|
| Argument Name | Required? | Type | Description |
|---------------|-----------|------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `table` | ✔️ | string | Name of the table to update |
| `strategy` | | string | Name of the strategy - binpack or sort. Defaults to binpack strategy |
| `sort_order` | | string | If Zorder, then comma separated column names within zorder() text. Example: zorder(c1,c2,c3). <br/>Else, comma separated sort_order_column. Where sort_order_column is a space separated sort order info per column (ColumnName SortDirection NullOrder). <br/> SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST |
| `options` || map<string, string> | Options to be used for actions |
| `where` || string | predicate as a string used for filtering the files. Note that all files that may contain data matching the filter will be selected for rewriting |


See the [`RewriteDataFiles` Javadoc](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/actions/RewriteDataFiles.html#field.summary),
Expand Down Expand Up @@ -300,6 +300,12 @@ using the same defaults as bin-pack to determine which files to rewrite.
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')
```

Rewrite the data files in table `db.sample` by zOrdering on column c1 and c2.
Using the same defaults as bin-pack to determine which files to rewrite.
```sql
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)')
```

Rewrite the data files in table `db.sample` using bin-pack strategy in any partition where more than 2 or more files need to be rewritten.
```sql
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void testRewriteDataFilesWithZOrder() {
// set z_order = c1,c2
List<Object[]> output = sql(
"CALL %s.system.rewrite_data_files(table => '%s', " +
"strategy => 'zorder', sort_order => 'c1,c2')",
"strategy => 'sort', sort_order => 'zorder(c1,c2)')",
catalogName, tableIdent);

assertEquals("Action should rewrite 10 data files and add 1 data files",
Expand Down Expand Up @@ -295,7 +295,7 @@ public void testRewriteDataFilesWithInvalidInputs() {

// Test for invalid strategy
AssertHelpers.assertThrows("Should reject calls with unsupported strategy error message",
IllegalArgumentException.class, "unsupported strategy: temp. Only binpack,sort or zorder is supported",
IllegalArgumentException.class, "unsupported strategy: temp. Only binpack or sort is supported",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', options => map('min-input-files','2'), " +
"strategy => 'temp')", catalogName, tableIdent));

Expand Down Expand Up @@ -332,10 +332,10 @@ public void testRewriteDataFilesWithInvalidInputs() {

// Test for z_order with invalid column name
AssertHelpers.assertThrows("Should reject calls with error message",
ValidationException.class, "Cannot find field 'col1' in struct:" +
" struct<1: c1: optional int, 2: c2: optional string, 3: c3: optional string>",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'zorder', " +
"sort_order => 'col1')", catalogName, tableIdent));
IllegalArgumentException.class, "Cannot find field 'col1' in which is specified in the ZOrder " +
"in the table schema: struct<1: c1: optional int, 2: c2: optional string, 3: c3: optional string>",
() -> sql("CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " +
"sort_order => 'zorder(col1)')", catalogName, tableIdent));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public RewriteDataFiles sort() {
@Override
public RewriteDataFiles zOrder(String... columnNames) {
Preconditions.checkArgument(this.strategy == null,
"Cannot set strategy to zOrder, it has already been set to %s", this.strategy);
"Cannot set strategy to zOrder, it has already been set to %s", this.strategy);
this.strategy = zOrderStrategy(columnNames);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ private void validateColumnsExistence(Table table, SparkSession spark, List<Stri
NestedField nestedField = caseSensitive ? schema.findField(col) : schema.caseInsensitiveFindField(col);
if (nestedField == null) {
throw new IllegalArgumentException(
String.format("Cannot find field '%s' in struct: %s", col, schema.asStruct()));
String.format("Cannot find field '%s' in which is specified in the ZOrder " +
"in the table schema: %s", col, schema.asStruct()));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

package org.apache.iceberg.spark.procedures;

import java.util.Arrays;
import java.util.Map;
import org.apache.iceberg.Schema;
import java.util.regex.Pattern;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
Expand Down Expand Up @@ -97,12 +98,9 @@ public InternalRow[] call(InternalRow args) {

String strategy = args.isNullAt(1) ? null : args.getString(1);
String sortOrderString = args.isNullAt(2) ? null : args.getString(2);
SortOrder sortOrder = null;
if (sortOrderString != null) {
sortOrder = collectSortOrders(table, sortOrderString);
}
if (strategy != null || sortOrder != null) {
action = checkAndApplyStrategy(action, strategy, sortOrder);

if (strategy != null || sortOrderString != null) {
action = checkAndApplyStrategy(action, strategy, sortOrderString, table);
}

if (!args.isNullAt(3)) {
Expand Down Expand Up @@ -141,32 +139,37 @@ private RewriteDataFiles checkAndApplyOptions(InternalRow args, RewriteDataFiles
return action.options(options);
}

private RewriteDataFiles checkAndApplyStrategy(RewriteDataFiles action, String strategy, SortOrder sortOrder) {
private RewriteDataFiles checkAndApplyStrategy(
RewriteDataFiles action,
String strategy,
String sortOrderString,
Table table) {
Pattern zOrderPattern = Pattern.compile("zorder\\s*\\(.*\\)", Pattern.CASE_INSENSITIVE);
boolean isZOrder = sortOrderString != null && zOrderPattern.matcher(sortOrderString).matches();
// caller of this function ensures that between strategy and sortOrder, at least one of them is not null.
if (strategy == null || strategy.equalsIgnoreCase("sort") || strategy.equalsIgnoreCase("zorder")) {
if (strategy != null && strategy.equalsIgnoreCase("zorder")) {
return action.zOrder(columnNames(sortOrder));
if (strategy == null || strategy.equalsIgnoreCase("sort")) {
if (isZOrder) {
String columns = sortOrderString.substring(
sortOrderString.indexOf("(") + 1,
sortOrderString.lastIndexOf(")"));
String[] columnNames = Arrays.stream(columns.split(",")).map(String::trim).toArray(String[]::new);
return action.zOrder(columnNames);
}
return action.sort(sortOrder);
return action.sort(collectSortOrders(table, sortOrderString));
}
if (strategy.equalsIgnoreCase("binpack")) {
RewriteDataFiles rewriteDataFiles = action.binPack();
if (sortOrder != null) {
if (sortOrderString != null) {
// calling below method to throw the error as user has set both binpack strategy and sort order
return rewriteDataFiles.sort(sortOrder);
return rewriteDataFiles.sort(collectSortOrders(table, sortOrderString));
}
return rewriteDataFiles;
} else {
throw new IllegalArgumentException(
"unsupported strategy: " + strategy + ". Only binpack,sort or zorder is supported");
"unsupported strategy: " + strategy + ". Only binpack or sort is supported");
}
}

private String[] columnNames(SortOrder sortOrder) {
Schema schema = sortOrder.schema();
return sortOrder.fields().stream().map(field -> schema.findColumnName(field.sourceId())).toArray(String[]::new);
}

private SortOrder collectSortOrders(Table table, String sortOrderStr) {
String prefix = "ALTER TABLE temp WRITE ORDERED BY ";
try {
Expand Down

0 comments on commit 7986c28

Please sign in to comment.








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/iceberg/commit/7986c283eed15523e575eb9c1ca27be488d1a004

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy