Content-Length: 645835 | pFad | http://github.com/apache/iceberg/pull/8168/files/0c42c4e65b21f7cc4085acbd7975e0c7f9edd081

D5 GCP: Add prefix and bulk operations to GCSFileIO by bryanck · Pull Request #8168 · apache/iceberg · GitHub
Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCP: Add prefix and bulk operations to GCSFileIO #8168

Merged
merged 12 commits into from
Aug 5, 2023
23 changes: 22 additions & 1 deletion gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.util.PropertyUtil;

public class GCPProperties implements Serializable {
// Service Options
Expand All @@ -40,6 +41,14 @@ public class GCPProperties implements Serializable {
public static final String GCS_OAUTH2_TOKEN = "gcs.oauth2.token";
public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at";

/** Configure the batch size used when deleting multiple files from a given GCS bucket */
public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size";
/**
* Max possible batch size for deletion. Currently, a max of 100 keys is advised, so we default to
* a number below that. https://cloud.google.com/storage/docs/batch
*/
public static final int GCS_DELETE_BATCH_SIZE_DEFAULT = 50;

private String projectId;
private String clientLibToken;
private String serviceHost;
Expand All @@ -54,7 +63,11 @@ public class GCPProperties implements Serializable {
private String gcsOAuth2Token;
private Date gcsOAuth2TokenExpiresAt;

public GCPProperties() {}
private int gcsDeleteBatchSize;

public GCPProperties() {
  // No properties supplied: fall back to the documented default batch size.
  this.gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;
}

public GCPProperties(Map<String, String> properties) {
projectId = properties.get(GCS_PROJECT_ID);
Expand All @@ -78,6 +91,10 @@ public GCPProperties(Map<String, String> properties) {
gcsOAuth2TokenExpiresAt =
new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
}

gcsDeleteBatchSize =
PropertyUtil.propertyAsInt(
properties, GCS_DELETE_BATCH_SIZE, GCS_DELETE_BATCH_SIZE_DEFAULT);
}

public Optional<Integer> channelReadChunkSize() {
Expand Down Expand Up @@ -119,4 +136,8 @@ public Optional<String> oauth2Token() {
/** Returns the OAuth2 token expiration time, if one was configured. */
public Optional<Date> oauth2TokenExpiresAt() {
return Optional.ofNullable(gcsOAuth2TokenExpiresAt);
}

/**
 * Returns the batch size used when bulk-deleting objects from GCS.
 *
 * <p>Configured via {@code gcs.delete.batch-size}; defaults to {@link
 * #GCS_DELETE_BATCH_SIZE_DEFAULT}.
 */
public int deleteBatchSize() {
return gcsDeleteBatchSize;
}
}
51 changes: 50 additions & 1 deletion gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,24 @@

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
Expand All @@ -48,7 +55,7 @@
* <p>See <a href="https://cloud.google.com/storage/docs/folders#overview">Cloud Storage
* Overview</a>
*/
public class GCSFileIO implements FileIO {
public class GCSFileIO implements FileIO, SupportsBulkOperations, SupportsPrefixOperations {
private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
Expand Down Expand Up @@ -174,4 +181,46 @@ public void close() {
}
}
}

@Override
public Iterable<FileInfo> listPrefix(String prefix) {
  GCSLocation location = new GCSLocation(prefix);
  // Listing is lazy: each call to iterator() issues a fresh list request.
  return () ->
      client()
          .list(location.bucket(), Storage.BlobListOption.prefix(location.prefix()))
          .streamAll()
          .map(this::toFileInfo)
          .iterator();
}

/** Converts a GCS blob into a {@link FileInfo} with a gs:// URI. */
private FileInfo toFileInfo(Blob blob) {
  String uri = String.format("gs://%s/%s", blob.getBucket(), blob.getName());
  return new FileInfo(uri, blob.getSize(), getCreateTimeMillis(blob));
}

private long getCreateTimeMillis(Blob blob) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Iceberg typically doesn't use getXyz methods. Maybe `createTimeMillisFrom(Blob)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I changed this to createTimeMillisFrom

if (blob.getCreateTimeOffsetDateTime() == null) {
return 0;
}
return blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli();
}

@Override
public void deletePrefix(String prefix) {
  // List everything under the prefix and funnel the blob ids into the batched delete path.
  internalDeleteFiles(
      () ->
          Streams.stream(listPrefix(prefix))
              .map(FileInfo::location)
              .map(BlobId::fromGsUtilUri)
              .iterator());
}

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
  // Translate each gs:// URI into a BlobId lazily; deletion happens in batches downstream.
  internalDeleteFiles(
      () -> Streams.stream(pathsToDelete).map(path -> BlobId.fromGsUtilUri(path)).iterator());
}

private void internalDeleteFiles(Iterable<BlobId> blobIdsToDelete) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering whether we should pass the Stream<BlobId through to this method to have something like

  @Override
  public void deletePrefix(String prefix) {
    internalDeleteFiles(
        Streams.stream(listPrefix(prefix))
            .map(fileInfo -> BlobId.fromGsUtilUri(fileInfo.location())));
  }

  @Override
  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
    internalDeleteFiles(Streams.stream(pathsToDelete).map(BlobId::fromGsUtilUri));
  }

  private void internalDeleteFiles(Stream<BlobId> blobIdsToDelete) {
    Streams.stream(Iterators.partition(blobIdsToDelete.iterator(), gcpProperties.deleteBatchSize()))
        .forEach(batch -> client().delete(batch));
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is an improvement, I made this change

Streams.stream(Iterables.partition(blobIdsToDelete, gcpProperties.deleteBatchSize()))
.forEach(batch -> client().delete(batch));
}
}
76 changes: 76 additions & 0 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSLocation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.gcp.gcs;

import com.google.cloud.storage.BlobId;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* This class represents a fully qualified location in GCS expressed as a URI. This class allows for
* URIs with only a bucket and no path specified, unlike with {@link BlobId#fromGsUtilUri(String)}.
*/
class GCSLocation {
private static final String SCHEME_DELIM = "://";
private static final String PATH_DELIM = "/";
private static final String QUERY_DELIM = "\\?";
private static final String FRAGMENT_DELIM = "#";

private static final String EXPECTED_SCHEME = "gs";

private final String bucket;
private final String prefix;

/**
* Creates a new GCSLocation with the form of scheme://bucket/path?query#fragment
*
* @param location fully qualified URI
*/
GCSLocation(String location) {
Preconditions.checkNotNull(location, "Location cannot be null.");
bryanck marked this conversation as resolved.
Show resolved Hide resolved

String[] schemeSplit = location.split(SCHEME_DELIM, -1);
ValidationException.check(
schemeSplit.length == 2, "Invalid GCS URI, cannot determine scheme: %s", location);

String scheme = schemeSplit[0];
ValidationException.check(
EXPECTED_SCHEME.equals(scheme), "Invalid GCS URI, invalid scheme: %s", scheme);

String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);

this.bucket = authoritySplit[0];

// Strip query and fragment if they exist
String path = authoritySplit.length > 1 ? authoritySplit[1] : "";
path = path.split(QUERY_DELIM, -1)[0];
path = path.split(FRAGMENT_DELIM, -1)[0];
this.prefix = path;
}

/** Returns GCS bucket name. */
public String bucket() {
return bucket;
}

/** Returns GCS object name prefix. */
public String prefix() {
return prefix;
}
}
89 changes: 87 additions & 2 deletions gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@

import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import com.google.api.client.util.Lists;
nastra marked this conversation as resolved.
Show resolved Hide resolved
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.stream.StreamSupport;
import org.apache.iceberg.TestHelpers;
Expand All @@ -36,6 +42,7 @@
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -44,11 +51,25 @@ public class GCSFileIOTest {
private static final String TEST_BUCKET = "TEST_BUCKET";
private final Random random = new Random(1);

private final Storage storage = LocalStorageHelper.getOptions().getService();
private final Storage storage = spy(LocalStorageHelper.getOptions().getService());
private GCSFileIO io;

@BeforeEach
public void before() {
  // LocalStorageHelper doesn't support batch operations, so emulate the bulk delete by
  // forwarding each BlobId to the single-object delete.
  doAnswer(
          invocation -> {
            Iterable<BlobId> blobIds = invocation.getArgument(0);
            List<Boolean> results = Lists.newArrayList();
            for (BlobId blobId : blobIds) {
              results.add(storage.delete(blobId));
            }
            return results;
          })
      .when(storage)
      .delete(any(Iterable.class));

  io = new GCSFileIO(() -> storage, new GCPProperties());
}

Expand Down Expand Up @@ -91,7 +112,7 @@ public void testDelete() {
.count())
.isEqualTo(1);

io.deleteFile(format("gs://%s/%s", TEST_BUCKET, path));
io.deleteFile(gsUri(path));

// The bucket should now be empty
assertThat(
Expand All @@ -100,6 +121,70 @@ public void testDelete() {
.isZero();
}

/** Builds a gs:// URI for the given object path in the test bucket. */
private String gsUri(String path) {
  return "gs://" + TEST_BUCKET + "/" + path;
}

@Test
public void testListPrefix() {
String prefix = "list/path/";
String path1 = prefix + "data1.dat";
storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
String path2 = prefix + "data2.dat";
storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
String path3 = "list/skip/data3.dat";
storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());

assertThat(StreamSupport.stream(io.listPrefix(gsUri("list/")).spliterator(), false).count())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I missed this initially, but there's no need to wrap this in a stream. This (and other places) can be simplified to assertThat(io.listPrefix(gsUri("list/"))).hasSize(3);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed there are also existing tests in that file that do the same thing, could you please update those as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent was to avoid using the FileIO we're testing to do the assertion and use the Google client directly, I assume the same for the existing tests.

.isEqualTo(3);

assertThat(StreamSupport.stream(io.listPrefix(gsUri(prefix)).spliterator(), false).count())
.isEqualTo(2);

assertThat(StreamSupport.stream(io.listPrefix(gsUri(path1)).spliterator(), false).count())
.isEqualTo(1);
}

@Test
public void testDeleteFiles() {
  String prefix = "del/path/";
  String path1 = prefix + "data1.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
  String path2 = prefix + "data2.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
  String path3 = "del/skip/data3.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());

  assertThat(io.listPrefix(gsUri("del/"))).hasSize(3);

  // Delete one object under the prefix and one outside it; only path2 should remain.
  io.deleteFiles(ImmutableList.of(gsUri(path1), gsUri(path3)));

  assertThat(io.listPrefix(gsUri("del/"))).hasSize(1);
}

@Test
public void testDeletePrefix() {
  String prefix = "del/path/";
  String path1 = prefix + "data1.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
  String path2 = prefix + "data2.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
  String path3 = "del/skip/data3.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());

  assertThat(io.listPrefix(gsUri("del/"))).hasSize(3);

  // Only the two objects under the deleted prefix should be removed.
  io.deletePrefix(gsUri(prefix));

  assertThat(io.listPrefix(gsUri("del/"))).hasSize(1);
}

@Test
public void testGCSFileIOKryoSerialization() throws IOException {
FileIO testGCSFileIO = new GCSFileIO();
Expand Down
Loading








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/apache/iceberg/pull/8168/files/0c42c4e65b21f7cc4085acbd7975e0c7f9edd081

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy