GCP: Add prefix and bulk operations to GCSFileIO #8168
Changes from 6 commits
GCSFileIO.java

@@ -20,17 +20,24 @@
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
@@ -48,7 +55,7 @@
 * <p>See <a href="https://cloud.google.com/storage/docs/folders#overview">Cloud Storage
 * Overview</a>
 */
-public class GCSFileIO implements FileIO {
+public class GCSFileIO implements FileIO, SupportsBulkOperations, SupportsPrefixOperations {
  private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
  private static final String DEFAULT_METRICS_IMPL =
      "org.apache.iceberg.hadoop.HadoopMetricsContext";

@@ -174,4 +181,46 @@ public void close() {
      }
    }
  }

  @Override
  public Iterable<FileInfo> listPrefix(String prefix) {
    GCSLocation location = new GCSLocation(prefix);
    return () ->
        client()
            .list(location.bucket(), Storage.BlobListOption.prefix(location.prefix()))
            .streamAll()
            .map(
                blob ->
                    new FileInfo(
                        String.format("gs://%s/%s", blob.getBucket(), blob.getName()),
                        blob.getSize(),
                        getCreateTimeMillis(blob)))
            .iterator();
  }

  private long getCreateTimeMillis(Blob blob) {
[review comment] nit: Iceberg typically doesn't use
[author reply] Thanks, I changed this to
    if (blob.getCreateTimeOffsetDateTime() == null) {
      return 0;
    }
    return blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli();
  }

  @Override
  public void deletePrefix(String prefix) {
    internalDeleteFiles(
        () ->
            Streams.stream(listPrefix(prefix))
                .map(fileInfo -> BlobId.fromGsUtilUri(fileInfo.location()))
                .iterator());
  }

  @Override
  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
    internalDeleteFiles(() -> Streams.stream(pathsToDelete).map(BlobId::fromGsUtilUri).iterator());
  }

  private void internalDeleteFiles(Iterable<BlobId> blobIdsToDelete) {
[review comment] I was wondering whether we should pass the
[author reply] Yes, that is an improvement, I made this change
    Streams.stream(Iterables.partition(blobIdsToDelete, gcpProperties.deleteBatchSize()))
        .forEach(batch -> client().delete(batch));
  }
}
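
Taken together, the new methods let callers enumerate and bulk-delete objects through the FileIO interface instead of issuing per-object calls. A minimal usage sketch, assuming application-default credentials; the bucket, paths, and the class name GCSFileIOUsage are made up for illustration and are not part of this PR:

import java.util.List;
import org.apache.iceberg.gcp.gcs.GCSFileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class GCSFileIOUsage {
  public static void main(String[] args) {
    GCSFileIO io = new GCSFileIO();
    io.initialize(ImmutableMap.of()); // credentials resolved from the environment

    // SupportsPrefixOperations: list every object under a prefix.
    for (FileInfo file : io.listPrefix("gs://example-bucket/warehouse/db/table/metadata/")) {
      System.out.printf("%s (%d bytes)%n", file.location(), file.size());
    }

    // SupportsBulkOperations: delete an explicit set of objects. Internally the
    // paths are converted to BlobIds and deleted in batches of
    // GCPProperties.deleteBatchSize() via the client's batch delete call.
    List<String> obsolete =
        ImmutableList.of(
            "gs://example-bucket/warehouse/db/table/data/old-1.parquet",
            "gs://example-bucket/warehouse/db/table/data/old-2.parquet");
    io.deleteFiles(obsolete);

    // Or remove everything under a prefix in one call.
    io.deletePrefix("gs://example-bucket/warehouse/db/table/data/tmp/");

    io.close();
  }
}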
GCSLocation.java (new file)

@@ -0,0 +1,76 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.gcp.gcs;

import com.google.cloud.storage.BlobId;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * This class represents a fully qualified location in GCS expressed as a URI. This class allows
 * for URIs with only a bucket and no path specified, unlike with {@link BlobId#fromGsUtilUri(String)}.
 */
class GCSLocation {
  private static final String SCHEME_DELIM = "://";
  private static final String PATH_DELIM = "/";
  private static final String QUERY_DELIM = "\\?";
  private static final String FRAGMENT_DELIM = "#";

  private static final String EXPECTED_SCHEME = "gs";

  private final String bucket;
  private final String prefix;

  /**
   * Creates a new GCSLocation with the form of scheme://bucket/path?query#fragment
   *
   * @param location fully qualified URI
   */
  GCSLocation(String location) {
    Preconditions.checkNotNull(location, "Location cannot be null.");

    String[] schemeSplit = location.split(SCHEME_DELIM, -1);
    ValidationException.check(
        schemeSplit.length == 2, "Invalid GCS URI, cannot determine scheme: %s", location);

    String scheme = schemeSplit[0];
    ValidationException.check(
        EXPECTED_SCHEME.equals(scheme), "Invalid GCS URI, invalid scheme: %s", scheme);

    String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);

    this.bucket = authoritySplit[0];

    // Strip query and fragment if they exist
    String path = authoritySplit.length > 1 ? authoritySplit[1] : "";
    path = path.split(QUERY_DELIM, -1)[0];
    path = path.split(FRAGMENT_DELIM, -1)[0];
    this.prefix = path;
  }

  /** Returns GCS bucket name. */
  public String bucket() {
    return bucket;
  }

  /** Returns GCS object name prefix. */
  public String prefix() {
    return prefix;
  }
}
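
Since GCSLocation is what listPrefix and deletePrefix use to split incoming URIs, the behavior worth noting is that bucket-only URIs, which BlobId.fromGsUtilUri rejects, parse cleanly here. A small illustrative sketch, assuming it runs inside the same org.apache.iceberg.gcp.gcs package (the class is package-private); the URIs and the class name GCSLocationExample are made up:

package org.apache.iceberg.gcp.gcs;

class GCSLocationExample {
  public static void main(String[] args) {
    // Bucket plus path: both parts are populated.
    GCSLocation withPath = new GCSLocation("gs://my-bucket/warehouse/db/table/");
    System.out.println(withPath.bucket()); // my-bucket
    System.out.println(withPath.prefix()); // warehouse/db/table/

    // Bucket only: allowed here, and the prefix is simply empty.
    GCSLocation bucketOnly = new GCSLocation("gs://my-bucket");
    System.out.println(bucketOnly.bucket()); // my-bucket
    System.out.println(bucketOnly.prefix()); // "" (empty string)

    // Query and fragment parts are stripped from the prefix.
    GCSLocation withQuery = new GCSLocation("gs://my-bucket/data/file.parquet?versionId=7#frag");
    System.out.println(withQuery.prefix()); // data/file.parquet
  }
}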
GCSFileIOTest.java

@@ -20,14 +20,20 @@
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import com.google.api.client.util.Lists;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.stream.StreamSupport;
import org.apache.iceberg.TestHelpers;

@@ -36,6 +42,7 @@
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@@ -44,11 +51,25 @@ public class GCSFileIOTest {
  private static final String TEST_BUCKET = "TEST_BUCKET";
  private final Random random = new Random(1);

-  private final Storage storage = LocalStorageHelper.getOptions().getService();
+  private final Storage storage = spy(LocalStorageHelper.getOptions().getService());
  private GCSFileIO io;

  @BeforeEach
  public void before() {
    // LocalStorageHelper doesn't support batch operations, so mock that here
    doAnswer(
            invoke -> {
              Iterable<BlobId> iter = invoke.getArgument(0);
              List<Boolean> answer = Lists.newArrayList();
              iter.forEach(
                  blobId -> {
                    answer.add(storage.delete(blobId));
                  });
              return answer;
            })
        .when(storage)
        .delete(any(Iterable.class));

    io = new GCSFileIO(() -> storage, new GCPProperties());
  }

@@ -91,7 +112,7 @@ public void testDelete() {
            .count())
        .isEqualTo(1);

-    io.deleteFile(format("gs://%s/%s", TEST_BUCKET, path));
+    io.deleteFile(gsUri(path));

    // The bucket should now be empty
    assertThat(

@@ -100,6 +121,70 @@
        .isZero();
  }

  private String gsUri(String path) {
    return format("gs://%s/%s", TEST_BUCKET, path);
  }

  @Test
  public void testListPrefix() {
    String prefix = "list/path/";
    String path1 = prefix + "data1.dat";
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
    String path2 = prefix + "data2.dat";
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
    String path3 = "list/skip/data3.dat";
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());

    assertThat(StreamSupport.stream(io.listPrefix(gsUri("list/")).spliterator(), false).count())
[review comment] sorry I missed this initially, but there's no need to wrap this in a stream. This (and other places) can be simplified to
[review comment] I noticed there are also existing tests in that file that do the same thing, could you please update those as well?
[author reply] The intent was to avoid using the FileIO we're testing to do the assertion and use the Google client directly, I assume the same for the existing tests.
        .isEqualTo(3);

    assertThat(StreamSupport.stream(io.listPrefix(gsUri(prefix)).spliterator(), false).count())
        .isEqualTo(2);

    assertThat(StreamSupport.stream(io.listPrefix(gsUri(path1)).spliterator(), false).count())
        .isEqualTo(1);
  }
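
As the review exchange above notes, the author's intent was to assert against the storage client directly rather than go back through the FileIO under test. A hedged sketch of what such a client-side assertion could look like, written against the same spied LocalStorageHelper storage; the helper name countBlobs is made up and not part of this PR:

  // Hypothetical helper: count objects under a prefix using the Google client
  // directly, so the assertion does not depend on the GCSFileIO.listPrefix
  // implementation being tested.
  private long countBlobs(String prefix) {
    return storage
        .list(TEST_BUCKET, Storage.BlobListOption.prefix(prefix))
        .streamAll()
        .count();
  }

  // Usage inside a test method, e.g.:
  //   assertThat(countBlobs("list/")).isEqualTo(3);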

  @Test
  public void testDeleteFiles() {
    String prefix = "del/path/";
    String path1 = prefix + "data1.dat";
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
    String path2 = prefix + "data2.dat";
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
    String path3 = "del/skip/data3.dat";
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());

    assertThat(StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count())
        .isEqualTo(3);

    Iterable<String> deletes =
        () -> ImmutableList.of(gsUri(path1), gsUri(path3)).stream().iterator();
    io.deleteFiles(deletes);

    assertThat(StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count())
        .isEqualTo(1);
  }

  @Test
  public void testDeletePrefix() {
    String prefix = "del/path/";
    String path1 = prefix + "data1.dat";
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
    String path2 = prefix + "data2.dat";
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
    String path3 = "del/skip/data3.dat";
    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());

    assertThat(StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count())
        .isEqualTo(3);

    io.deletePrefix(gsUri(prefix));

    assertThat(StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count())
        .isEqualTo(1);
  }

  @Test
  public void testGCSFileIOKryoSerialization() throws IOException {
    FileIO testGCSFileIO = new GCSFileIO();
[review comment] probably ok to set this here, alternative maybe just in the line above?
    private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT
[author reply] Sure, I updated this. I was just following the pattern in S3FileIO.
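
For context on that exchange, the pattern being discussed is a delete batch size carried by GCPProperties, initialized to a compile-time default and overridable through catalog properties, then consumed by internalDeleteFiles when partitioning deletes. A minimal sketch of that shape, assuming the property key, default value, and class name below, none of which appear in this diff:

import java.io.Serializable;
import java.util.Map;

// Sketch only: the real GCPProperties may differ. Only deleteBatchSize() and the
// field gcsDeleteBatchSize are referenced in the diff above.
public class GcpPropertiesSketch implements Serializable {
  public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size"; // assumed key
  public static final int GCS_DELETE_BATCH_SIZE_DEFAULT = 50; // assumed default

  // Initialized with the default at declaration, as the reviewer suggested.
  private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;

  public GcpPropertiesSketch() {}

  public GcpPropertiesSketch(Map<String, String> properties) {
    if (properties.containsKey(GCS_DELETE_BATCH_SIZE)) {
      this.gcsDeleteBatchSize = Integer.parseInt(properties.get(GCS_DELETE_BATCH_SIZE));
    }
  }

  // Consumed by GCSFileIO.internalDeleteFiles when partitioning deletes into batches.
  public int deleteBatchSize() {
    return gcsDeleteBatchSize;
  }
}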