package com.google.cloud.dataflow.sdk.io;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.datastore.DatastoreV1;
import com.google.api.services.datastore.client.Datastore;
import com.google.api.services.datastore.client.DatastoreException;
import com.google.api.services.datastore.client.DatastoreFactory;
import com.google.api.services.datastore.client.DatastoreHelper;
import com.google.api.services.datastore.client.DatastoreOptions;
import com.google.api.services.datastore.client.QuerySplitter;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.EntityCoder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.io.Write;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.GcpOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.MoreObjects;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Verify;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.primitives.Ints;
import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:com/google/cloud/dataflow/sdk/io/DatastoreIO.class */
public class DatastoreIO {
    /** Datastore API host used by the {@code source()}, {@code readFrom(String, Query)} and {@code sink()} factories. */
    public static final String DEFAULT_HOST = "https://www.googleapis.com";
    /** Upper bound on entities per batched Datastore update; same value (500) as DatastoreWriter's flush threshold. */
    public static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/DatastoreIO$DatastoreReader.class */
    public static class DatastoreReader extends BoundedSource.BoundedReader<DatastoreV1.Entity> {
        /** Maximum number of entities requested from the Datastore in one RunQuery call. */
        private static final int QUERY_BATCH_LIMIT = 500;

        private final Source source;
        private final Datastore datastore;

        /** Whether another batch should be fetched once the current iterator is exhausted. */
        private boolean moreResults;

        /** Entities of the current batch; null before the first fetch and after an empty batch. */
        private Iterator<DatastoreV1.EntityResult> entities;

        /** Most recently fetched batch; its end cursor is where the next request starts. */
        private DatastoreV1.QueryResultBatch currentBatch;

        /**
         * Number of entities still permitted by the user's query limit, or
         * {@code Integer.MAX_VALUE} when the query has no limit.
         */
        private int userLimit;

        /** Entity returned by {@link #getCurrent()}; null when there is no current entity. */
        private DatastoreV1.Entity currentEntity;

        public DatastoreReader(Source source, Datastore datastore) {
            this.source = source;
            this.datastore = datastore;
            this.userLimit = source.query.hasLimit() ? source.query.getLimit() : Integer.MAX_VALUE;
        }

        /**
         * Returns the entity read by the last successful {@link #start()} or {@link #advance()}.
         *
         * @throws NoSuchElementException if {@code start()} was never called or the last
         *     {@code start()}/{@code advance()} returned false, as required by the Reader contract
         */
        @Override
        public DatastoreV1.Entity getCurrent() {
            if (this.currentEntity == null) {
                throw new NoSuchElementException("No current entity: reader not started or exhausted");
            }
            return this.currentEntity;
        }

        @Override
        public boolean start() throws IOException {
            return advance();
        }

        /**
         * Moves to the next entity, fetching a new batch from the Datastore when needed.
         *
         * @return true if a new current entity is available
         * @throws IOException wrapping any {@link DatastoreException} raised by the query
         */
        @Override
        public boolean advance() throws IOException {
            // Fetch when no batch is held (first call, or the previous batch was empty), or when the
            // held batch is exhausted and the last response indicated more results may exist.
            boolean needBatch = this.entities == null || (!this.entities.hasNext() && this.moreResults);
            if (needBatch) {
                try {
                    this.entities = getIteratorAndMoveCursor();
                } catch (DatastoreException e) {
                    throw new IOException(e);
                }
            }
            // NOTE(review): an empty batch ends the read here even if moreResults is still true
            // (possible only via NOT_FINISHED with zero results) — confirm the API never does that.
            if (this.entities == null || !this.entities.hasNext()) {
                this.currentEntity = null;
                return false;
            }
            this.currentEntity = this.entities.next().getEntity();
            return true;
        }

        @Override
        public void close() throws IOException {
            // Nothing to release: the Datastore client holds no reader-owned resources here.
        }

        @Override
        public Source getCurrentSource() {
            return this.source;
        }

        /** Dynamic splitting is not supported; always declines by returning null. */
        @Override
        /* renamed from: splitAtFraction, reason: merged with bridge method [inline-methods] */
        public BoundedSource<DatastoreV1.Entity> splitAtFraction2(double d) {
            return null;
        }

        /** Progress is not tracked; returns null (unknown). */
        @Override
        public Double getFractionConsumed() {
            return null;
        }

