Spark 3.3 write to branch snapshot #6651 (apache/iceberg)

Status: Closed. namrathamyske wants to merge 33 commits.

Commits (33):
9e8bf34  Spark 3.3 write to branch (namrathamyske, Jan 23, 2023)
ee4cadb  Spark 3.3 write to branch refactoring by review comments (namrathamyske, Jan 23, 2023)
3225506  Spark 3.3 write to branch refactoring by review comments (namrathamyske, Jan 23, 2023)
e1dfa45  Spark 3.3 write to branch data write test (namrathamyske, Jan 23, 2023)
58b4bf2  spotless (namrathamyske, Jan 24, 2023)
8677134  checking if snapshot set is branch (namrathamyske, Jan 24, 2023)
af17f25  Merge branch 'master' of https://github.com/apache/iceberg into spark… (namrathamyske, Jan 25, 2023)
7642b9e  Spark: address comments for spark branch writes (amogh-jahagirdar, Feb 1, 2023) (this commit)
da9dcc0  Merge commit 'refs/pull/25/head' of https://github.com/namrathamyske/… (namrathamyske, Feb 4, 2023)
ca8e1ff  Merge branch 'master' of https://github.com/apache/iceberg into spark… (namrathamyske, Feb 7, 2023)
2e4eefe  review comments (namrathamyske, Feb 11, 2023)
de20c76  review comments (namrathamyske, Feb 11, 2023)
85d7475  spotless (namrathamyske, Feb 11, 2023)
bbf57e3  review comments changes (namrathamyske, Feb 12, 2023)
0e081e1  review comments changes (namrathamyske, Feb 12, 2023)
51b1052  new line change reversal (namrathamyske, Feb 12, 2023)
aa42e2e  Spark: Add tests for overwrite case (amogh-jahagirdar, Feb 12, 2023)
03c962d  Merge pull request #26 from amogh-jahagirdar/spark-branch-writes-more… (namrathamyske, Feb 17, 2023)
bed5ec3  nit review comments (namrathamyske, Feb 17, 2023)
332064e  Merge branch 'master' of https://github.com/apache/iceberg into spark… (namrathamyske, Feb 17, 2023)
6ef5f4e  Merge branch 'spark_writes' of https://github.com/namrathamyske/icebe… (namrathamyske, Feb 17, 2023)
8ecfdcd  adding write conf back (namrathamyske, Feb 17, 2023)
6b8f954  Remove SQL Write Conf, fail if write conf is specified for row level … (amogh-jahagirdar, Feb 22, 2023)
f8b34bd  Merge branch 'master' into spark_writes (amogh-jahagirdar, Feb 22, 2023)
a8a5d89  Merge branch 'master' into spark_writes (amogh-jahagirdar, Feb 22, 2023)
7ee1689  Address cleanup (amogh-jahagirdar, Feb 23, 2023)
64db07e  Allow non-existing branches in catalog#loadTable (amogh-jahagirdar, Feb 23, 2023)
1b2cd5a  Merge branch 'master' into spark_writes (amogh-jahagirdar, Feb 23, 2023)
4c94693  Remove Spark branch write option, use identifier in branch, merge/del… (amogh-jahagirdar, Feb 26, 2023)
2f3d6e1  Add merge tests (amogh-jahagirdar, Feb 27, 2023)
9bbed3a  Style (amogh-jahagirdar, Feb 27, 2023)
51a29b3  Remove setting branch in scan (amogh-jahagirdar, Feb 27, 2023)
b2692fe  Fix for metadata tables (amogh-jahagirdar, Feb 27, 2023)
Changes from 1 commit: 7642b9e44fe8449e7d8bbd0194642f6c9011d009
Spark: address comments for spark branch writes
amogh-jahagirdar committed Feb 1, 2023
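For context (not part of this commit), the branch refs these Spark writes target are created up front through the core ManageSnapshots API. A minimal sketch, assuming an existing table and snapshot; the branch name and method are illustrative:

import org.apache.iceberg.Table;

// Minimal sketch, not from this PR: create the branch ref that Spark
// writes will target. "audit-branch" and fromSnapshotId are illustrative.
static void createBranch(Table table, long fromSnapshotId) {
  table.manageSnapshots()
      .createBranch("audit-branch", fromSnapshotId) // pin the new branch to an existing snapshot
      .commit();
}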
@@ -310,6 +310,7 @@ public String branch() {
     return confParser
         .stringConf()
         .option(SparkWriteOptions.BRANCH)
+        .sessionConf(SparkWriteOptions.BRANCH)
         .defaultValue(SnapshotRef.MAIN_BRANCH)
         .parse();
   }
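With this change, branch() resolves the per-write option first, then a Spark session conf under the same key, then falls back to main. A hedged sketch, not from the diff, of supplying the option from the DataFrame side; the helper, location, and branch name are illustrative, and the branch is assumed to exist:

import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Sketch only: the per-write option takes precedence over the session conf.
static void appendToBranch(Dataset<Row> df, String location) {
  df.write()
      .format("iceberg")
      .option(SparkWriteOptions.BRANCH, "audit-branch") // parsed by branch() above
      .mode(SaveMode.Append)
      .save(location);
}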
@@ -77,6 +77,7 @@ private SparkWriteOptions() {}

   // Isolation Level for DataFrame calls. Currently supported by overwritePartitions
   public static final String ISOLATION_LEVEL = "isolation-level";
+
   // Branch to write to
   public static final String BRANCH = "branch";
 }
@@ -100,6 +100,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrdering {
   private final Map<String, String> extraSnapshotMetadata;
   private final Distribution requiredDistribution;
   private final SortOrder[] requiredOrdering;
+  private final String branch;

   private boolean cleanupOnAbort = true;

@@ -123,6 +124,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrdering {
     this.applicationId = spark.sparkContext().applicationId();
     this.wapEnabled = writeConf.wapEnabled();
     this.wapId = writeConf.wapId();
+    this.branch = writeConf.branch();
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
     this.requiredDistribution = requiredDistribution;
     this.requiredOrdering = requiredOrdering;
@@ -277,6 +279,7 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {

     try {
       long start = System.currentTimeMillis();
+      operation.toBranch(branch);
       operation.commit(); // abort is automatically called if this fails
       long duration = System.currentTimeMillis() - start;
       LOG.info("Committed in {} ms", duration);

Review comment (Contributor): We'll need to make sure the tests for this are exercising branches.

Review comment (Contributor): +1, I don't see tests that would cover this.
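The pattern applied here comes from the core API: any SnapshotUpdate can be routed to a branch ref before commit, and toBranch(SnapshotRef.MAIN_BRANCH) preserves the old behavior. A minimal sketch with illustrative arguments, not taken from the PR:

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

// Sketch of the core pattern commitOperation now applies.
static void appendFileToBranch(Table table, DataFile file, String branch) {
  AppendFiles append = table.newAppend(); // AppendFiles is a SnapshotUpdate<AppendFiles>
  append.toBranch(branch);                // route the resulting snapshot to the branch ref
  append.appendFile(file);
  append.commit();                        // advances the branch head, not necessarily main
}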
@@ -50,7 +50,6 @@
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;

@@ -248,11 +247,8 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {

   @Override
   public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
-    boolean branchOptionPresent = info.options().containsKey(SparkWriteOptions.BRANCH);
-    if (!branchOptionPresent) {
-      Preconditions.checkArgument(
-          snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
-    }
+    Preconditions.checkArgument(
+        snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
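The simplification makes the guard unconditional: a table loaded at a specific snapshot (time travel) rejects write builders even when a branch option is supplied. A standalone sketch of the same check, with a hypothetical helper name:

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Hedged sketch of the guard above: snapshotId is non-null only when the
// table was loaded pinned to a snapshot, and such tables are read-only.
static void checkNotTimeTravel(Long snapshotId) {
  Preconditions.checkArgument(
      snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
}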
@@ -220,6 +220,7 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {

     try {
       long start = System.currentTimeMillis();
+      operation.toBranch(branch);
       operation.commit(); // abort is automatically called if this fails
       long duration = System.currentTimeMillis() - start;
       LOG.info("Committed in {} ms", duration);

@@ -292,7 +293,7 @@ public String toString() {
   private class BatchAppend extends BaseBatchWrite {
     @Override
     public void commit(WriterCommitMessage[] messages) {
-      AppendFiles append = table.newAppend().toBranch(branch);
+      AppendFiles append = table.newAppend();

       int numFiles = 0;
       for (DataFile file : files(messages)) {

@@ -314,7 +315,7 @@ public void commit(WriterCommitMessage[] messages) {
         return;
       }

-      ReplacePartitions dynamicOverwrite = table.newReplacePartitions().toBranch(branch);
+      ReplacePartitions dynamicOverwrite = table.newReplacePartitions();

       IsolationLevel isolationLevel = writeConf.isolationLevel();
       Long validateFromSnapshotId = writeConf.validateFromSnapshotId();

@@ -352,8 +353,7 @@ private OverwriteByFilter(Expression overwriteExpr) {

     @Override
     public void commit(WriterCommitMessage[] messages) {
-      OverwriteFiles overwriteFiles =
-          table.newOverwrite().toBranch(branch).overwriteByRowFilter(overwriteExpr);
+      OverwriteFiles overwriteFiles = table.newOverwrite().overwriteByRowFilter(overwriteExpr);

       int numFiles = 0;
       for (DataFile file : files(messages)) {

@@ -414,7 +414,7 @@ private Expression conflictDetectionFilter() {

     @Override
     public void commit(WriterCommitMessage[] messages) {
-      OverwriteFiles overwriteFiles = table.newOverwrite().toBranch(branch);
+      OverwriteFiles overwriteFiles = table.newOverwrite();

       List<DataFile> overwrittenFiles = overwrittenFiles();
       int numOverwrittenFiles = overwrittenFiles.size();
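Centralizing toBranch in commitOperation means each commit path sets the branch exactly once instead of at every call site. An illustrative check, not from the PR, of what a branch-routed commit should leave behind; names are hypothetical:

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Sketch: a write to a non-main branch advances that branch ref and
// leaves main where it was.
static void assertBranchOnlyCommit(Table table, String branch, Long mainBefore) {
  table.refresh();
  Snapshot branchHead = table.snapshot(branch); // head of the branch ref
  Long mainAfter =
      table.currentSnapshot() == null ? null : table.currentSnapshot().snapshotId();
  if (branchHead == null) {
    throw new IllegalStateException("branch ref should exist after the commit");
  }
  if (!java.util.Objects.equals(mainBefore, mainAfter)) {
    throw new IllegalStateException("a branch write must not move main");
  }
}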
@@ -36,6 +36,7 @@
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;

@@ -71,7 +72,7 @@ public class TestSparkDataWrite {

   @Rule public TemporaryFolder temp = new TemporaryFolder();

-  private String branch;
+  private String targetBranch;

   @Parameterized.Parameters(name = "format = {0}, branch = {1}")
   public static Object[] parameters() {

Review comment (Contributor): This test looks good, but I think we need to find the other paths that are affected and update those tests as well.

@@ -102,9 +103,9 @@ public static void stopSpark() {
     currentSpark.stop();
   }

-  public TestSparkDataWrite(String format, String branch) {
+  public TestSparkDataWrite(String format, String targetBranch) {
     this.format = FileFormat.fromString(format);
-    this.branch = branch;
+    this.targetBranch = targetBranch;
   }

   @Test
@@ -127,19 +128,19 @@ public void testBasicWrite() throws IOException {
         .format("iceberg")
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
         .mode(SaveMode.Append)
-        .option("branch", branch)
+        .option("branch", targetBranch)
         .save(location.toString());

     table.refresh();

     Dataset<Row> result =
-        spark.read().format("iceberg").option("branch", branch).load(location.toString());
+        spark.read().format("iceberg").option("branch", targetBranch).load(location.toString());

     List<SimpleRecord> actual =
         result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
     Assert.assertEquals("Result rows should match", expected, actual);
-    for (ManifestFile manifest : table.snapshot(branch).allManifests(table.io())) {
+    for (ManifestFile manifest : latestSnapshot(table, targetBranch).allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         // TODO: avro not support split
         if (!format.equals(FileFormat.AVRO)) {
@@ -187,7 +188,7 @@ public void testAppend() throws IOException {
         .format("iceberg")
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
         .mode(SaveMode.Append)
-        .option("branch", branch)
+        .option("branch", targetBranch)
         .save(location.toString());

     df.withColumn("id", df.col("id").plus(3))

@@ -196,13 +197,13 @@
         .format("iceberg")
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
         .mode(SaveMode.Append)
-        .option("branch", branch)
+        .option("branch", targetBranch)
         .save(location.toString());

     table.refresh();

     Dataset<Row> result =
-        spark.read().format("iceberg").option("branch", branch).load(location.toString());
+        spark.read().format("iceberg").option("branch", targetBranch).load(location.toString());

     List<SimpleRecord> actual =
         result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
@@ -231,7 +232,7 @@ public void testEmptyOverwrite() throws IOException {
         .format("iceberg")
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
         .mode(SaveMode.Append)
-        .option("branch", branch)
+        .option("branch", targetBranch)
         .save(location.toString());

     Dataset<Row> empty = spark.createDataFrame(ImmutableList.of(), SimpleRecord.class);

@@ -242,13 +243,13 @@
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
         .mode(SaveMode.Overwrite)
         .option("overwrite-mode", "dynamic")
-        .option("branch", branch)
+        .option("branch", targetBranch)
         .save(location.toString());

     table.refresh();

     Dataset<Row> result =
-        spark.read().format("iceberg").option("branch", branch).load(location.toString());
+        spark.read().format("iceberg").option("branch", targetBranch).load(location.toString());

     List<SimpleRecord> actual =
         result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
@@ -284,7 +285,7 @@ public void testOverwrite() throws IOException {
         .format("iceberg")
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
         .mode(SaveMode.Append)
-        .option("branch", branch)
+        .option("branch", targetBranch)
         .save(location.toString());

     // overwrite with 2*id to replace record 2, append 4 and 6

@@ -295,13 +296,13 @@
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
         .mode(SaveMode.Overwrite)
         .option("overwrite-mode", "dynamic")
-        .option("branch", branch)
+        .option("branch", targetBranch)
        .save(location.toString());

     table.refresh();

     Dataset<Row> result =
-        spark.read().format("iceberg").option("branch", branch).load(location.toString());
+        spark.read().format("iceberg").option("branch", targetBranch).load(location.toString());

     List<SimpleRecord> actual =
         result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
@@ -329,7 +330,7 @@ public void testUnpartitionedOverwrite() throws IOException {
         .format("iceberg")
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
         .mode(SaveMode.Append)
-        .option("branch", branch)
+        .option("branch", targetBranch)
         .save(location.toString());

     // overwrite with the same data; should not produce two copies

@@ -338,13 +339,13 @@
         .format("iceberg")
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
         .mode(SaveMode.Overwrite)
-        .option("branch", branch)
+        .option("branch", targetBranch)
         .save(location.toString());

     table.refresh();

     Dataset<Row> result =
-        spark.read().format("iceberg").option("branch", branch).load(location.toString());
+        spark.read().format("iceberg").option("branch", targetBranch).load(location.toString());

     List<SimpleRecord> actual =
         result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
@@ -378,21 +379,21 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
         .format("iceberg")
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
         .mode(SaveMode.Append)
-        .option("branch", branch)
+        .option("branch", targetBranch)
         .save(location.toString());

     table.refresh();

     Dataset<Row> result =
-        spark.read().format("iceberg").option("branch", branch).load(location.toString());
+        spark.read().format("iceberg").option("branch", targetBranch).load(location.toString());

     List<SimpleRecord> actual =
         result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
     Assert.assertEquals("Result rows should match", expected, actual);

     List<DataFile> files = Lists.newArrayList();
-    for (ManifestFile manifest : table.snapshot(branch).allManifests(table.io())) {
+    for (ManifestFile manifest : latestSnapshot(table, targetBranch).allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         files.add(file);
       }
@@ -674,6 +675,14 @@ public void testCommitUnknownException() throws IOException {
     Assert.assertEquals("Result rows should match", records, actual);
   }

+  private Snapshot latestSnapshot(Table table, String branch) {
+    if ("main".equals(branch)) {
+      return table.currentSnapshot();
+    } else {
+      return table.snapshot(branch);
+    }
+  }
+
   public enum IcebergOptionsType {
     NONE,
     TABLE,







