package org.apache.beam.sdk.io.gcp.firestore;

import com.google.cloud.firestore.v1.stub.FirestoreStub;
import com.google.firestore.v1.BatchWriteRequest;
import com.google.firestore.v1.BatchWriteResponse;
import com.google.firestore.v1.DatabaseRootName;
import com.google.firestore.v1.Write;
import com.google.firestore.v1.WriteResult;
import com.google.rpc.Code;
import com.google.rpc.Status;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts;
import org.apache.beam.sdk.io.gcp.firestore.RpcQos;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.dataflow.qual.Pure;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.class */
final class FirestoreV1WriteFn {

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn$BaseBatchWriteFn.class */
    static abstract class BaseBatchWriteFn<OutT> extends FirestoreDoFn.ExplicitlyWindowedFirestoreDoFn<Write, OutT> implements FirestoreV1RpcAttemptContexts.HasRpcAttemptContext {
        private static final Logger LOG = LoggerFactory.getLogger(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite.getNamespace());
        private final JodaClock clock;
        private final FirestoreStatefulComponentFactory firestoreStatefulComponentFactory;
        private final RpcQosOptions rpcQosOptions;
        private final CounterFactory counterFactory;
        private transient RpcQos rpcQos;
        private transient Counter writesSuccessful;
        private transient Counter writesFailedRetryable;
        private transient Counter writesFailedNonRetryable;
        private transient FirestoreStub firestoreStub;
        private transient DatabaseRootName databaseRootName;

        @VisibleForTesting
        transient Queue<WriteElement> writes = new PriorityQueue(WriteElement.COMPARATOR);

        @VisibleForTesting
        transient int queueNextEntryPriority = 0;
        private final FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext rpcAttemptContext = FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn$BaseBatchWriteFn$ContextAdapter.class */
        public interface ContextAdapter<T> {
            void output(T t, Instant instant, BoundedWindow boundedWindow);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn$BaseBatchWriteFn$DoFlushStatus.class */
        public enum DoFlushStatus {
            OK,
            ONE_OR_MORE_FAILURES
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn$BaseBatchWriteFn$FinishBundleContextAdapter.class */
        private static final class FinishBundleContextAdapter<T> implements ContextAdapter<T> {
            private final DoFn<Write, T>.FinishBundleContext context;

            private FinishBundleContextAdapter(DoFn<Write, T>.FinishBundleContext finishBundleContext) {
                this.context = finishBundleContext;
            }

            @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BaseBatchWriteFn.ContextAdapter
            public void output(T t, Instant instant, BoundedWindow boundedWindow) {
                this.context.output(t, instant, boundedWindow);
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn$BaseBatchWriteFn$ProcessContextAdapter.class */
        private static final class ProcessContextAdapter<T> implements ContextAdapter<T> {
            private final DoFn<Write, T>.ProcessContext context;

            private ProcessContextAdapter(DoFn<Write, T>.ProcessContext processContext) {
                this.context = processContext;
            }

            @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BaseBatchWriteFn.ContextAdapter
            public void output(T t, Instant instant, BoundedWindow boundedWindow) {
                this.context.outputWithTimestamp(t, instant);
            }
        }

        BaseBatchWriteFn(JodaClock jodaClock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, CounterFactory counterFactory) {
            this.clock = jodaClock;
            this.firestoreStatefulComponentFactory = firestoreStatefulComponentFactory;
            this.rpcQosOptions = rpcQosOptions;
            this.counterFactory = counterFactory;
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts.HasRpcAttemptContext
        public RpcQos.RpcAttempt.Context getRpcAttemptContext() {
            return this.rpcAttemptContext;
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn
        public final void populateDisplayData(DisplayData.Builder builder) {
            builder.include("rpcQosOptions", this.rpcQosOptions);
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn
        public void setup() {
            this.rpcQos = this.firestoreStatefulComponentFactory.getRpcQos(this.rpcQosOptions);
            this.writes = new PriorityQueue(WriteElement.COMPARATOR);
            String namespace = this.rpcAttemptContext.getNamespace();
            this.writesSuccessful = this.counterFactory.get(namespace, "writes_successful");
            this.writesFailedRetryable = this.counterFactory.get(namespace, "writes_failed_retryable");
            this.writesFailedNonRetryable = this.counterFactory.get(namespace, "writes_failed_non-retryable");
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn
        public final void startBundle(DoFn<Write, OutT>.StartBundleContext startBundleContext) {
            this.databaseRootName = DatabaseRootName.of((String) Objects.requireNonNull(startBundleContext.getPipelineOptions().as(GcpOptions.class).getProject(), "project must be defined on GcpOptions of PipelineOptions"), (String) Objects.requireNonNull(((FirestoreOptions) startBundleContext.getPipelineOptions().as(FirestoreOptions.class)).getFirestoreDb(), "firestoreDb must be defined on FirestoreOptions of PipelineOptions"));
            this.firestoreStub = this.firestoreStatefulComponentFactory.getFirestoreStub(startBundleContext.getPipelineOptions());
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.ExplicitlyWindowedFirestoreDoFn
        public void processElement(DoFn<Write, OutT>.ProcessContext processContext, BoundedWindow boundedWindow) throws Exception {
            Write write = (Write) Objects.requireNonNull((Write) processContext.element(), "context.element() must be non null");
            ProcessContextAdapter processContextAdapter = new ProcessContextAdapter(processContext);
            if (this.rpcQos.bytesOverLimit(write.getSerializedSize())) {
                String format = String.format("%s for document '%s' larger than configured max allowed bytes per batch", getWriteType(write), getName(write));
                handleWriteFailures(processContextAdapter, this.clock.instant(), ImmutableList.of(KV.of(new FirestoreV1.WriteFailure(write, WriteResult.newBuilder().build(), Status.newBuilder().setCode(Code.INVALID_ARGUMENT.getNumber()).setMessage(format).build()), boundedWindow)), () -> {
                    LOG.info(format);
                });
                return;
            }
            Queue<WriteElement> queue = this.writes;
            int i = this.queueNextEntryPriority;
            this.queueNextEntryPriority = i + 1;
            queue.offer(new WriteElement(i, write, boundedWindow));
            flushBatch(false, processContextAdapter);
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.ExplicitlyWindowedFirestoreDoFn
        public void finishBundle(DoFn<Write, OutT>.FinishBundleContext finishBundleContext) throws Exception {
            try {
                flushBatch(true, new FinishBundleContextAdapter(finishBundleContext));
            } finally {
                this.databaseRootName = null;
                this.firestoreStub.close();
            }
        }

        private void flushBatch(boolean z, ContextAdapter<OutT> contextAdapter) throws InterruptedException {
            while (!this.writes.isEmpty()) {
                RpcQos.RpcWriteAttempt newWriteAttempt = this.rpcQos.newWriteAttempt(getRpcAttemptContext());
                Instant instant = this.clock.instant();
                if (newWriteAttempt.awaitSafeToProceed(instant)) {
                    RpcQos.RpcWriteAttempt.FlushBuffer<WriteElement> flushBuffer = getFlushBuffer(newWriteAttempt, instant);
                    if (!flushBuffer.isFull() && (!z || !flushBuffer.isNonEmpty())) {
                        Queue<WriteElement> queue = this.writes;
                        Objects.requireNonNull(queue);
                        flushBuffer.forEach((v1) -> {
                            r1.offer(v1);
                        });
                        if (!z) {
                            return;
                        }
                    } else if (doFlush(newWriteAttempt, flushBuffer, contextAdapter) == DoFlushStatus.ONE_OR_MORE_FAILURES && !z) {
                        break;
                    }
                }
            }
            if (this.writes.isEmpty()) {
                this.queueNextEntryPriority = 0;
            }
        }

        private RpcQos.RpcWriteAttempt.FlushBuffer<WriteElement> getFlushBuffer(RpcQos.RpcWriteAttempt rpcWriteAttempt, Instant instant) {
            RpcQos.RpcWriteAttempt.FlushBuffer<WriteElement> newFlushBuffer = rpcWriteAttempt.newFlushBuffer(instant);
            while (true) {
                WriteElement peek = this.writes.peek();
                if (peek == null || !newFlushBuffer.offer(peek)) {
                    break;
                }
                this.writes.poll();
            }
            return newFlushBuffer;
        }

        private BatchWriteRequest getBatchWriteRequest(RpcQos.RpcWriteAttempt.FlushBuffer<WriteElement> flushBuffer) {
            BatchWriteRequest.Builder database = BatchWriteRequest.newBuilder().setDatabase(this.databaseRootName.toString());
            Iterator<ElementT> it = flushBuffer.iterator();
            while (it.hasNext()) {
                database.addWrites(((WriteElement) it.next()).getValue());
            }
            return database.build();
        }

        private DoFlushStatus doFlush(RpcQos.RpcWriteAttempt rpcWriteAttempt, RpcQos.RpcWriteAttempt.FlushBuffer<WriteElement> flushBuffer, ContextAdapter<OutT> contextAdapter) throws InterruptedException {
            Instant instant;
            BatchWriteResponse batchWriteResponse;
            Instant instant2;
            int bufferedElementsCount = flushBuffer.getBufferedElementsCount();
            long bufferedElementsBytes = flushBuffer.getBufferedElementsBytes();
            BatchWriteRequest batchWriteRequest = getBatchWriteRequest(flushBuffer);
            while (true) {
                instant = this.clock.instant();
                LOG.debug("Sending BatchWrite request with {} writes totalling {} bytes", Integer.valueOf(bufferedElementsCount), Long.valueOf(bufferedElementsBytes));
                try {
                    rpcWriteAttempt.recordRequestStart(instant, bufferedElementsCount);
                    batchWriteResponse = (BatchWriteResponse) this.firestoreStub.batchWriteCallable().call(batchWriteRequest);
                    instant2 = this.clock.instant();
                    rpcWriteAttempt.recordRequestSuccessful(instant2);
                    break;
                } catch (RuntimeException e) {
                    Instant instant3 = this.clock.instant();
                    String message = e.getMessage();
                    Logger logger = LOG;
                    Object[] objArr = new Object[3];
                    objArr[0] = Integer.valueOf(bufferedElementsCount);
                    objArr[1] = Long.valueOf(bufferedElementsBytes);
                    objArr[2] = message != null ? message : e.getClass().getName();
                    logger.warn("Sending BatchWrite request with {} writes totalling {} bytes failed due to error: {}", objArr);
                    rpcWriteAttempt.recordRequestFailed(instant3);
                    rpcWriteAttempt.recordWriteCounts(instant3, 0, bufferedElementsCount);
                    Queue<WriteElement> queue = this.writes;
                    Objects.requireNonNull(queue);
                    flushBuffer.forEach((v1) -> {
                        r1.offer(v1);
                    });
                    rpcWriteAttempt.checkCanRetry(instant3, e);
                }
            }
            long millis = instant2.minus(Duration.millis(instant.getMillis())).getMillis();
            int i = 0;
            long j = 0;
            BoundedWindow boundedWindow = null;
            ArrayList arrayList = new ArrayList();
            List writeResultsList = batchWriteResponse.getWriteResultsList();
            List statusList = batchWriteResponse.getStatusList();
            Iterator<ElementT> it = flushBuffer.iterator();
            for (int i2 = 0; it.hasNext() && i2 < statusList.size(); i2++) {
                WriteElement writeElement = (WriteElement) it.next();
                Status status = (Status) statusList.get(i2);
                Code forNumber = Code.forNumber(status.getCode());
                if (forNumber == Code.OK) {
                    i++;
                    j += writeElement.getSerializedSize();
                    boundedWindow = writeElement.window;
                } else if (rpcWriteAttempt.isCodeRetryable(forNumber)) {
                    this.writes.offer(writeElement);
                } else {
                    arrayList.add(KV.of(new FirestoreV1.WriteFailure(writeElement.getValue(), (WriteResult) writeResultsList.get(i2), status), writeElement.window));
                }
            }
            int size = arrayList.size();
            int i3 = (bufferedElementsCount - i) - size;
            this.writesSuccessful.inc(i);
            this.writesFailedRetryable.inc(i3);
            this.writesFailedNonRetryable.inc(size);
            rpcWriteAttempt.recordWriteCounts(instant2, i, size + i3);
            if (i == bufferedElementsCount) {
                handleWriteSummary(contextAdapter, instant2, KV.of(new FirestoreV1.WriteSuccessSummary(i, j), coerceNonNull(boundedWindow)), () -> {
                    LOG.debug("Sending BatchWrite request with {} writes totalling {} bytes was completely applied in {}ms", new Object[]{Integer.valueOf(bufferedElementsCount), Long.valueOf(bufferedElementsBytes), Long.valueOf(millis)});
                });
                rpcWriteAttempt.completeSuccess();
                return DoFlushStatus.OK;
            }
            if (size > 0) {
                int i4 = i;
                handleWriteFailures(contextAdapter, instant2, ImmutableList.copyOf(arrayList), () -> {
                    LOG.warn("Sending BatchWrite request with {} writes totalling {} bytes was incompletely applied in {}ms ({} ok, {} retryable, {} non-retryable)", new Object[]{Integer.valueOf(bufferedElementsCount), Long.valueOf(bufferedElementsBytes), Long.valueOf(millis), Integer.valueOf(i4), Integer.valueOf(i3), Integer.valueOf(size)});
                });
            } else if (i3 > 0) {
                int i5 = i;
                Runnable runnable = () -> {
                    LOG.debug("Sending BatchWrite request with {} writes totalling {} bytes was incompletely applied in {}ms ({} ok, {} retryable)", new Object[]{Integer.valueOf(bufferedElementsCount), Long.valueOf(bufferedElementsBytes), Long.valueOf(millis), Integer.valueOf(i5), Integer.valueOf(i3)});
                };
                if (i > 0) {
                    handleWriteSummary(contextAdapter, instant2, KV.of(new FirestoreV1.WriteSuccessSummary(i, j), coerceNonNull(boundedWindow)), runnable);
                } else {
                    runnable.run();
                }
            }
            return DoFlushStatus.ONE_OR_MORE_FAILURES;
        }

        private static BoundedWindow coerceNonNull(BoundedWindow boundedWindow) {
            if (boundedWindow == null) {
                throw new IllegalStateException("Unable to locate window for successful request");
            }
            return boundedWindow;
        }

        abstract void handleWriteFailures(ContextAdapter<OutT> contextAdapter, Instant instant, List<KV<FirestoreV1.WriteFailure, BoundedWindow>> list, Runnable runnable);

        abstract void handleWriteSummary(ContextAdapter<OutT> contextAdapter, Instant instant, KV<FirestoreV1.WriteSuccessSummary, BoundedWindow> kv, Runnable runnable);

        private static String getWriteType(Write write) {
            return write.hasUpdate() ? "UPDATE" : write.hasTransform() ? "TRANSFORM" : "DELETE";
        }

        private static String getName(Write write) {
            return write.hasUpdate() ? write.getUpdate().getName() : write.hasTransform() ? write.getTransform().getDocument() : write.getDelete();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn$BatchWriteFnWithDeadLetterQueue.class */
    static final class BatchWriteFnWithDeadLetterQueue extends BaseBatchWriteFn<FirestoreV1.WriteFailure> {
        /* JADX INFO: Access modifiers changed from: package-private */
        public BatchWriteFnWithDeadLetterQueue(JodaClock jodaClock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, CounterFactory counterFactory) {
            super(jodaClock, firestoreStatefulComponentFactory, rpcQosOptions, counterFactory);
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BaseBatchWriteFn
        void handleWriteFailures(BaseBatchWriteFn.ContextAdapter<FirestoreV1.WriteFailure> contextAdapter, Instant instant, List<KV<FirestoreV1.WriteFailure, BoundedWindow>> list, Runnable runnable) {
            runnable.run();
            for (KV<FirestoreV1.WriteFailure, BoundedWindow> kv : list) {
                contextAdapter.output((FirestoreV1.WriteFailure) kv.getKey(), instant, (BoundedWindow) kv.getValue());
            }
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BaseBatchWriteFn
        void handleWriteSummary(BaseBatchWriteFn.ContextAdapter<FirestoreV1.WriteFailure> contextAdapter, Instant instant, KV<FirestoreV1.WriteSuccessSummary, BoundedWindow> kv, Runnable runnable) {
            runnable.run();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn$BatchWriteFnWithSummary.class */
    static final class BatchWriteFnWithSummary extends BaseBatchWriteFn<FirestoreV1.WriteSuccessSummary> {
        /* JADX INFO: Access modifiers changed from: package-private */
        public BatchWriteFnWithSummary(JodaClock jodaClock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, CounterFactory counterFactory) {
            super(jodaClock, firestoreStatefulComponentFactory, rpcQosOptions, counterFactory);
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BaseBatchWriteFn
        void handleWriteFailures(BaseBatchWriteFn.ContextAdapter<FirestoreV1.WriteSuccessSummary> contextAdapter, Instant instant, List<KV<FirestoreV1.WriteFailure, BoundedWindow>> list, Runnable runnable) {
            throw new FirestoreV1.FailedWritesException((List) list.stream().map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toList()));
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BaseBatchWriteFn
        void handleWriteSummary(BaseBatchWriteFn.ContextAdapter<FirestoreV1.WriteSuccessSummary> contextAdapter, Instant instant, KV<FirestoreV1.WriteSuccessSummary, BoundedWindow> kv, Runnable runnable) {
            runnable.run();
            contextAdapter.output((FirestoreV1.WriteSuccessSummary) kv.getKey(), instant, (BoundedWindow) kv.getValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn$WriteElement.class */
    public static final class WriteElement implements RpcQos.RpcWriteAttempt.Element<Write> {
        private static final Comparator<WriteElement> COMPARATOR = Comparator.comparing((v0) -> {
            return v0.getQueuePosition();
        });
        private final int queuePosition;
        private final Write value;
        private final BoundedWindow window;

        WriteElement(int i, Write write, BoundedWindow boundedWindow) {
            this.value = write;
            this.queuePosition = i;
            this.window = boundedWindow;
        }

        public int getQueuePosition() {
            return this.queuePosition;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element
        public Write getValue() {
            return this.value;
        }

        @Override // org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element
        public long getSerializedSize() {
            return this.value.getSerializedSize();
        }

        @EnsuresNonNullIf(expression = {"#1"}, result = true)
        @Pure
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof WriteElement)) {
                return false;
            }
            WriteElement writeElement = (WriteElement) obj;
            return this.queuePosition == writeElement.queuePosition && this.value.equals(writeElement.value) && this.window.equals(writeElement.window);
        }

        @Pure
        public int hashCode() {
            return Objects.hash(Integer.valueOf(this.queuePosition), this.value, this.window);
        }

        @SideEffectFree
        public String toString() {
            return "WriteElement{queuePosition=" + this.queuePosition + ", value=" + this.value + ", window=" + this.window + '}';
        }
    }

    FirestoreV1WriteFn() {
    }
}