        /**
         * Runs the next page of the query, starting at the previous batch's end cursor, and
         * updates {@link #userLimit} and {@link #moreResults}.
         *
         * @return an iterator over the fetched entities, or null if the batch was empty
         */
        private Iterator<DatastoreV1.EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
            DatastoreV1.Query.Builder batchQuery = this.source.query.toBuilder().clone();
            batchQuery.setLimit(Math.min(this.userLimit, QUERY_BATCH_LIMIT));
            if (this.currentBatch != null && this.currentBatch.hasEndCursor()) {
                batchQuery.setStartCursor(this.currentBatch.getEndCursor());
            }
            this.currentBatch = this.datastore.runQuery(this.source.makeRequest(batchQuery.build())).getBatch();

            int numFetched = this.currentBatch.getEntityResultCount();
            if (this.source.query.hasLimit()) {
                Verify.verify(this.userLimit >= numFetched,
                        "Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit",
                        this.userLimit, numFetched, batchQuery.getLimit());
                this.userLimit -= numFetched;
            }
            // A full batch is treated as a hint that more results may exist, in addition to an
            // explicit NOT_FINISHED; either way, stop once the user's limit is satisfied.
            this.moreResults = this.userLimit > 0
                    && (numFetched == QUERY_BATCH_LIMIT
                        || this.currentBatch.getMoreResults() == DatastoreV1.QueryResultBatch.MoreResultsType.NOT_FINISHED);
            if (numFetched == 0) {
                return null;
            }
            return this.currentBatch.getEntityResultList().iterator();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/DatastoreIO$DatastoreWriteOperation.class */
    public static class DatastoreWriteOperation extends Sink.WriteOperation<DatastoreV1.Entity, DatastoreWriteResult> {
        private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class);

        /** Sink whose host and dataset configure every writer created by this operation. */
        private final Sink sink;

        public DatastoreWriteOperation(Sink sink) {
            this.sink = sink;
        }

        /** Per-writer results are plain Serializable objects. */
        @Override
        public Coder<DatastoreWriteResult> getWriterResultCoder() {
            return SerializableCoder.of(DatastoreWriteResult.class);
        }

        /** No global setup is performed before the writers run. */
        @Override
        public void initialize(PipelineOptions pipelineOptions) throws Exception {
            // Intentionally empty.
        }

        /** Adds up the entity counts reported by all writers and logs the total. */
        @Override
        public void finalize(Iterable<DatastoreWriteResult> iterable, PipelineOptions pipelineOptions) throws Exception {
            long total = 0L;
            for (DatastoreWriteResult result : iterable) {
                total += result.entitiesWritten;
            }
            LOG.info("Wrote {} elements.", total);
        }

        /**
         * Creates a writer backed by a Datastore client for the sink's host and dataset, using the
         * pipeline's GCP credential when one is available.
         */
        @Override
        /* renamed from: createWriter, reason: merged with bridge method [inline-methods] */
        public Sink.Writer<DatastoreV1.Entity, DatastoreWriteResult> createWriter2(PipelineOptions pipelineOptions) throws Exception {
            DatastoreOptions.Builder clientOptions = new DatastoreOptions.Builder()
                    .host(sink.host)
                    .dataset(sink.datasetId)
                    .initializer(new RetryHttpRequestInitializer());
            Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
            if (credential != null) {
                clientOptions.credential(credential);
            }
            Datastore client = DatastoreFactory.get().create(clientOptions.build());
            return new DatastoreWriter(this, client);
        }

