Content-Length: 370769 | pFad | http://github.com/apache/iceberg/pull/10207/commits/789191a9673949f39de96c74321ca51c27f50695

9B Flink: Prevent setting endTag/endSnapshotId for streaming source by pvary · Pull Request #10207 · apache/iceberg · GitHub
Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: Prevent setting endTag/endSnapshotId for streaming source #10207

Merged
merged 5 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fixed validation for FlinkSource too, and added tests for bounded sou…
…rces too
  • Loading branch information
Peter Vary committed Apr 26, 2024
commit 789191a9673949f39de96c74321ca51c27f50695
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,8 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSource {
private static final Logger LOG = LoggerFactory.getLogger(FlinkSource.class);

private FlinkSource() {}

/**
Expand Down Expand Up @@ -263,8 +259,9 @@ public FlinkInputFormat buildFormat() {

contextBuilder.resolveConfig(table, readOptions, readableConfig);

return new FlinkInputFormat(
tableLoader, icebergSchema, io, encryption, contextBuilder.build());
ScanContext context = contextBuilder.build();
context.validate();
return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, context);
}

public DataStream<RowData> build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package org.apache.iceberg.flink.source;

import static org.apache.iceberg.flink.SimpleDataUtil.SCHEMA;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -184,6 +187,23 @@ public void testReadPartitionColumn() throws Exception {
TestHelpers.assertRows(result, expected);
}

@TestTemplate
public void testValidation() {
catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA);

assertThatThrownBy(
() ->
FlinkSource.forRowData()
.env(StreamExecutionEnvironment.getExecutionEnvironment())
.tableLoader(tableLoader())
.streaming(false)
.endTag("tag")
.endSnapshotId(1L)
.build())
.hasMessage("END_SNAPSHOT_ID and END_TAG cannot both be set.")
.isInstanceOf(IllegalArgumentException.class);
}

private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
return TestHelpers.readRows(inputFormat, rowType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.iceberg.flink.source;

import static org.apache.iceberg.flink.SimpleDataUtil.SCHEMA;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -42,8 +45,25 @@
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.TestTemplate;

public class TestIcebergSourceBounded extends TestFlinkScan {
@TestTemplate
public void testValidation() {
catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA);

assertThatThrownBy(
() ->
IcebergSource.forRowData()
.tableLoader(tableLoader())
.assignerFactory(new SimpleSplitAssignerFactory())
.streaming(false)
.endTag("tag")
.endSnapshotId(1L)
.build())
.hasMessage("END_SNAPSHOT_ID and END_TAG cannot both be set.")
.isInstanceOf(IllegalArgumentException.class);
}

@Override
protected List<Row> runWithProjection(String... projected) throws Exception {
Expand Down








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/iceberg/pull/10207/commits/789191a9673949f39de96c74321ca51c27f50695

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy