
Spark 3.3: support branch write using table identifier
Jack Ye committed Mar 8, 2023
1 parent 88ccf9a commit 0826434
Showing 51 changed files with 1,337 additions and 624 deletions.
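For orientation, the Spark 3.3 test changes below drive branch writes through the table identifier and branch reads through VERSION AS OF. A minimal sketch of that usage, not part of the commit itself; the catalog, table, and branch names are illustrative:

import org.apache.spark.sql.SparkSession;

public class BranchWriteSketch {
  public static void main(String[] args) {
    // Assumes a SparkSession with the Iceberg SQL extensions and an Iceberg
    // catalog named "demo"; all identifiers here are illustrative.
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Create the branch first, as the tests do in createBranch().
    spark.sql("ALTER TABLE demo.db.tbl CREATE BRANCH test");

    // Write to the branch through the table identifier (mirrors commitTarget()).
    spark.sql("INSERT INTO demo.db.tbl.branch_test VALUES (1, 'hr')");

    // Read the branch head back (mirrors selectTarget()).
    spark.sql("SELECT * FROM demo.db.tbl VERSION AS OF 'test'").show();
  }
}

The table.branch_name form is the identifier that the new commitTarget() helper builds, and VERSION AS OF '<branch>' matches selectTarget().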
@@ -976,7 +976,8 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
     // filter any existing manifests
     List<ManifestFile> filtered =
         filterManager.filterManifests(
-            base.schema(), snapshot != null ? snapshot.dataManifests(ops.io()) : null);
+            SnapshotUtil.schemaFor(base, targetBranch()),
+            snapshot != null ? snapshot.dataManifests(ops.io()) : null);
     long minDataSequenceNumber =
         filtered.stream()
             .map(ManifestFile::minSequenceNumber)
@@ -989,7 +990,8 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
     deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
     List<ManifestFile> filteredDeletes =
         deleteFilterManager.filterManifests(
-            base.schema(), snapshot != null ? snapshot.deleteManifests(ops.io()) : null);
+            SnapshotUtil.schemaFor(base, targetBranch()),
+            snapshot != null ? snapshot.deleteManifests(ops.io()) : null);

     // only keep manifests that have live data files or that were written by this commit
     Predicate<ManifestFile> shouldKeep =
core/src/main/java/org/apache/iceberg/SnapshotProducer.java (15 changes: 6 additions, 9 deletions)
@@ -57,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Exceptions;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
@@ -149,6 +150,10 @@ protected void targetBranch(String branch) {
    this.targetBranch = branch;
  }

+  protected String targetBranch() {
+    return targetBranch;
+  }
+
  protected ExecutorService workerPool() {
    return this.workerPool;
  }
@@ -202,15 +207,7 @@ protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}
  @Override
  public Snapshot apply() {
    refresh();
-    Snapshot parentSnapshot = base.currentSnapshot();
-    if (targetBranch != null) {
-      SnapshotRef branch = base.ref(targetBranch);
-      if (branch != null) {
-        parentSnapshot = base.snapshot(branch.snapshotId());
-      } else if (base.currentSnapshot() != null) {
-        parentSnapshot = base.currentSnapshot();
-      }
-    }
+    Snapshot parentSnapshot = SnapshotUtil.latestSnapshot(base, targetBranch);

    long sequenceNumber = base.nextSequenceNumber();
    Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId();
core/src/main/java/org/apache/iceberg/SnapshotScan.java (9 changes: 8 additions, 1 deletion)
@@ -85,11 +85,18 @@ public ThisT useSnapshot(long scanSnapshotId) {
  }

  public ThisT useRef(String name) {
+    if (SnapshotRef.MAIN_BRANCH.equals(name)) {
+      return newRefinedScan(table(), tableSchema(), context());
+    }
+
    Preconditions.checkArgument(
        snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId());
    Snapshot snapshot = table().snapshot(name);
    Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name);
-    return newRefinedScan(table(), tableSchema(), context().useSnapshotId(snapshot.snapshotId()));
+    return newRefinedScan(
+        table(),
+        SnapshotUtil.schemaFor(table(), name),
+        context().useSnapshotId(snapshot.snapshotId()));
  }

  public ThisT asOfTime(long timestampMillis) {
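One effect of the useRef change above: a scan pinned to a branch now plans with the schema of that branch's head snapshot instead of the current table schema. A minimal sketch, assuming an existing branch named "audit" (illustrative name, not from the commit):

import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;

public class BranchScanSketch {
  // Sketch only: pins a scan to a branch; with this change the scan's schema
  // is resolved from the branch's head snapshot rather than table.schema().
  static TableScan scanBranch(Table table) {
    return table.newScan().useRef("audit");
  }
}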
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java (62 changes: 58 additions, 4 deletions)
@@ -397,6 +397,52 @@ public static Schema schemaFor(Table table, Long snapshotId, Long timestampMilli
    return table.schema();
  }

+  /**
+   * Return the schema of the snapshot at a given branch.
+   *
+   * <p>If branch does not exist, the table schema is returned because it will be the schema when
+   * the new branch is created.
+   *
+   * @param table a {@link Table}
+   * @param branch branch name of the table (nullable)
+   * @return schema of the specific snapshot at the given branch
+   */
+  public static Schema schemaFor(Table table, String branch) {
+    if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
+      return table.schema();
+    }
+
+    Snapshot ref = table.snapshot(branch);
+    if (ref == null) {
+      return table.schema();
+    }
+
+    return schemaFor(table, ref.snapshotId());
+  }
+
+  /**
+   * Return the schema of the snapshot at a given branch.
+   *
+   * <p>If branch does not exist, the table schema is returned because it will be the schema when
+   * the new branch is created.
+   *
+   * @param metadata a {@link TableMetadata}
+   * @param branch branch name of the table (nullable)
+   * @return schema of the specific snapshot at the given branch
+   */
+  public static Schema schemaFor(TableMetadata metadata, String branch) {
+    if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
+      return metadata.schema();
+    }
+
+    SnapshotRef ref = metadata.ref(branch);
+    if (ref == null) {
+      return metadata.schema();
+    }
+
+    return metadata.schemas().get(metadata.snapshot(ref.snapshotId()).schemaId());
+  }
+
  /**
   * Fetch the snapshot at the head of the given branch in the given table.
   *
@@ -405,11 +451,11 @@ public static Schema schemaFor(Table table, Long snapshotId, Long timestampMilli
   * code path to ensure backwards compatibility.
   *
   * @param table a {@link Table}
-   * @param branch branch name of the table
+   * @param branch branch name of the table (nullable)
   * @return the latest snapshot for the given branch
   */
  public static Snapshot latestSnapshot(Table table, String branch) {
-    if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+    if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
      return table.currentSnapshot();
    }

@@ -423,12 +469,20 @@ public static Snapshot latestSnapshot(Table table, String branch) {
   * TableMetadata#ref(String)}} for the main branch so that existing code still goes through the
   * old code path to ensure backwards compatibility.
   *
+   * <p>If branch does not exist, the table's latest snapshot is returned because it will be the
+   * branch head once the new branch is created.
+   *
   * @param metadata a {@link TableMetadata}
-   * @param branch branch name of the table metadata
+   * @param branch branch name of the table metadata (nullable)
   * @return the latest snapshot for the given branch
   */
  public static Snapshot latestSnapshot(TableMetadata metadata, String branch) {
-    if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+    if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
      return metadata.currentSnapshot();
    }

+    SnapshotRef ref = metadata.ref(branch);
+    if (ref == null) {
+      return metadata.currentSnapshot();
+    }

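The new SnapshotUtil overloads above share one set of fallback rules: null or "main" resolves to the current table state, a missing branch falls back to the table schema (and, in the TableMetadata variant, to the current snapshot), and an existing branch resolves to its head. A short sketch of a caller relying on that behavior, illustrative only:

import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

public class BranchResolutionSketch {
  static void describe(Table table, String branch) {
    // null, "main", or an unknown branch falls back to the current table schema;
    // an existing branch yields the schema of its head snapshot.
    Schema schema = SnapshotUtil.schemaFor(table, branch);

    // null or "main" resolves to the table's current snapshot; otherwise the
    // head snapshot of the named branch is returned.
    Snapshot head = SnapshotUtil.latestSnapshot(table, branch);

    System.out.println(
        "schema id: " + schema.schemaId() + ", head snapshot: "
            + (head == null ? "none" : head.snapshotId()));
  }
}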
@@ -43,6 +43,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -70,24 +71,27 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
  protected final String fileFormat;
  protected final boolean vectorized;
  protected final String distributionMode;
+  protected final String branch;

  public SparkRowLevelOperationsTestBase(
      String catalogName,
      String implementation,
      Map<String, String> config,
      String fileFormat,
      boolean vectorized,
-      String distributionMode) {
+      String distributionMode,
+      String branch) {
    super(catalogName, implementation, config);
    this.fileFormat = fileFormat;
    this.vectorized = vectorized;
    this.distributionMode = distributionMode;
+    this.branch = branch;
  }

  @Parameters(
      name =
          "catalogName = {0}, implementation = {1}, config = {2},"
-              + " format = {3}, vectorized = {4}, distributionMode = {5}")
+              + " format = {3}, vectorized = {4}, distributionMode = {5}, branch = {6}")
  public static Object[][] parameters() {
    return new Object[][] {
      {
@@ -98,7 +102,8 @@ public static Object[][] parameters() {
            "default-namespace", "default"),
        "orc",
        true,
-        WRITE_DISTRIBUTION_MODE_NONE
+        WRITE_DISTRIBUTION_MODE_NONE,
+        SnapshotRef.MAIN_BRANCH
      },
      {
        "testhive",
@@ -108,15 +113,17 @@ public static Object[][] parameters() {
            "default-namespace", "default"),
        "parquet",
        true,
-        WRITE_DISTRIBUTION_MODE_NONE
+        WRITE_DISTRIBUTION_MODE_NONE,
+        null,
      },
      {
        "testhadoop",
        SparkCatalog.class.getName(),
        ImmutableMap.of("type", "hadoop"),
        "parquet",
        RANDOM.nextBoolean(),
-        WRITE_DISTRIBUTION_MODE_HASH
+        WRITE_DISTRIBUTION_MODE_HASH,
+        null
      },
      {
        "spark_catalog",
@@ -131,7 +138,8 @@ public static Object[][] parameters() {
            ),
        "avro",
        false,
-        WRITE_DISTRIBUTION_MODE_RANGE
+        WRITE_DISTRIBUTION_MODE_RANGE,
+        "test"
      }
    };
  }
@@ -181,6 +189,7 @@ protected void createAndInitTable(String schema, String partitioning, String jso
    try {
      Dataset<Row> ds = toDS(schema, jsonData);
      ds.coalesce(1).writeTo(tableName).append();
+      createBranch();
    } catch (NoSuchTableException e) {
      throw new RuntimeException("Failed to write data", e);
    }
@@ -315,4 +324,20 @@ protected DataFile writeDataFile(Table table, List<GenericRecord> records) {
      throw new UncheckedIOException(e);
    }
  }
+
+  @Override
+  protected String commitTarget() {
+    return branch == null ? tableName : String.format("%s.branch_%s", tableName, branch);
+  }
+
+  @Override
+  protected String selectTarget() {
+    return branch == null ? tableName : String.format("%s VERSION AS OF '%s'", tableName, branch);
+  }
+
+  protected void createBranch() {
+    if (branch != null && !branch.equals(SnapshotRef.MAIN_BRANCH)) {
+      sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branch);
+    }
+  }
}
@@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
@@ -42,6 +43,7 @@
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSQLProperties;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.internal.SQLConf;
@@ -58,8 +60,9 @@ public TestCopyOnWriteDelete(
      Map<String, String> config,
      String fileFormat,
      Boolean vectorized,
-      String distributionMode) {
-    super(catalogName, implementation, config, fileFormat, vectorized, distributionMode);
+      String distributionMode,
+      String branch) {
+    super(catalogName, implementation, config, fileFormat, vectorized, distributionMode, branch);
  }

  @Override
@@ -82,6 +85,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
        tableName, DELETE_ISOLATION_LEVEL, "snapshot");

    sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+    createBranch();

    Table table = Spark3Util.loadIcebergTable(spark, tableName);

@@ -111,7 +115,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
    Future<?> appendFuture =
        executorService.submit(
            () -> {
-              GenericRecord record = GenericRecord.create(table.schema());
+              GenericRecord record = GenericRecord.create(SnapshotUtil.schemaFor(table, branch));
              record.set(0, 1); // id
              record.set(1, "hr"); // dep

@@ -126,7 +130,12 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception

              for (int numAppends = 0; numAppends < 5; numAppends++) {
                DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
-                table.newFastAppend().appendFile(dataFile).commit();
+                AppendFiles appendFiles = table.newFastAppend().appendFile(dataFile);
+                if (branch != null) {
+                  appendFiles.toBranch(branch);
+                }
+
+                appendFiles.commit();
                sleep(10);
              }

@@ -151,9 +160,11 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception

  @Test
  public void testRuntimeFilteringWithPreservedDataGrouping() throws NoSuchTableException {
+    Assume.assumeTrue("test".equals(branch));
    createAndInitPartitionedTable();

-    append(new Employee(1, "hr"), new Employee(3, "hr"));
+    append(tableName, new Employee(1, "hr"), new Employee(3, "hr"));
+    createBranch();
    append(new Employee(1, "hardware"), new Employee(2, "hardware"));

    Map<String, String> sqlConf =
@@ -163,17 +174,17 @@ public void testRuntimeFilteringWithPreservedDataGrouping() throws NoSuchTableEx
            SparkSQLProperties.PRESERVE_DATA_GROUPING,
            "true");

-    withSQLConf(sqlConf, () -> sql("DELETE FROM %s WHERE id = 2", tableName));
+    withSQLConf(sqlConf, () -> sql("DELETE FROM %s WHERE id = 2", commitTarget()));

    Table table = validationCatalog.loadTable(tableIdent);
    Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots()));

-    Snapshot currentSnapshot = table.currentSnapshot();
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
    validateCopyOnWrite(currentSnapshot, "1", "1", "1");

    assertEquals(
        "Should have expected rows",
        ImmutableList.of(row(1, "hardware"), row(1, "hr"), row(3, "hr")),
-        sql("SELECT * FROM %s ORDER BY id, dep", tableName));
+        sql("SELECT * FROM %s ORDER BY id, dep", selectTarget()));
  }
}
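For programmatic writers outside Spark SQL, the test above shows the corresponding core-API pattern: stage the update and call toBranch before committing when a branch is targeted. A minimal sketch, assuming the DataFile has already been written:

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

public class BranchAppendSketch {
  // Sketch only: mirrors the conditional toBranch(...) call in
  // testDeleteWithConcurrentTableRefresh above; branch may be null for main.
  static void appendTo(Table table, DataFile dataFile, String branch) {
    AppendFiles append = table.newFastAppend().appendFile(dataFile);
    if (branch != null) {
      append.toBranch(branch);
    }
    append.commit();
  }
}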