        @Override
        /* renamed from: getSink, reason: merged with bridge method [inline-methods] */
        public com.google.cloud.dataflow.sdk.io.Sink<DatastoreV1.Entity> getSink2() {
            return sink;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/DatastoreIO$DatastoreWriteResult.class */
    public static class DatastoreWriteResult implements Serializable {
        /** Number of entities one writer committed to the Datastore. */
        final long entitiesWritten;

        /**
         * @param entitiesWritten count of entities committed by a single writer
         */
        public DatastoreWriteResult(long entitiesWritten) {
            this.entitiesWritten = entitiesWritten;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/DatastoreIO$DatastoreWriter.class */
    public static class DatastoreWriter extends Sink.Writer<DatastoreV1.Entity, DatastoreWriteResult> {
        private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class);

        /** Attempt bound passed to the commit backoff; also reported when giving up. */
        private static final int MAX_RETRIES = 5;

        /** Initial backoff interval between commit attempts. */
        private static final int INITIAL_BACKOFF_MILLIS = 5000;

        private final DatastoreWriteOperation writeOp;
        private final Datastore datastore;

        /** Entities successfully committed by this writer so far. */
        private long totalWritten = 0;

        /** Entities buffered since the last successful flush. */
        final List<DatastoreV1.Entity> entities = new ArrayList<>();

        /**
         * Returns whether the key is complete: it has at least one path element and the last one
         * carries an id or a name.
         */
        static boolean isValidKey(DatastoreV1.Key key) {
            List<DatastoreV1.Key.PathElement> path = key.getPathElementList();
            if (path.isEmpty()) {
                return false;
            }
            DatastoreV1.Key.PathElement last = path.get(path.size() - 1);
            return last.hasId() || last.hasName();
        }

        DatastoreWriter(DatastoreWriteOperation datastoreWriteOperation, Datastore datastore) {
            this.writeOp = datastoreWriteOperation;
            this.datastore = datastore;
        }

        @Override
        public void open(String str) throws Exception {
            // No per-bundle setup is required.
        }

        /**
         * Buffers an entity, committing the buffer once it reaches the batch limit.
         *
         * @throws IllegalArgumentException if the entity's key is incomplete
         */
        @Override
        public void write(DatastoreV1.Entity entity) throws Exception {
            if (!isValidKey(entity.getKey())) {
                throw new IllegalArgumentException("Entities to be written to the Datastore must have complete keys");
            }
            entities.add(entity);
            if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
                flushBatch();
            }
        }

        /** Commits any remaining buffered entities and reports how many this writer wrote. */
        @Override
        public DatastoreWriteResult close() throws Exception {
            if (!entities.isEmpty()) {
                flushBatch();
            }
            return new DatastoreWriteResult(totalWritten);
        }

        @Override
        /* renamed from: getWriteOperation, reason: merged with bridge method [inline-methods] */
        public Sink.WriteOperation<DatastoreV1.Entity, DatastoreWriteResult> getWriteOperation2() {
            return writeOp;
        }

        /**
         * Upserts all buffered entities in one non-transactional commit, retrying failed commits
         * with exponential backoff.
         *
         * @throws DatastoreException the last commit failure once the backoff is exhausted
         */
        private void flushBatch() throws DatastoreException, IOException, InterruptedException {
            LOG.debug("Writing batch of {} entities", entities.size());
            Sleeper sleeper = Sleeper.DEFAULT;
            AttemptBoundedExponentialBackOff backoff =
                    new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
            while (true) {
                try {
                    DatastoreV1.CommitRequest.Builder commitRequest = DatastoreV1.CommitRequest.newBuilder();
                    commitRequest.getMutationBuilder().addAllUpsert(entities);
                    commitRequest.setMode(DatastoreV1.CommitRequest.Mode.NON_TRANSACTIONAL);
                    datastore.commit(commitRequest.build());
                    break;
                } catch (DatastoreException exception) {
                    // Only code and message are logged per attempt; the full exception propagates
                    // once retries are exhausted. (The previous `throw e` after the loop referenced
                    // an out-of-scope variable.)
                    LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(), exception.getMessage());
                    if (!BackOffUtils.next(sleeper, backoff)) {
                        LOG.error("Aborting after {} retries.", MAX_RETRIES);
                        throw exception;
                    }
                }
            }
            totalWritten += entities.size();
            LOG.debug("Successfully wrote {} entities", entities.size());
            entities.clear();
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/DatastoreIO$Sink.class */
    public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<DatastoreV1.Entity> {
        /** Datastore API host; checked non-null by the constructor. */
        final String host;

        /** Dataset to write to; may be null until set via {@link #withDataset(String)}. */
        final String datasetId;

        /** Returns a copy of this sink that writes to the given dataset. */
        public Sink withDataset(String str) {
            Preconditions.checkNotNull(str, "datasetId");
            return new Sink(this.host, str);
        }

        /** Returns a copy of this sink that talks to the given Datastore host. */
        public Sink withHost(String str) {
            Preconditions.checkNotNull(str, "host");
            return new Sink(str, this.datasetId);
        }

        protected Sink(String host, String datasetId) {
            this.host = (String) Preconditions.checkNotNull(host, "host");
            this.datasetId = datasetId;
        }

        /** Verifies that both host and dataset have been configured. */
        @Override
        public void validate(PipelineOptions pipelineOptions) {
            Preconditions.checkNotNull(this.host, "Host is a required parameter. Please use withHost to set the host.");
            Preconditions.checkNotNull(this.datasetId, "Dataset ID is a required parameter. Please use withDataset to set the datasetId.");
        }

        @Override
        /* renamed from: createWriteOperation, reason: merged with bridge method [inline-methods] */
        public Sink.WriteOperation<DatastoreV1.Entity, ?> createWriteOperation2(PipelineOptions pipelineOptions) {
            return new DatastoreWriteOperation(this);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/DatastoreIO$Source.class */
    public static class Source extends BoundedSource<DatastoreV1.Entity> {
        private static final Logger LOG = LoggerFactory.getLogger(Source.class);

        /** Datastore API host; checked non-null by the constructor. */
        private final String host;

        /** Dataset to read from; required by {@link #validate()}. */
        @Nullable
        private final String datasetId;

        /** Query to run; required by {@link #validate()}. */
        @Nullable
        private final DatastoreV1.Query query;

        /** Namespace set on request partitions, or null to leave it unset. */
        @Nullable
        private final String namespace;

        /** Test hook: when set, used instead of the Datastore query splitter. */
        @Nullable
        private QuerySplitter mockSplitter;

        /** Test hook: when set, returned by getEstimatedSizeBytes without querying the Datastore. */
        @Nullable
        private Long mockEstimateSizeBytes;

        public String getHost() {
            return this.host;
        }

        @Nullable
        public String getDataset() {
            return this.datasetId;
        }

        @Nullable
        public DatastoreV1.Query getQuery() {
            return this.query;
        }

        @Nullable
        public String getNamespace() {
            return this.namespace;
        }

        /** Returns a copy of this source reading from the given dataset. */
        public Source withDataset(String str) {
            Preconditions.checkNotNull(str, "datasetId");
            return new Source(this.host, str, this.query, this.namespace);
        }

        /**
         * Returns a copy of this source running the given query.
         *
         * @throws IllegalArgumentException if the query has a non-positive limit
         */
        public Source withQuery(DatastoreV1.Query query) {
            Preconditions.checkNotNull(query, "query");
            Preconditions.checkArgument(!query.hasLimit() || query.getLimit() > 0, "Invalid query limit %s: must be positive", Integer.valueOf(query.getLimit()));
            return new Source(this.host, this.datasetId, query, this.namespace);
        }

        /** Returns a copy of this source talking to the given Datastore host. */
        public Source withHost(String str) {
            Preconditions.checkNotNull(str, "host");
            return new Source(str, this.datasetId, this.query, this.namespace);
        }

        /** Returns a copy of this source reading from the given namespace (null clears it). */
        public Source withNamespace(@Nullable String str) {
            return new Source(this.host, this.datasetId, this.query, str);
        }

        @Override
        public Coder<DatastoreV1.Entity> getDefaultOutputCoder() {
            return EntityCoder.of();
        }

        @Override
        public boolean producesSortedKeys(PipelineOptions pipelineOptions) {
            return false;
        }

        /**
         * Splits the query into roughly {@code estimatedSize / desiredBundleSizeBytes} sub-queries.
         * Queries with a limit are never split. If the size estimate fails, falls back to the
         * configured number of Dataflow workers, or 12 when that is not set.
         *
         * @throws IllegalArgumentException if a splittable query is given a non-positive bundle size
         */
        @Override
        public List<? extends BoundedSource<DatastoreV1.Entity>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions pipelineOptions) throws Exception {
            // A limit applies to the query as a whole, so a limited query cannot be split.
            if (this.query.hasLimit()) {
                return ImmutableList.of(this);
            }
            Preconditions.checkArgument(desiredBundleSizeBytes > 0,
                    "desiredBundleSizeBytes must be positive, got %s", desiredBundleSizeBytes);

            long numSplits;
            try {
                // Divide in floating point so a partial bundle rounds to the nearest split count
                // instead of being truncated by integer division.
                numSplits = Math.round((double) getEstimatedSizeBytes(pipelineOptions) / desiredBundleSizeBytes);
            } catch (Exception e) {
                DataflowPipelineWorkerPoolOptions poolOptions = pipelineOptions.as(DataflowPipelineWorkerPoolOptions.class);
                if (poolOptions.getNumWorkers() > 0) {
                    LOG.warn("Estimated size unavailable, using the number of workers {}", poolOptions.getNumWorkers(), e);
                    numSplits = poolOptions.getNumWorkers();
                } else {
                    numSplits = 12;
                }
            }
            if (numSplits <= 1) {
                return ImmutableList.of(this);
            }

            ImmutableList.Builder<Source> splits = ImmutableList.builder();
            for (DatastoreV1.Query splitQuery : getSplitQueries(Ints.checkedCast(numSplits), pipelineOptions)) {
                splits.add(new Source(this.host, this.datasetId, splitQuery, this.namespace));
            }
            return splits.build();
        }

        @Override
        public BoundedSource.BoundedReader<DatastoreV1.Entity> createReader(PipelineOptions pipelineOptions) throws IOException {
            return new DatastoreReader(this, getDatastore(pipelineOptions));
        }

        /** Verifies that host, query and dataset have been configured. */
        @Override
        public void validate() {
            Preconditions.checkNotNull(this.host, "host");
            Preconditions.checkNotNull(this.query, "query");
            Preconditions.checkNotNull(this.datasetId, "datasetId");
        }

        /**
         * Estimates the size of the queried kind from the Datastore's per-kind statistics entity at
         * the latest statistics timestamp.
         *
         * @throws UnsupportedOperationException if the query does not name exactly one kind
         * @throws NoSuchElementException if the statistics are unavailable
         */
        @Override
        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
            if (this.mockEstimateSizeBytes != null) {
                return this.mockEstimateSizeBytes.longValue();
            }
            // Checked before creating a client: the statistics lookup needs exactly one kind.
            if (this.query.getKindCount() != 1) {
                throw new UnsupportedOperationException("Can only estimate size for queries specifying exactly 1 kind.");
            }
            Datastore datastore = getDatastore(pipelineOptions);
            String kindName = this.query.getKind(0).getName();
            long latestTimestamp = queryLatestStatisticsTimestamp(datastore);

            DatastoreV1.Query.Builder statsQuery = DatastoreV1.Query.newBuilder();
            // Namespaced statistics live under the __Stat_Ns_ kinds.
            statsQuery.addKindBuilder().setName(this.namespace == null ? "__Stat_Kind__" : "__Stat_Ns_Kind__");
            statsQuery.setFilter(DatastoreHelper.makeFilter(new DatastoreV1.Filter[]{
                    DatastoreHelper.makeFilter("kind_name", DatastoreV1.PropertyFilter.Operator.EQUAL, DatastoreHelper.makeValue(kindName)).build(),
                    DatastoreHelper.makeFilter("timestamp", DatastoreV1.PropertyFilter.Operator.EQUAL, DatastoreHelper.makeValue(latestTimestamp)).build()}));

            DatastoreV1.RunQueryRequest request = makeRequest(statsQuery.build());
            long startMillis = System.currentTimeMillis();
            DatastoreV1.RunQueryResponse response = datastore.runQuery(request);
            LOG.info("Query for per-kind statistics took {}ms", Long.valueOf(System.currentTimeMillis() - startMillis));

            DatastoreV1.QueryResultBatch batch = response.getBatch();
            if (batch.getEntityResultCount() == 0) {
                throw new NoSuchElementException("Datastore statistics for kind " + kindName + " unavailable");
            }
            DatastoreV1.Entity stats = batch.getEntityResult(0).getEntity();
            return ((DatastoreV1.Value) DatastoreHelper.getPropertyMap(stats).get("entity_bytes")).getIntegerValue();
        }

        public String toString() {
            return MoreObjects.toStringHelper(getClass()).add("host", this.host).add(PropertyNames.BIGQUERY_DATASET, this.datasetId).add("query", this.query).add("namespace", this.namespace).toString();
        }

        private Source(String str, @Nullable String str2, @Nullable DatastoreV1.Query query, @Nullable String str3) {
            this.host = (String) Preconditions.checkNotNull(str, "host");
            this.datasetId = str2;
            this.query = query;
            this.namespace = str3;
        }

        /** Splits the query into {@code i} sub-queries within this source's namespace partition. */
        private List<DatastoreV1.Query> getSplitQueries(int i, PipelineOptions pipelineOptions) throws DatastoreException {
            DatastoreV1.PartitionId.Builder partition = DatastoreV1.PartitionId.newBuilder();
            if (this.namespace != null) {
                partition.setNamespace(this.namespace);
            }
            if (this.mockSplitter != null) {
                return this.mockSplitter.getSplits(this.query, partition.build(), i, (Datastore) null);
            }
            return DatastoreHelper.getQuerySplitter().getSplits(this.query, partition.build(), i, getDatastore(pipelineOptions));
        }

        /** Wraps a query in a RunQueryRequest, scoped to this source's namespace when set. */
        public DatastoreV1.RunQueryRequest makeRequest(DatastoreV1.Query query) {
            DatastoreV1.RunQueryRequest.Builder request = DatastoreV1.RunQueryRequest.newBuilder().setQuery(query);
            if (this.namespace != null) {
                request.getPartitionIdBuilder().setNamespace(this.namespace);
            }
            return request.build();
        }

        /**
         * Returns the timestamp of the newest total-statistics entity.
         *
         * @throws NoSuchElementException if no total statistics exist
         */
        private long queryLatestStatisticsTimestamp(Datastore datastore) throws DatastoreException {
            DatastoreV1.Query.Builder totalQuery = DatastoreV1.Query.newBuilder();
            // Inside a namespace the total-statistics kind is __Stat_Ns_Total__.
            totalQuery.addKindBuilder().setName(this.namespace == null ? "__Stat_Total__" : "__Stat_Ns_Total__");
            totalQuery.addOrder(DatastoreHelper.makeOrder("timestamp", DatastoreV1.PropertyOrder.Direction.DESCENDING));
            totalQuery.setLimit(1);

            DatastoreV1.RunQueryRequest request = makeRequest(totalQuery.build());
            long startMillis = System.currentTimeMillis();
            DatastoreV1.RunQueryResponse response = datastore.runQuery(request);
            LOG.info("Query for latest stats timestamp of dataset {} took {}ms", this.datasetId, Long.valueOf(System.currentTimeMillis() - startMillis));

            DatastoreV1.QueryResultBatch batch = response.getBatch();
            if (batch.getEntityResultCount() == 0) {
                throw new NoSuchElementException("Datastore total statistics for dataset " + this.datasetId + " unavailable");
            }
            return ((DatastoreV1.Value) DatastoreHelper.getPropertyMap(batch.getEntityResult(0).getEntity()).get("timestamp")).getTimestampMicrosecondsValue();
        }

        /** Builds a Datastore client for this source's host and dataset, with retrying HTTP requests. */
        private Datastore getDatastore(PipelineOptions pipelineOptions) {
            DatastoreOptions.Builder clientOptions = new DatastoreOptions.Builder().host(this.host).dataset(this.datasetId).initializer(new RetryHttpRequestInitializer());
            Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
            if (credential != null) {
                clientOptions.credential(credential);
            }
            return DatastoreFactory.get().create(clientOptions.build());
        }

        /** Test hook: returns a copy that uses the given splitter. */
        Source withMockSplitter(QuerySplitter querySplitter) {
            Source source = new Source(this.host, this.datasetId, this.query, this.namespace);
            source.mockSplitter = querySplitter;
            source.mockEstimateSizeBytes = this.mockEstimateSizeBytes;
            return source;
        }

        /** Test hook: returns a copy that reports the given estimated size. */
        Source withMockEstimateSizeBytes(Long l) {
            Source source = new Source(this.host, this.datasetId, this.query, this.namespace);
            source.mockSplitter = this.mockSplitter;
            source.mockEstimateSizeBytes = l;
            return source;
        }
    }

    /**
     * Returns an unconfigured Datastore source.
     *
     * @deprecated use {@link #source()} instead.
     */
    @Deprecated
    public static Source read() {
        return source();
    }

    /** Returns a source for {@link #DEFAULT_HOST} with no dataset, query or namespace set. */
    public static Source source() {
        Source unconfigured = new Source(DEFAULT_HOST, null, null, null);
        return unconfigured;
    }

    /** Returns a read of {@code query} against {@code datasetId} on {@link #DEFAULT_HOST}. */
    public static Read.Bounded<DatastoreV1.Entity> readFrom(String datasetId, DatastoreV1.Query query) {
        Source configured = new Source(DEFAULT_HOST, datasetId, query, null);
        return Read.from(configured);
    }

    /**
     * Returns a read of {@code query} against {@code datasetId} on the given host.
     *
     * @deprecated configure the host through {@link #source()} and {@code Source.withHost}.
     */
    @Deprecated
    public static Read.Bounded<DatastoreV1.Entity> readFrom(String host, String datasetId, DatastoreV1.Query query) {
        Source configured = new Source(host, datasetId, query, null);
        return Read.from(configured);
    }

    /** Returns a sink for {@link #DEFAULT_HOST} with no dataset set. */
    public static Sink sink() {
        Sink unconfigured = new Sink(DEFAULT_HOST, null);
        return unconfigured;
    }

    /** Returns a write of entities into {@code datasetId} on {@link #DEFAULT_HOST}. */
    public static Write.Bound<DatastoreV1.Entity> writeTo(String datasetId) {
        Sink target = sink().withDataset(datasetId);
        return Write.to(target);
    }
}
