Add Parquet Row Group Bloom Filter Support #4831
Conversation
Co-authored-by: Xi Chen <jshmchenxi@163.com> Co-authored-by: Hao Lin <linhao@qiyi.com> Co-authored-by: Huaxin Gao <huaxin_gao@apple.com>
Co-authored-by: Xi Chen <jshmchenxi@163.com> Co-authored-by: Hao Lin <linhao1990@gmail.com> Co-authored-by: Huaxin Gao <huaxin_gao@apple.com>
@huaxingao Thanks for continuing this work!
Great work. Left some comments.
.set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
.set(TableProperties.PARQUET_BATCH_SIZE, "4")
.commit();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite understand this. Can we inject these properties directly when the table is created instead of changing it after it is created?
table = catalog.createTable(TableIdentifier.of("default", name), schema);
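For example, a sketch of injecting the properties at creation time, using the Catalog.createTable overload that accepts a properties map (the partition spec and property values are illustrative, mirroring the test above):

```java
Map<String, String> props = ImmutableMap.of(
    TableProperties.PARQUET_VECTORIZATION_ENABLED, "true",
    TableProperties.PARQUET_BATCH_SIZE, "4");

// Pass the properties at table creation instead of committing an update afterwards.
table = catalog.createTable(
    TableIdentifier.of("default", name), schema, PartitionSpec.unpartitioned(), props);
```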
These properties can be set either at table creation or updated later; I don't think it matters. I picked the second option because when I wrote the test, I happened to use TestSparkReaderDeletes as my template and followed the style there.
+1 to following existing style
cc @aokolnychyi @RussellSpitzer @flyrain @szehon-ho
also cc @kbendick @chenjunjiedada @rdblue
public static final boolean DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT = false;

public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column.";
public static final String PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX =
It would be better to document that the NDV is specific to a Parquet file.
Thanks for your comment. I documented this in configuration.md.
@@ -167,6 +167,16 @@ private TableProperties() {
"write.delete.parquet.row-group-check-max-record-count";
public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;

public static final String DEFAULT_PARQUET_BLOOM_FILTER_ENABLED = "write.parquet.bloom-filter-enabled.default";
Does it ever make sense to enable the bloom filter for all columns? Should we only allow bloom filters for explicitly specified columns?
It is not common to enable bloom filters for all columns, but it is legal. This is consistent with the parquet-mr bloom filter implementation.
I agree with Steven. I think it only makes sense to enable bloom filters for some columns.
+1. While it's consistent with the parquet-mr bloom filter implementation, we need to think of user experience first and foremost.
It doesn't make sense to enable bloom filters for a lot of columns, and many users don't do any tuning of their metadata/statistics.
I think it's in line with other things we do to make the user experience better, like turning off column-level statistics after a certain number of columns. We can point it out in the docs under a big, highlighted !!!NOTE that the bloom filter is only used when turned on.
It's really an advanced thing to use at all, imo.
Sounds good. I have removed DEFAULT_PARQUET_BLOOM_FILTER_ENABLED. Now users need to enable the bloom filter for individual columns using PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX. If the column is a complex type, the field inside the complex type needs to be enabled, for example:
set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "struct_col.int_field", "true")
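Put together, a minimal sketch of enabling bloom filters per column on a table (the column names `id` and `struct_col.int_field` are illustrative; the property constants are the ones added in this PR):

```java
// "table" is an existing org.apache.iceberg.Table; column names are illustrative.
// Enable bloom filters only for explicitly listed columns (including a nested field),
// and optionally hint the expected NDV so Parquet can size the bitset.
table.updateProperties()
    .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
    .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "struct_col.int_field", "true")
    .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX + "id", "1000000")
    .commit();
```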
| write.parquet.bloom-filter-enabled.default | false | Whether to enable writing bloom filters for all columns |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Whether to enable writing a bloom filter for column 'col1' to allow per-column configuration; this property overrides `bloom-filter-enabled.default` for the specified column. For example, setting both `write.parquet.bloom-filter-enabled.default=true` and `write.parquet.bloom-filter-enabled.column.some_col=false` enables bloom filters for all columns except `some_col` |
| write.parquet.bloom-filter-expected-ndv.column.col1 | (not set) | The expected number of distinct values in a column, used to compute the optimal size of the bloom filter; note that the NDV is specific to a Parquet file. If this property is not set, the bloom filter uses the maximum size set in `bloom-filter-max-bytes`. If this property is set for a column, there is no need to also enable the bloom filter with the `write.parquet.bloom-filter-enabled` property |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
What is the behavior of this? If the NDV requires a size that is too large, does it skip writing the bloom filter?
If the NDV requires a size that is too large, Parquet still writes the bloom filter using the max bytes set by this property, rather than the bitset size calculated from the NDV.
I guess there probably isn't much we can do about this, although that behavior makes no sense to me. Is it possible to set the expected false positive probability anywhere? Or is that hard-coded in the Parquet library?
There isn't a property to set fpp in Parquet.
What fpp is used by Parquet?
Parquet uses 0.01 for fpp.
I chatted offline with @chenjunjiedada; we can probably add a config for fpp in Parquet first, and then in Iceberg.
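For context, the relationship between NDV, fpp, and bitset size follows the classic bloom filter sizing formula; a rough sketch is below (the helper name is illustrative, and Parquet's split-block bloom filter only approximates this):

```java
// Optimal number of bits for ndv distinct values at false-positive probability fpp.
static long optimalNumOfBits(long ndv, double fpp) {
  return (long) (-ndv * Math.log(fpp) / (Math.log(2) * Math.log(2)));
}

// Example: 1,000,000 distinct values at fpp = 0.01 needs roughly 9.6 million bits (~1.2 MB),
// which is why a large NDV can run into the 1 MB bloom-filter-max-bytes default.
```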
parquetWriteBuilder.withBloomFilterNDV(colPath, Long.valueOf(numDistinctValue));
}

return new ParquetWriteAdapter<>(
@aokolnychyi, what do you think about removing the old ParquetWriteAdapter code? I don't think that anyone uses it anymore.
"parquet", | ||
RANDOM.nextBoolean(), | ||
WRITE_DISTRIBUTION_MODE_HASH, | ||
true |
Why not add bloom filter testing to one of the existing cases? I don't see a reason why this can't be tested along with other cases. And Spark tests already take a long time. We should avoid adding more cases.
I want to have some bloom filter write/read tests that involve copy-on-write or merge-on-read. I think this is the only place that tests copy-on-write or merge-on-read (I may be wrong). That's why I added a parameter here to turn on the bloom filter for testing.
I think it's fine to have some tests with the row level operations. What I was pointing out is that we can add bloom filters to the existing tests. There's no need to execute the entire suite an additional time.
I see. Thanks for the clarification. Instead of executing the entire suite, is it OK to add a new (but small) test suite to test bloom filters for copy-on-write or merge-on-read? I didn't find any existing copy-on-write/merge-on-read tests with filters.
Thanks for working on this, @huaxingao! It looks close overall. Does it make sense to split this into separate read and write PRs? It seems like adding the evaluator is quite a lot to review independently, and we don't need the write path for its tests.
}

public static Set<Integer> boundReferences(
    StructType struct, List<Expression> exprs, boolean caseSensitive, boolean alreadyBound) {
I don't think it is a good idea to add a second boolean argument to this method. It is confusing enough with just one.
How about using a different method name for this and then renaming this to be a private internal implementation?
I changed this method name to exprReferences (please let me know if you have a better name), but I still need to keep this as public because I need to access this method from ParquetBloomRowGroupFilter, which is in a different package.
Maybe just references since it doesn't bind?
Sounds good. Will change.
config.keySet().stream()
    .filter(key -> key.startsWith(prefix))
    .forEach(key -> {
      String columnPath = key.replaceFirst(prefix, "");
Since this uses column names in the config, is there any logic to update these configs when columns are renamed?
Good question. I don't have logic to update the configs when columns are renamed. I think we are OK, though. On the write path, I use these configs to write bloom filters at file creation time; the configs aren't used for reads. On the read path, the bloom filters are loaded using field ids instead of column names. If columns are renamed after the bloom filters have been written, the bloom filters should still load correctly as long as the ids stay the same.
We're okay for adding read support, but we should consider how to configure write support then.
private <T> boolean shouldRead(PrimitiveType primitiveType, T value, BloomFilter bloom, Type type) {
  long hashValue = 0;
  switch (type.typeId()) {
I think that this needs to use the Parquet type from the file, not the Iceberg type.
The Iceberg type comes from the read schema, which could be long when the file's type is int because of type promotion rules. We need to make sure that the hash function used matches the file's type because that was what produced the bloom filter. Hopefully, the same hash function is used for long and int in Parquet, but I'm not sure that is the case, so the safest thing is to convert value to the Parquet type.
I changed the code to switch (primitiveType.getPrimitiveTypeName())
hashValue = bloom.hash(((Number) value).intValue());
return bloom.findHash(hashValue);
}
case INT64:
Looks like this will fall through if the type ID is not handled? I think you need to add a break;.
Will fix.
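For illustration, a minimal sketch of the pattern being suggested (names follow the snippet above; this is not the PR's final code): every handled Parquet primitive type returns immediately, and anything unhandled falls back to reading the row group instead of falling through:

```java
switch (primitiveType.getPrimitiveTypeName()) {
  case INT32:
    return bloom.findHash(bloom.hash(((Number) value).intValue()));
  case INT64:
    return bloom.findHash(bloom.hash(((Number) value).longValue()));
  default:
    // Unhandled physical type: conservatively assume rows might match and read the row group.
    return true;
}
```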
switch (type.typeId()) {
  case DECIMAL:
    BigDecimal decimalValue = (BigDecimal) value;
    hashValue = bloom.hash(decimalValue.unscaledValue().intValue());
This is what Parquet would do to hash a decimal stored as an int?
Yes
@huaxingao, other than cleaning up shouldRead to avoid falling through for unexpected Iceberg types, I think the read side of this is about ready to go. Do you want to open a separate PR for that?
@rdblue I have a question on how to test the read path. If we separate the write and read paths, I don't have a good way to test the read path. Shall we do the write path first? For the write path, we can test whether the bloom filter is written correctly by calling the Parquet bloom filter APIs. Or should I just submit the read path PR without a test, and add the test after the write path is in?
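As a rough sketch of that kind of write-path check (assuming parquet-mr exposes the written filter through ParquetFileReader.readBloomFilter, which depends on the Parquet version; the file path and column index here are hypothetical):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

static void checkBloomFiltersWritten(String path) throws IOException {
  try (ParquetFileReader reader = ParquetFileReader.open(
      HadoopInputFile.fromPath(new Path(path), new Configuration()))) {
    for (BlockMetaData rowGroup : reader.getFooter().getBlocks()) {
      // Looks at the first column chunk of each row group; pick the column under test instead.
      ColumnChunkMetaData column = rowGroup.getColumns().get(0);
      BloomFilter bloom = reader.readBloomFilter(column);
      // A null bloom filter here would mean the writer did not produce one.
      System.out.println(column.getPath() + " has bloom filter: " + (bloom != null));
    }
  }
}
```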
I'm going to close this because it is broken into separate read and write side PRs.
@huaxingao
Co-Authored-By: Xi Chen jshmchenxi@163.com
Co-Authored-By: Hao Lin linhao@qiyi.com
Co-Authored-By: Huaxin Gao huaxin_gao@apple.com
Currently, Iceberg has ParquetMetricsRowGroupFilter and ParquetDictionaryRowGroupFilter. This PR adds one more filter, ParquetBloomRowGroupFilter, which takes advantage of the Parquet bloom filter to determine whether a row group needs to be read.
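For readers skimming the PR, a sketch of how such a filter would sit next to the existing ones when planning a row-group read (constructor and method shapes are assumed here, modeled on ParquetMetricsRowGroupFilter and ParquetDictionaryRowGroupFilter rather than quoted from the final code; variable names are illustrative):

```java
// Skip the row group only if every available filter agrees it cannot contain matches.
boolean shouldRead =
    new ParquetMetricsRowGroupFilter(expectedSchema, filterExpr, caseSensitive)
        .shouldRead(typeWithIds, rowGroup)
    && new ParquetDictionaryRowGroupFilter(expectedSchema, filterExpr, caseSensitive)
        .shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup))
    && new ParquetBloomRowGroupFilter(expectedSchema, filterExpr, caseSensitive)
        .shouldRead(typeWithIds, rowGroup, reader.getBloomFilterDataReader(rowGroup));
```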