
Add Parquet Row Group Bloom Filter Support by huaxingao · Pull Request #4831 · apache/iceberg

Add Parquet Row Group Bloom Filter Support #4831

Closed
huaxingao wants to merge 8 commits from the bf branch

Conversation

huaxingao
Contributor

Co-Authored-By: Xi Chen jshmchenxi@163.com
Co-Authored-By: Hao Lin linhao@qiyi.com
Co-Authored-By: Huaxin Gao huaxin_gao@apple.com

Currently, Iceberg has ParquetMetricsRowGroupFilter and ParquetDictionaryRowGroupFilter. This PR adds one more filter, ParquetBloomRowGroupFilter, which uses the Parquet row-group bloom filter to determine whether a row group needs to be read.
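Roughly, the check works like the sketch below, using the parquet-mr bloom filter API. The class, method, and variable names are illustrative only, not the PR's actual implementation, and only a single equality predicate on a long column is shown.

```java
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.BloomFilterReader;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;

public class BloomRowGroupCheck {
  // Returns false only when the bloom filter proves the value is absent from the
  // row group; true means the row group must still be read.
  static boolean shouldRead(BloomFilterReader bloomReader, ColumnChunkMetaData column, long value) {
    BloomFilter bloom = bloomReader.readBloomFilter(column);
    if (bloom == null) {
      return true; // no bloom filter was written for this column chunk
    }
    long hash = bloom.hash(value); // hash with the filter's own hash function (XXH64)
    return bloom.findHash(hash);   // false => definitely not present, skip the row group
  }
}
```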

huaxingao and others added 3 commits May 20, 2022 18:50
Co-authored-by: Xi Chen <jshmchenxi@163.com>
Co-authored-by: Hao Lin <linhao@qiyi.com>
Co-authored-by: Huaxin Gao <huaxin_gao@apple.com>
Co-authored-by: Xi Chen <jshmchenxi@163.com>
Co-authored-by: Hao Lin <linhao1990@gmail.com>
Co-authored-by: Huaxin Gao <huaxin_gao@apple.com>
@jshmchenxi
Contributor

@huaxingao Thanks for continuing this work!

Contributor
@hililiwei hililiwei left a comment

Great work. Left some comments.

core/src/main/java/org/apache/iceberg/TableProperties.java (outdated, resolved)
.set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
.set(TableProperties.PARQUET_BATCH_SIZE, "4")
.commit();
}
Contributor

I don't quite understand this. Can we inject these properties directly when the table is created instead of changing it after it is created?

table = catalog.createTable(TableIdentifier.of("default", name), schema);

Contributor Author

These properties can be set either at table creation or updated later; I don't think it matters. I picked the second option because, when I wrote the test, I happened to use TestSparkReaderDeletes as my template and followed the style there.

Contributor

+1 to following existing style

api/src/main/java/org/apache/iceberg/data/Record.java (outdated, resolved)
@huaxingao
Contributor Author

cc @aokolnychyi @RussellSpitzer @flyrain @szehon-ho
This PR is ready for review. Thank you very much in advance!

@huaxingao
Contributor Author

also cc @kbendick @chenjunjiedada @rdblue

public static final boolean DEFAULT_PARQUET_BLOOM_FILTER_ENABLED_DEFAULT = false;

public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column.";
public static final String PARQUET_BLOOM_FILTER_COLUMN_EXPECTED_NDV_PREFIX =
    "write.parquet.bloom-filter-expected-ndv.column.";
Collaborator

It would be better to document that the NDV is specific to a Parquet file.

Contributor Author

Thanks for your comment. I documented this in configuration.md.

@github-actions github-actions bot added the docs label May 25, 2022
@@ -167,6 +167,16 @@ private TableProperties() {
"write.delete.parquet.row-group-check-max-record-count";
public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;

public static final String DEFAULT_PARQUET_BLOOM_FILTER_ENABLED = "write.parquet.bloom-filter-enabled.default";
Contributor

does it ever make sense to enable bloom filter for all columns? should we only allow bloom filter for explicitly specified columns?

Contributor Author

It is not common to enable bloom filters for all columns, but it's legal. This is consistent with the parquet-mr bloom filter implementation.

Contributor

I agree with Steven. I think it only makes sense to enable bloom filters for some columns.

Contributor

+1. While it's consistent with the parquet-mr bloom filter implementation, we need to think of user experience first and foremost.

It doesn't make sense to enable bloom filters for a lot of columns. And many users don't do any tuning of their metadata / statistics.

I think it's in line with other things we do to make the user experience better, like turning off column-level statistics after a certain number of columns. We can point it out in the docs under a big !!!NOTE (that's highlighted) that bloom filters are only used when turned on.

It's really an advanced thing to use at all imo.

Contributor Author

Sounds good. I have removed DEFAULT_PARQUET_BLOOM_FILTER_ENABLED. Now users need to enable the bloom filter for individual columns using PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX. If the column is a complex type, the field inside the complex type needs to be enabled, for example:
set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "struct_col.int_field", "true")
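For reference, a minimal sketch of setting those properties through the Iceberg API; the Table handle and the column names (`id`, `struct_col.int_field`) are invented for illustration:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class EnableBloomFilters {
  // Enable bloom filters for a top-level column and for a field nested inside a struct.
  static void enable(Table table) {
    table.updateProperties()
        .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
        .set(TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "struct_col.int_field", "true")
        .commit();
  }
}
```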

| Property | Default | Description |
| -------- | ------- | ----------- |
| write.parquet.bloom-filter-enabled.default | false | Whether to enable writing bloom filter for all columns |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Whether to enable writing bloom filter for column 'col1' to allow per-column configuration; This property overrides `bloom-filter-enabled.default` for the specified column; For example, setting both `write.parquet.bloom-filter-enabled.default=true` and `write.parquet.bloom-filter-enabled.column.some_col=false` will enable bloom filter for all columns except `some_col` |
| write.parquet.bloom-filter-expected-ndv.column.col1 | (not set) | The expected number of distinct values in a column, it is used to compute the optimal size of the bloom filter; Note that the NDV is specific for a parquet file. If this property is not set, the bloom filter will use the maximum size set in `bloom-filter-max-bytes`; If this property is set for a column, then no need to enable the bloom filter with `write.parquet.bloom-filter-enabled` property |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
Contributor

What is the behavior of this? If the NDV requires a size that is too large, does it skip writing the bloom filter?

Contributor Author

If the NDV requires a size that is too large, parquet still writes the bloom filter using the max bytes set by this property, not using the bitset calculated by NDV.

Contributor

I guess there probably isn't much we can do about this, although that behavior makes no sense to me. Is it possible to set the expected false positive probability anywhere? Or is that hard-coded in the Parquet library?

Contributor Author

There isn't a property to set fpp in Parquet.

Contributor

What fpp is used by Parquet?

Contributor Author

Parquet uses 0.01 for fpp.
I chatted offline with @chenjunjiedada; we can probably add a config for fpp in Parquet first, and then in Iceberg.
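For context, the classic bloom filter sizing formula gives a feel for how NDV and fpp interact with the 1 MB `bloom-filter-max-bytes` cap. This is a back-of-the-envelope sketch, not the exact sizing code in parquet-mr's split-block implementation:

```java
class BloomFilterSizing {
  // Classic sizing: m = -n * ln(p) / (ln 2)^2 bits for n distinct values at false
  // positive probability p. parquet-mr's split-block filter uses its own variant and
  // then caps the result at write.parquet.bloom-filter-max-bytes.
  static long approxBloomFilterBytes(long ndv, double fpp) {
    double bits = -ndv * Math.log(fpp) / (Math.log(2) * Math.log(2));
    return (long) Math.ceil(bits / 8);
  }

  public static void main(String[] args) {
    // 1,000,000 NDV at fpp = 0.01 needs roughly 1.2 MB, which already exceeds the
    // 1,048,576-byte default cap, so the filter would be written at the capped size.
    System.out.println(approxBloomFilterBytes(1_000_000L, 0.01));
  }
}
```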

parquetWriteBuilder.withBloomFilterNDV(colPath, Long.valueOf(numDistinctValue));
}

return new ParquetWriteAdapter<>(
Contributor

@aokolnychyi, what do you think about removing the old ParquetWriteAdapter code? I don't think that anyone uses it anymore.

"parquet",
RANDOM.nextBoolean(),
WRITE_DISTRIBUTION_MODE_HASH,
true
Contributor

Why not add bloom filter testing to one of the existing cases? I don't see a reason why this can't be tested along with other cases. And Spark tests already take a long time. We should avoid adding more cases.

Contributor Author

I want to have some bloom filter write/read tests that involve copy-on-write or merge-on-read. I think this is the only place that tests copy-on-write or merge-on-read (I may be wrong). That's why I added a parameter here to turn on the bloom filter for testing.

Contributor

I think it's fine to have some tests with the row level operations. What I was pointing out is that we can add bloom filters to the existing tests. There's no need to execute the entire suite an additional time.

Contributor Author

I see. Thanks for the clarification. Instead of executing the entire suite, is it OK to add a new (but small) test suite to test bloom filter for copy on write or merge on read? I didn't find any existing copy on write/merge on read tests with filters.

@rdblue
Contributor

rdblue commented May 27, 2022

Thanks for working on this, @huaxingao! It looks close overall.

Does it make sense to split this into separate read and write PRs? It seems like adding the evaluator is quite a lot to review independently, and we don't need the write path for its tests.

@huaxingao
Contributor Author

Thanks @rdblue @kbendick for reviewing!
I have addressed the comments and changed the code accordingly. I updated the code in this PR because I think it might be easier for you to check whether I addressed the comments here. I can split this PR into two afterwards.

}

public static Set<Integer> boundReferences(
StructType struct, List<Expression> exprs, boolean caseSensitive, boolean alreadyBound) {
Contributor

I don't think it is a good idea to add a second boolean argument to this method. It is confusing enough with just one.

How about using a different method name for this and then renaming this to be a private internal implementation?

Contributor Author

I changed this method name to exprReferences (please let me know if you have a better name), but I still need to keep this as public because I need to access this method from ParquetBloomRowGroupFilter, which is in a different package.

Contributor

Maybe just references since it doesn't bind?

Contributor Author

Sounds good. Will change.

config.keySet().stream()
.filter(key -> key.startsWith(prefix))
.forEach(key -> {
String columnPath = key.replaceFirst(prefix, "");
Contributor

Since this uses column name in the config, is there any logic to update these configs when columns are renamed?

Contributor Author

Good question. I don't have logic to update the configs when columns are renamed. I think we are OK, though. On the write path, I use these configs to write bloom filters at file creation time; the configs are not used for reads. On the read path, the bloom filters are loaded by field id instead of column name. If columns are renamed after the bloom filters have been written, as long as the ids stay the same, the bloom filters should load fine.

Contributor

We're okay for adding read support, but we should consider how to configure write support then.


private <T> boolean shouldRead(PrimitiveType primitiveType, T value, BloomFilter bloom, Type type) {
long hashValue = 0;
switch (type.typeId()) {
Contributor

I think that this needs to use the Parquet type from the file, not the Iceberg type.

The Iceberg type comes from the read schema, which could be long when the file's type is int because of type promotion rules. We need to make sure that the hash function used matches the file's type because that was what produced the bloom filter. Hopefully, the same hash function is used for long and int in Parquet, but I'm not sure that is the case so the safest thing is to convert value to the Parquet type.

Contributor Author

I changed the code to switch (primitiveType.getPrimitiveTypeName())

hashValue = bloom.hash(((Number) value).intValue());
return bloom.findHash(hashValue);
}
case INT64:
Contributor

Looks like this will fall through if the type ID is not handled? I think you need to add a break;.

Contributor Author

Will fix.
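A sketch of what the cleaned-up method might look like, switching on the file's Parquet primitive type and returning from every branch so nothing falls through; the per-type handling (and the byte[] value for BINARY) is an assumption for illustration, not the PR's final code:

```java
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;

class BloomFilterEval {
  // Tests an equality value against a row group's bloom filter, hashing with the
  // physical type that produced the filter. Unhandled types return true ("must read")
  // instead of falling through to another case.
  static <T> boolean shouldRead(PrimitiveType primitiveType, T value, BloomFilter bloom) {
    switch (primitiveType.getPrimitiveTypeName()) {
      case INT32:
        return bloom.findHash(bloom.hash(((Number) value).intValue()));
      case INT64:
        return bloom.findHash(bloom.hash(((Number) value).longValue()));
      case FLOAT:
        return bloom.findHash(bloom.hash(((Number) value).floatValue()));
      case DOUBLE:
        return bloom.findHash(bloom.hash(((Number) value).doubleValue()));
      case BINARY:
        // assumes the value arrives as raw bytes; real code must convert from the
        // engine's string/binary representation first
        return bloom.findHash(bloom.hash(Binary.fromConstantByteArray((byte[]) value)));
      default:
        return true; // no bloom filter support for this physical type
    }
  }
}
```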

switch (type.typeId()) {
case DECIMAL:
BigDecimal decimalValue = (BigDecimal) value;
hashValue = bloom.hash(decimalValue.unscaledValue().intValue());
Contributor

This is what Parquet would do to hash a decimal stored as an int?

Contributor Author

Yes

Contributor
@rdblue rdblue left a comment

@huaxingao, other than cleaning up shouldRead to avoid falling through for unexpected Iceberg types, I think the read side of this is about ready to go. Do you want to open a separate PR for that?

@huaxingao
Contributor Author

@rdblue I have a question about how to test the read path. If we separate the write and read paths, I don't have a good way to test the read path. Shall we do the write path first? For the write path, we can test whether the bloom filter is written correctly by calling the Parquet bloom filter APIs. Or should I just submit the read path PR without a test, and add the test after the write path is in?

@rdblue
Contributor

rdblue commented Jun 29, 2022

I'm going to close this because it is broken into separate read and write side PRs.

@rdblue rdblue closed this Jun 29, 2022
@jia-zhengwei

@huaxingao
How do I create a bloom filter index for Iceberg with Spark?
I know I need to set write.parquet.bloom-filter-enabled.column.col1='true' when creating the table; do I need any other steps when inserting data with PySpark?
How can I verify that the bloom filter was written? Will some new index file be generated?
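A minimal sketch of what the question describes: creating an Iceberg table with the per-column bloom filter property from Spark. The catalog, table, and column names are invented, and this is an illustration rather than a verified recipe.

```java
import org.apache.spark.sql.SparkSession;

class CreateTableWithBloomFilter {
  // Create an Iceberg table with a per-column bloom filter property; Parquet files
  // written to the table afterwards should carry a bloom filter for that column.
  static void create(SparkSession spark) {
    spark.sql(
        "CREATE TABLE demo.db.events (col1 BIGINT, data STRING) USING iceberg "
            + "TBLPROPERTIES ('write.parquet.bloom-filter-enabled.column.col1'='true')");
  }
}
```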

@huaxingao huaxingao deleted the bf branch March 19, 2024 21:54
8 participants







