Parquet: Add bloom filter options to the write path (#5035)
Co-authored-by: Xi Chen <jshmchenxi@163.com>
Co-authored-by: Hao Lin <linhao1990@gmail.com>
Co-authored-by: Huaxin Gao <huaxin_gao@apple.com>
3 people authored Jun 30, 2022
1 parent da37818 commit bd495ec
Showing 6 changed files with 704 additions and 14 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -167,6 +167,9 @@ private TableProperties() {
"write.delete.parquet.row-group-check-max-record-count";
public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;

public static final String PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes";
public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;

public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column.";

public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
2 changes: 2 additions & 0 deletions docs/configuration.md
@@ -55,6 +55,8 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | gzip | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
| write.parquet.compression-level | null | Parquet compression level |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Enables writing a bloom filter for the column: col1 |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
| write.avro.compression-codec | gzip | Avro compression codec: gzip(deflate with 9 level), zstd, snappy, uncompressed |
| write.avro.compression-level | null | Avro compression level |
| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
68 changes: 55 additions & 13 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -86,6 +86,8 @@
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
@@ -240,6 +242,8 @@ public <D> FileAppender<D> build() throws IOException {
CompressionCodecName codec = context.codec();
int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();

if (compressionLevel != null) {
switch (codec) {
@@ -270,13 +274,21 @@ public <D> FileAppender<D> build() throws IOException {
conf.set(entry.getKey(), entry.getValue());
}

ParquetProperties parquetProperties = ParquetProperties.builder()
ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
.withWriterVersion(writerVersion)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
.withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
.build();
.withMaxBloomFilterBytes(bloomFilterMaxBytes);

for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
String colPath = entry.getKey();
String bloomEnabled = entry.getValue();
propsBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
}

ParquetProperties parquetProperties = propsBuilder.build();

return new org.apache.iceberg.parquet.ParquetWriter<>(
conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec,
@@ -293,15 +305,13 @@ public <D> FileAppender<D> build() throws IOException {
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize);
// Todo: The following code needs to be improved in the bloom filter write path PR.
for (Map.Entry<String, String> entry : config.entrySet()) {
String key = entry.getKey();
if (key.startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX)) {
String columnPath = key.replaceFirst(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, "");
String value = entry.getValue();
parquetWriteBuilder.withBloomFilterEnabled(columnPath, Boolean.valueOf(value));
}

for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
String colPath = entry.getKey();
String bloomEnabled = entry.getValue();
parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
}

return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
}
}
@@ -314,17 +324,23 @@ private static class Context {
private final String compressionLevel;
private final int rowGroupCheckMinRecordCount;
private final int rowGroupCheckMaxRecordCount;
private final int bloomFilterMaxBytes;
private final Map<String, String> columnBloomFilterEnabled;

private Context(int rowGroupSize, int pageSize, int dictionaryPageSize,
CompressionCodecName codec, String compressionLevel,
int rowGroupCheckMinRecordCount, int rowGroupCheckMaxRecordCount) {
int rowGroupCheckMinRecordCount, int rowGroupCheckMaxRecordCount,
int bloomFilterMaxBytes,
Map<String, String> columnBloomFilterEnabled) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
this.dictionaryPageSize = dictionaryPageSize;
this.codec = codec;
this.compressionLevel = compressionLevel;
this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount;
this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount;
this.bloomFilterMaxBytes = bloomFilterMaxBytes;
this.columnBloomFilterEnabled = columnBloomFilterEnabled;
}

static Context dataContext(Map<String, String> config) {
@@ -357,8 +373,16 @@ static Context dataContext(Map<String, String> config) {
Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
"Row group check maximum record count must be >= minimal record count");

int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
Preconditions.checkArgument(bloomFilterMaxBytes > 0, "Bloom filter max bytes must be > 0");

Map<String, String> columnBloomFilterEnabled =
PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);

return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel,
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount);
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount, bloomFilterMaxBytes,
columnBloomFilterEnabled);
}

static Context deleteContext(Map<String, String> config) {
@@ -394,8 +418,16 @@ static Context deleteContext(Map<String, String> config) {
Preconditions.checkArgument(rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
"Row group check maximum record count must be >= minimal record count");

int bloomFilterMaxBytes = PropertyUtil.propertyAsInt(config, PARQUET_BLOOM_FILTER_MAX_BYTES,
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
Preconditions.checkArgument(bloomFilterMaxBytes > 0, "Bloom filter max bytes must be > 0");

Map<String, String> columnBloomFilterEnabled =
PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);

return new Context(rowGroupSize, pageSize, dictionaryPageSize, codec, compressionLevel,
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount);
rowGroupCheckMinRecordCount, rowGroupCheckMaxRecordCount, bloomFilterMaxBytes,
columnBloomFilterEnabled);
}

private static CompressionCodecName toCodec(String codecAsString) {
@@ -433,6 +465,14 @@ int rowGroupCheckMinRecordCount() {
int rowGroupCheckMaxRecordCount() {
return rowGroupCheckMaxRecordCount;
}

int bloomFilterMaxBytes() {
return bloomFilterMaxBytes;
}

Map<String, String> columnBloomFilterEnabled() {
return columnBloomFilterEnabled;
}
}
}

@@ -913,11 +953,13 @@ public <D> CloseableIterable<D> build() {
builder.useStatsFilter()
.useDictionaryFilter()
.useRecordFilter(filterRecords)
.useBloomFilter()
.withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
} else {
// turn off filtering
builder.useStatsFilter(false)
.useDictionaryFilter(false)
.useBloomFilter(false)
.useRecordFilter(false);
}

@@ -39,6 +39,7 @@
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -224,7 +225,7 @@ private void startRowGroup() {
compressor, parquetSchema, props.getAllocator(), this.columnIndexTruncateLength);

this.flushPageStoreToWriter = flushToWriter.bind(pageStore);
this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore);
this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore, (BloomFilterWriteStore) pageStore);

model.setColumnStore(writeStore);
}