GCP: Add prefix and bulk operations to GCSFileIO #8168
Conversation
```java
private int gcsDeleteBatchSize;

public GCPProperties() {
  gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;
```
Probably OK to set this here; alternatively, maybe just do it on the line above: `private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;`
Sure, I updated this. I was just following the pattern in S3FileIO.
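As a minimal sketch of the two initialization styles under discussion (the default value of 50 here is an assumption for illustration; the real constant is defined in Iceberg's `GCPProperties`):

```java
public class GCPProperties {
  // Assumed value for illustration; the actual default lives in the Iceberg source.
  private static final int GCS_DELETE_BATCH_SIZE_DEFAULT = 50;

  // Reviewer's suggestion: initialize at the declaration site.
  private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;

  // Original diff (following the S3FileIO pattern): initialize in the constructor.
  // private int gcsDeleteBatchSize;
  //
  // public GCPProperties() {
  //   gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;
  // }
}
```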
```java
      .iterator();
}

private long getCreateTimeMillis(Blob blob) {
```
nit: Iceberg typically doesn't use `getXyz` methods. Maybe `createTimeMillisFrom(Blob)`?
Thanks, I changed this to `createTimeMillisFrom`.
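As an illustration only (the diff above shows just the signature), the renamed helper might read along these lines, assuming it wraps the GCS client's `Blob#getCreateTime()` accessor:

```java
// Hypothetical body; the actual implementation in the PR may differ.
private long createTimeMillisFrom(Blob blob) {
  // Blob#getCreateTime() returns the object's creation time in epoch millis.
  return blob.getCreateTime();
}
```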
```java
  internalDeleteFiles(() -> Streams.stream(pathsToDelete).map(BlobId::fromGsUtilUri).iterator());
}

private void internalDeleteFiles(Iterable<BlobId> blobIdsToDelete) {
```
I was wondering whether we should pass the `Stream<BlobId>` through to this method, to have something like:
```java
@Override
public void deletePrefix(String prefix) {
  internalDeleteFiles(
      Streams.stream(listPrefix(prefix))
          .map(fileInfo -> BlobId.fromGsUtilUri(fileInfo.location())));
}

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
  internalDeleteFiles(Streams.stream(pathsToDelete).map(BlobId::fromGsUtilUri));
}

private void internalDeleteFiles(Stream<BlobId> blobIdsToDelete) {
  Streams.stream(
          Iterators.partition(blobIdsToDelete.iterator(), gcpProperties.deleteBatchSize()))
      .forEach(batch -> client().delete(batch));
}
```
Yes, that is an improvement; I made this change.
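Passing a `Stream<BlobId>` keeps the whole pipeline lazy: `Iterators.partition` pulls blob IDs on demand and groups them into batches of `gcpProperties.deleteBatchSize()`, each of which maps to a single bulk `delete` call on the GCS client, so the full set of paths is never materialized in memory.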
Mostly LGTM, just left a few minor comments.
Co-authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com>
```java
String path3 = "list/skip/data3.dat";
storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());

assertThat(StreamSupport.stream(io.listPrefix(gsUri("list/")).spliterator(), false).count())
```
Sorry I missed this initially, but there's no need to wrap this in a stream. This (and other places) can be simplified to `assertThat(io.listPrefix(gsUri("list/"))).hasSize(3);`
I noticed there are also existing tests in that file that do the same thing; could you please update those as well?
The intent was to avoid using the FileIO we're testing to do the assertion, and to use the Google client directly; I assume the same applies to the existing tests.
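For illustration, a sketch contrasting the two assertion styles; the names `io`, `storage`, `gsUri`, and `TEST_BUCKET` come from the test under discussion and are assumed here:

```java
// Names (io, storage, gsUri, TEST_BUCKET) are assumed from the test under discussion.
@Test
public void listPrefixAssertionStyles() {
  // Reviewer's suggestion: assert directly on the Iterable returned by listPrefix.
  assertThat(io.listPrefix(gsUri("list/"))).hasSize(3);

  // Author's intent for the existing tests: count via the Google client itself,
  // so the assertion does not depend on the FileIO being tested.
  long count =
      Streams.stream(
              storage.list(TEST_BUCKET, Storage.BlobListOption.prefix("list/")).iterateAll())
          .count();
  assertThat(count).isEqualTo(3);
}
```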
LGTM, thanks @bryanck
* Spark: Update antlr4 to match Spark 3.4 (apache#7824)
* Parquet: Revert workaround for resource usage with zstd (apache#7834)
* GCP: fix single byte read in GCSInputStream (apache#8071)
* GCP: fix byte read in GCSInputStream
* add test
* Parquet: Cache codecs by name and level (apache#8182)
* GCP: Add prefix and bulk operations to GCSFileIO (apache#8168)
* AWS, GCS: Allow access to underlying storage client (apache#8208)
* spotless
This PR adds bulk and prefix operations to GCSFileIO, and has GCSFileIO implement the `SupportsBulkOperations` and `SupportsPrefixOperations` interfaces.
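To make the new surface concrete, a hypothetical usage sketch; the bucket name, paths, and the empty property map are placeholders, not taken from the PR:

```java
import java.util.List;
import java.util.Map;
import org.apache.iceberg.gcp.gcs.GCSFileIO;
import org.apache.iceberg.io.FileInfo;

public class GcsFileIoExample {
  public static void main(String[] args) {
    try (GCSFileIO io = new GCSFileIO()) {
      // Catalog properties (credentials, project ID, delete batch size) would go here.
      io.initialize(Map.of());

      // SupportsPrefixOperations: list every object under a prefix.
      for (FileInfo file : io.listPrefix("gs://my-bucket/warehouse/db/table/")) {
        System.out.printf("%s (%d bytes)%n", file.location(), file.size());
      }

      // SupportsBulkOperations: delete many files in batched client calls.
      io.deleteFiles(
          List.of(
              "gs://my-bucket/warehouse/db/table/data/file-a.parquet",
              "gs://my-bucket/warehouse/db/table/data/file-b.parquet"));

      // SupportsPrefixOperations: delete everything under a prefix.
      io.deletePrefix("gs://my-bucket/warehouse/db/table/tmp/");
    }
  }
}
```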