package org.apache.beam.sdk.io.gcp.firestore;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.firestore.v1.stub.FirestoreStub;
import com.google.firestore.v1.BatchWriteRequest;
import com.google.firestore.v1.BatchWriteResponse;
import com.google.firestore.v1.Write;
import com.google.firestore.v1.WriteResult;
import com.google.rpc.Code;
import com.google.rpc.Status;
import io.grpc.Status;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts.HasRpcAttemptContext;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BaseBatchWriteFn;
import org.apache.beam.sdk.io.gcp.firestore.RpcQos;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.class */
public abstract class BaseFirestoreV1WriteFnTest<OutT, FnT extends FirestoreV1WriteFn.BaseBatchWriteFn<OutT> & FirestoreV1RpcAttemptContexts.HasRpcAttemptContext> extends BaseFirestoreV1FnTest<Write, OutT, FnT> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseFirestoreV1WriteFnTest.class);
    protected static final Status STATUS_OK = Status.newBuilder().setCode(Code.OK.getNumber()).build();
    protected static final Status STATUS_DEADLINE_EXCEEDED = Status.newBuilder().setCode(Code.DEADLINE_EXCEEDED.getNumber()).build();

    @Mock(lenient = true)
    protected BoundedWindow window;

    @Mock
    protected DoFn<Write, OutT>.FinishBundleContext finishBundleContext;

    @Mock
    protected UnaryCallable<BatchWriteRequest, BatchWriteResponse> callable;

    @Mock
    protected RpcQos.RpcWriteAttempt attempt;

    @Mock
    protected RpcQos.RpcWriteAttempt attempt2;
    protected MetricsFixture metricsFixture;
    private static final String METRIC_MARKER = "XXX";

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest$MetricsFixture.class */
    public static final class MetricsFixture {
        final Map<String, MyCounter> counters = new HashMap();
        final Map<String, MyDistribution> distributions = new HashMap();
        final CounterFactory counterFactory = (str, str2) -> {
            return this.counters.computeIfAbsent(str2, str -> {
                return new MyCounter(str, str2);
            });
        };
        final DistributionFactory distributionFactory = (str, str2) -> {
            return this.distributions.computeIfAbsent(str2, str -> {
                return new MyDistribution(str, str2);
            });
        };

        public Map<String, MyCounter> getCounters() {
            return ImmutableMap.copyOf(this.counters);
        }

        public Map<String, MyDistribution> getDistributions() {
            return ImmutableMap.copyOf(this.distributions);
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -909363904:
                    if (implMethodName.equals("lambda$new$74560bbd$1")) {
                        z = true;
                        break;
                    }
                    break;
                case 2107289601:
                    if (implMethodName.equals("lambda$new$b45993cf$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/gcp/firestore/CounterFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/beam/sdk/metrics/Counter;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest$MetricsFixture") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/beam/sdk/metrics/Counter;")) {
                        MetricsFixture metricsFixture = (MetricsFixture) serializedLambda.getCapturedArg(0);
                        return (str, str2) -> {
                            return this.counters.computeIfAbsent(str2, str -> {
                                return new MyCounter(str, str2);
                            });
                        };
                    }
                    break;
                case true:
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/gcp/firestore/DistributionFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/beam/sdk/metrics/Distribution;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest$MetricsFixture") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/beam/sdk/metrics/Distribution;")) {
                        MetricsFixture metricsFixture2 = (MetricsFixture) serializedLambda.getCapturedArg(0);
                        return (str3, str22) -> {
                            return this.distributions.computeIfAbsent(str22, str3 -> {
                                return new MyDistribution(str3, str22);
                            });
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest$MyCounter.class */
    public static class MyCounter implements Counter {
        private final MetricName named;
        private final List<Long> incInvocations = new ArrayList();

        public MyCounter(String str, String str2) {
            this.named = MetricName.named(str, str2);
        }

        public void inc() {
            BaseFirestoreV1WriteFnTest.LOG.trace("{} {}:inc()", BaseFirestoreV1WriteFnTest.METRIC_MARKER, this.named);
        }

        public void inc(long j) {
            BaseFirestoreV1WriteFnTest.LOG.trace("{} {}:inc(n = {})", new Object[]{BaseFirestoreV1WriteFnTest.METRIC_MARKER, this.named, Long.valueOf(j)});
            this.incInvocations.add(Long.valueOf(j));
        }

        public void dec() {
            dec(1L);
        }

        public void dec(long j) {
            throw new IllegalStateException("not implemented");
        }

        public MetricName getName() {
            return this.named;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest$MyDistribution.class */
    public static class MyDistribution implements Distribution {
        private final MetricName name;
        private final List<Long> updateInvocations = new ArrayList();

        public MyDistribution(String str, String str2) {
            this.name = MetricName.named(str, str2);
        }

        public void update(long j) {
            BaseFirestoreV1WriteFnTest.LOG.trace("{} {}:update(value = {})", new Object[]{BaseFirestoreV1WriteFnTest.METRIC_MARKER, this.name, Long.valueOf(j)});
            this.updateInvocations.add(Long.valueOf(j));
        }

        public void update(long j, long j2, long j3, long j4) {
            throw new IllegalStateException("not implemented");
        }

        public MetricName getName() {
            return this.name;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest$TestClock.class */
    public static class TestClock implements JodaClock {
        private final Function<Instant, Instant> defaultNext;
        private Function<Instant, Instant> next;
        private Instant prev;

        public TestClock(Instant instant, Duration duration) {
            this.prev = instant;
            this.defaultNext = BaseFirestoreV1WriteFnTest.advanceClockBy(duration);
        }

        public TestClock setNext(Function<Instant, Instant> function) {
            this.next = function;
            return this;
        }

        public Instant instant() {
            Instant apply;
            if (this.next != null) {
                apply = this.next.apply(this.prev);
                this.next = null;
            } else {
                apply = this.defaultNext.apply(this.prev);
            }
            this.prev = apply;
            BaseFirestoreV1WriteFnTest.LOG.trace("{} testClock:instant:{}", BaseFirestoreV1WriteFnTest.METRIC_MARKER, apply.toString());
            return apply;
        }
    }

    @Before
    public final void setUp() {
        Mockito.when(this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context) ArgumentMatchers.any())).thenReturn(this.attempt, new RpcQos.RpcWriteAttempt[]{this.attempt2});
        Mockito.when(this.ff.getRpcQos((RpcQosOptions) ArgumentMatchers.any())).thenReturn(this.rpcQos);
        Mockito.when(this.ff.getFirestoreStub(this.pipelineOptions)).thenReturn(this.stub);
        Mockito.when(this.stub.batchWriteCallable()).thenReturn(this.callable);
        this.metricsFixture = new MetricsFixture();
    }

    @Override // org.apache.beam.sdk.io.gcp.firestore.BaseFirestoreV1FnTest
    @Test
    public final void attemptsExhaustedForRetryableError() throws Exception {
        Instant ofEpochMilli = Instant.ofEpochMilli(0L);
        Instant ofEpochMilli2 = Instant.ofEpochMilli(1L);
        Instant ofEpochMilli3 = Instant.ofEpochMilli(2L);
        Instant ofEpochMilli4 = Instant.ofEpochMilli(3L);
        Instant ofEpochMilli5 = Instant.ofEpochMilli(4L);
        Instant ofEpochMilli6 = Instant.ofEpochMilli(5L);
        Instant ofEpochMilli7 = Instant.ofEpochMilli(6L);
        Write newWrite = FirestoreProtoHelpers.newWrite();
        RpcQos.RpcWriteAttempt.Element writeElement = new FirestoreV1WriteFn.WriteElement(0, newWrite, this.window);
        Mockito.when(this.ff.getFirestoreStub((PipelineOptions) ArgumentMatchers.any())).thenReturn(this.stub);
        Mockito.when(this.ff.getRpcQos((RpcQosOptions) ArgumentMatchers.any())).thenReturn(this.rpcQos);
        Mockito.when(this.rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite)).thenReturn(this.attempt);
        Mockito.when(this.stub.batchWriteCallable()).thenReturn(this.callable);
        RpcQos.RpcWriteAttempt.FlushBuffer flushBuffer = (RpcQos.RpcWriteAttempt.FlushBuffer) PowerMockito.spy(newFlushBuffer(this.rpcQosOptions));
        Mockito.when(Boolean.valueOf(this.attempt.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(this.attempt.newFlushBuffer(ofEpochMilli)).thenReturn(flushBuffer);
        Mockito.when(Boolean.valueOf(flushBuffer.offer(writeElement))).thenReturn(true);
        Mockito.when(flushBuffer.iterator()).thenReturn(Lists.newArrayList(new RpcQos.RpcWriteAttempt.Element[]{writeElement}).iterator());
        Mockito.when(Integer.valueOf(flushBuffer.getBufferedElementsCount())).thenReturn(1);
        Mockito.when(Boolean.valueOf(flushBuffer.isFull())).thenReturn(true);
        Mockito.when((BatchWriteResponse) this.callable.call((BatchWriteRequest) ArgumentMatchers.any())).thenThrow(new Throwable[]{RETRYABLE_ERROR, RETRYABLE_ERROR, RETRYABLE_ERROR});
        ((RpcQos.RpcWriteAttempt) Mockito.doNothing().when(this.attempt)).recordWriteCounts((Instant) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());
        ((RpcQos.RpcWriteAttempt) Mockito.doNothing().doNothing().doThrow(new Throwable[]{RETRYABLE_ERROR}).when(this.attempt)).checkCanRetry((Instant) ArgumentMatchers.any(), (RuntimeException) ArgumentMatchers.eq(RETRYABLE_ERROR));
        Mockito.when((Write) this.processContext.element()).thenReturn(newWrite);
        try {
            runFunction(getFn(this.clock, this.ff, this.rpcQosOptions, CounterFactory.DEFAULT, DistributionFactory.DEFAULT));
            Assert.fail("Expected ApiException to be throw after exhausted attempts");
        } catch (ApiException e) {
            Assert.assertSame(RETRYABLE_ERROR, e);
        }
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).awaitSafeToProceed(ofEpochMilli);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordRequestStart(ofEpochMilli2, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordWriteCounts(ofEpochMilli3, 0, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordRequestStart(ofEpochMilli4, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordWriteCounts(ofEpochMilli5, 0, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordRequestStart(ofEpochMilli6, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordWriteCounts(ofEpochMilli7, 0, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(0))).recordWriteCounts((Instant) ArgumentMatchers.any(), AdditionalMatchers.gt(0), ArgumentMatchers.anyInt());
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.never())).completeSuccess();
    }

    @Override // org.apache.beam.sdk.io.gcp.firestore.BaseFirestoreV1FnTest
    @Test
    public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
        Mockito.when(this.ff.getFirestoreStub((PipelineOptions) ArgumentMatchers.any())).thenReturn(this.stub);
        Mockito.when(this.ff.getRpcQos((RpcQosOptions) ArgumentMatchers.any())).thenReturn(this.rpcQos);
        Mockito.when(this.rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite)).thenReturn(this.attempt);
        InterruptedException interruptedException = new InterruptedException();
        Mockito.when(Boolean.valueOf(this.attempt.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(false).thenThrow(new Throwable[]{interruptedException});
        Mockito.when((Write) this.processContext.element()).thenReturn(FirestoreProtoHelpers.newWrite());
        try {
            runFunction(getFn(this.clock, this.ff, this.rpcQosOptions, CounterFactory.DEFAULT, DistributionFactory.DEFAULT));
            Assert.fail("Expected ApiException to be throw after exhausted attempts");
        } catch (InterruptedException e) {
            Assert.assertSame(interruptedException, e);
        }
        ((FirestoreStub) Mockito.verify(this.stub, Mockito.times(1))).close();
        Mockito.verifyNoMoreInteractions(new Object[]{this.stub});
        Mockito.verifyNoMoreInteractions(new Object[]{this.callable});
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(0))).recordWriteCounts((Instant) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());
    }

    @Test
    public abstract void enqueueingWritesValidateBytesSize() throws Exception;

    @Test
    public final void endToEnd_success() throws Exception {
        Write newWrite = FirestoreProtoHelpers.newWrite();
        BatchWriteRequest build = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)").addWrites(newWrite).build();
        BatchWriteResponse build2 = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).build();
        FirestoreV1WriteFn.WriteElement writeElement = new FirestoreV1WriteFn.WriteElement(0, newWrite, this.window);
        Instant ofEpochMilli = Instant.ofEpochMilli(0L);
        Instant ofEpochMilli2 = Instant.ofEpochMilli(1L);
        Instant ofEpochMilli3 = Instant.ofEpochMilli(2L);
        RpcQosOptions build3 = this.rpcQosOptions.toBuilder().withBatchMaxCount(1).build();
        RpcQos.RpcWriteAttempt.FlushBuffer flushBuffer = (RpcQos.RpcWriteAttempt.FlushBuffer) PowerMockito.spy(newFlushBuffer(build3));
        Mockito.when((Write) this.processContext.element()).thenReturn(newWrite);
        Mockito.when(Boolean.valueOf(this.attempt.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(this.attempt.newFlushBuffer(ofEpochMilli)).thenReturn(flushBuffer);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(BatchWriteRequest.class);
        Mockito.when((BatchWriteResponse) this.callable.call((BatchWriteRequest) forClass.capture())).thenReturn(build2);
        runFunction(getFn(this.clock, this.ff, build3, CounterFactory.DEFAULT, DistributionFactory.DEFAULT));
        Assert.assertEquals(build, forClass.getValue());
        ((RpcQos.RpcWriteAttempt.FlushBuffer) Mockito.verify(flushBuffer, Mockito.times(1))).offer(writeElement);
        ((RpcQos.RpcWriteAttempt.FlushBuffer) Mockito.verify(flushBuffer, Mockito.times(1))).isFull();
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordRequestStart(ofEpochMilli2, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordWriteCounts(ofEpochMilli3, 1, 0);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.never())).recordWriteCounts((Instant) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), AdditionalMatchers.gt(0));
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.never())).checkCanRetry((Instant) ArgumentMatchers.any(), (RuntimeException) ArgumentMatchers.any());
    }

    @Test
    public final void endToEnd_exhaustingAttemptsResultsInException() throws Exception {
        Throwable createException = ApiExceptionFactory.createException(new IOException("err1"), GrpcStatusCode.of(Status.Code.ABORTED), false);
        Throwable createException2 = ApiExceptionFactory.createException(new IOException("err2"), GrpcStatusCode.of(Status.Code.ABORTED), false);
        Throwable createException3 = ApiExceptionFactory.createException(new IOException("err3"), GrpcStatusCode.of(Status.Code.ABORTED), false);
        Instant ofEpochMilli = Instant.ofEpochMilli(0L);
        Instant ofEpochMilli2 = Instant.ofEpochMilli(1L);
        Instant ofEpochMilli3 = Instant.ofEpochMilli(2L);
        Instant ofEpochMilli4 = Instant.ofEpochMilli(3L);
        Instant ofEpochMilli5 = Instant.ofEpochMilli(4L);
        Instant ofEpochMilli6 = Instant.ofEpochMilli(5L);
        Instant ofEpochMilli7 = Instant.ofEpochMilli(6L);
        Write newWrite = FirestoreProtoHelpers.newWrite();
        RpcQos.RpcWriteAttempt.Element writeElement = new FirestoreV1WriteFn.WriteElement(0, newWrite, this.window);
        RpcQos.RpcWriteAttempt.FlushBuffer flushBuffer = (RpcQos.RpcWriteAttempt.FlushBuffer) PowerMockito.spy(newFlushBuffer(this.rpcQosOptions));
        Mockito.when((Write) this.processContext.element()).thenReturn(newWrite);
        Mockito.when(Boolean.valueOf(this.attempt.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(this.attempt.newFlushBuffer(ofEpochMilli)).thenReturn(flushBuffer);
        Mockito.when(Boolean.valueOf(flushBuffer.isFull())).thenReturn(true);
        Mockito.when(Boolean.valueOf(flushBuffer.offer(writeElement))).thenReturn(true);
        Mockito.when(flushBuffer.iterator()).thenReturn(Lists.newArrayList(new RpcQos.RpcWriteAttempt.Element[]{writeElement}).iterator());
        Mockito.when(Integer.valueOf(flushBuffer.getBufferedElementsCount())).thenReturn(1);
        Mockito.when((BatchWriteResponse) this.callable.call((BatchWriteRequest) ArgumentMatchers.any())).thenThrow(new Throwable[]{createException, createException2, createException3});
        ((RpcQos.RpcWriteAttempt) Mockito.doNothing().when(this.attempt)).checkCanRetry((Instant) ArgumentMatchers.any(), (RuntimeException) ArgumentMatchers.eq(createException));
        ((RpcQos.RpcWriteAttempt) Mockito.doNothing().when(this.attempt)).checkCanRetry((Instant) ArgumentMatchers.any(), (RuntimeException) ArgumentMatchers.eq(createException2));
        ((RpcQos.RpcWriteAttempt) Mockito.doThrow(new Throwable[]{createException3}).when(this.attempt)).checkCanRetry((Instant) ArgumentMatchers.any(), (RuntimeException) ArgumentMatchers.eq(createException3));
        try {
            runFunction(getFn(this.clock, this.ff, this.rpcQosOptions, CounterFactory.DEFAULT, DistributionFactory.DEFAULT));
            Assert.fail("Expected exception");
        } catch (ApiException e) {
            Assert.assertNotNull(e.getMessage());
            Assert.assertTrue(e.getMessage().contains("err3"));
        }
        ((RpcQos.RpcWriteAttempt.FlushBuffer) Mockito.verify(flushBuffer, Mockito.times(1))).offer(writeElement);
        ((RpcQos.RpcWriteAttempt.FlushBuffer) Mockito.verify(flushBuffer, Mockito.atLeastOnce())).isFull();
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordRequestStart(ofEpochMilli2, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordWriteCounts(ofEpochMilli3, 0, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordRequestStart(ofEpochMilli4, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordWriteCounts(ofEpochMilli5, 0, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordRequestStart(ofEpochMilli6, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordWriteCounts(ofEpochMilli7, 0, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.never())).recordWriteCounts((Instant) ArgumentMatchers.any(), AdditionalMatchers.gt(0), ArgumentMatchers.anyInt());
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.never())).completeSuccess();
    }

    @Test
    public final void endToEnd_awaitSafeToProceed_falseIsTerminalForAttempt() throws Exception {
        RpcQosOptions build = this.rpcQosOptions.toBuilder().withBatchMaxCount(2).build();
        Instant ofEpochMilli = Instant.ofEpochMilli(3L);
        Instant ofEpochMilli2 = Instant.ofEpochMilli(4L);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(BatchWriteRequest.class);
        Write newWrite = FirestoreProtoHelpers.newWrite();
        BatchWriteRequest build2 = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)").addWrites(newWrite).build();
        BatchWriteResponse build3 = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).build();
        Mockito.when((Write) this.processContext.element()).thenReturn(newWrite);
        Mockito.when(Boolean.valueOf(this.attempt.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(false).thenThrow(new Throwable[]{new IllegalStateException("too many attempt1#awaitSafeToProceed")});
        Mockito.when(Boolean.valueOf(this.attempt2.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true).thenThrow(new Throwable[]{new IllegalStateException("too many attempt2#awaitSafeToProceed")});
        Mockito.when(this.attempt2.newFlushBuffer((Instant) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            return newFlushBuffer(build);
        });
        RpcQos.RpcWriteAttempt rpcWriteAttempt = (RpcQos.RpcWriteAttempt) Mockito.mock(RpcQos.RpcWriteAttempt.class);
        Mockito.when(Boolean.valueOf(rpcWriteAttempt.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true, new Boolean[]{true}).thenThrow(new Throwable[]{new IllegalStateException("too many finishBundleAttempt#awaitSafeToProceed")});
        Mockito.when(rpcWriteAttempt.newFlushBuffer((Instant) ArgumentMatchers.any())).thenAnswer(invocationOnMock2 -> {
            return newFlushBuffer(build);
        });
        Mockito.when(this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context) ArgumentMatchers.any())).thenReturn(this.attempt, new RpcQos.RpcWriteAttempt[]{this.attempt2, rpcWriteAttempt});
        Mockito.when((BatchWriteResponse) this.callable.call((BatchWriteRequest) forClass.capture())).thenReturn(build3);
        runFunction(getFn(this.clock, this.ff, build, CounterFactory.DEFAULT, DistributionFactory.DEFAULT));
        Assert.assertEquals(build2, forClass.getValue());
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).awaitSafeToProceed((Instant) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.attempt});
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt2, Mockito.times(1))).awaitSafeToProceed((Instant) ArgumentMatchers.any());
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt2, Mockito.times(1))).newFlushBuffer((Instant) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.attempt2});
        ((RpcQos.RpcWriteAttempt) Mockito.verify(rpcWriteAttempt, Mockito.times(1))).recordRequestStart(ofEpochMilli, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(rpcWriteAttempt, Mockito.times(1))).recordWriteCounts(ofEpochMilli2, 1, 0);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(rpcWriteAttempt, Mockito.times(1))).completeSuccess();
        ((RpcQos.RpcWriteAttempt) Mockito.verify(rpcWriteAttempt, Mockito.never())).checkCanRetry((Instant) ArgumentMatchers.any(), (RuntimeException) ArgumentMatchers.any());
    }

    @Test
    public final void endToEnd_deadlineExceededOnAnIndividualWriteResultsInThrottling() throws Exception {
        LOG.info("docCount = {}", 10000L);
        RpcQosOptions build = this.rpcQosOptions.toBuilder().withHintMaxNumWorkers(100).withSamplePeriod(Duration.standardMinutes(10L)).withReportDiagnosticMetrics().build();
        LOG.debug("options = {}", build);
        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory = (FirestoreStatefulComponentFactory) Mockito.mock(FirestoreStatefulComponentFactory.class);
        Mockito.when(firestoreStatefulComponentFactory.getFirestoreStub((PipelineOptions) ArgumentMatchers.any())).thenReturn(this.stub);
        Random random = new Random(12345L);
        final TestClock testClock = new TestClock(Instant.EPOCH, Duration.standardSeconds(1L));
        RpcQosImpl rpcQosImpl = new RpcQosImpl(build, random, j -> {
            testClock.setNext(advanceClockBy(Duration.millis(j)));
        }, this.metricsFixture.counterFactory, this.metricsFixture.distributionFactory);
        Mockito.when(firestoreStatefulComponentFactory.getRpcQos(build)).thenReturn((RpcQos) Mockito.mock(RpcQos.class, invocationOnMock -> {
            Method method = invocationOnMock.getMethod();
            LOG.debug("method = {}", method);
            return rpcQosImpl.getClass().getMethod(method.getName(), method.getParameterTypes()).invoke(rpcQosImpl, invocationOnMock.getArguments());
        }));
        final int i = 30;
        AtomicLong atomicLong = new AtomicLong();
        Mockito.when((Write) this.processContext.element()).thenAnswer(invocationOnMock2 -> {
            return FirestoreProtoHelpers.newWrite(atomicLong.getAndIncrement());
        });
        Mockito.when((BatchWriteResponse) this.callable.call((BatchWriteRequest) ArgumentMatchers.any())).thenAnswer(new Answer<BatchWriteResponse>() { // from class: org.apache.beam.sdk.io.gcp.firestore.BaseFirestoreV1WriteFnTest.1
            private final Random rand = new Random(84572908);
            private final Instant threshold = Instant.ofEpochMilli(Duration.standardMinutes(20).getMillis());

            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public BatchWriteResponse m80answer(InvocationOnMock invocationOnMock3) throws Throwable {
                BatchWriteRequest batchWriteRequest = (BatchWriteRequest) invocationOnMock3.getArgument(0, BatchWriteRequest.class);
                BaseFirestoreV1WriteFnTest.LOG.debug("request = {}", batchWriteRequest);
                long j2 = 0;
                BatchWriteResponse.Builder newBuilder = BatchWriteResponse.newBuilder();
                for (Write write : batchWriteRequest.getWritesList()) {
                    newBuilder.addWriteResults(WriteResult.newBuilder().build());
                    if (testClock.prev.isBefore(this.threshold)) {
                        j2 += i;
                        newBuilder.addStatus(BaseFirestoreV1WriteFnTest.STATUS_OK);
                    } else {
                        int nextInt = this.rand.nextInt(1500);
                        BaseFirestoreV1WriteFnTest.LOG.debug("latency = {}", Integer.valueOf(nextInt));
                        if (nextInt > 300) {
                            newBuilder.addStatus(BaseFirestoreV1WriteFnTest.STATUS_DEADLINE_EXCEEDED);
                        } else {
                            newBuilder.addStatus(BaseFirestoreV1WriteFnTest.STATUS_OK);
                        }
                        j2 += nextInt;
                    }
                }
                testClock.setNext(BaseFirestoreV1WriteFnTest.advanceClockBy(Duration.millis(j2)));
                return newBuilder.build();
            }
        });
        LOG.info("### parameters: {defaultDocumentWriteLatency: {}, rpcQosOptions: {}}", 30, build);
        FnT fn = getFn(testClock, firestoreStatefulComponentFactory, build, this.metricsFixture.counterFactory, this.metricsFixture.distributionFactory);
        fn.setup();
        fn.startBundle(this.startBundleContext);
        while (atomicLong.get() < 10000) {
            fn.processElement(this.processContext, this.window);
        }
        fn.finishBundle(this.finishBundleContext);
        LOG.info("writeCounter = {}", Long.valueOf(atomicLong.get()));
        LOG.info("clock.prev = {}", testClock.prev);
        MyDistribution myDistribution = this.metricsFixture.distributions.get("qos_adaptiveThrottler_throttlingMs");
        Assert.assertNotNull(myDistribution);
        Assert.assertFalse(myDistribution.updateInvocations.isEmpty());
    }

    @Test
    public final void endToEnd_maxBatchSizeRespected() throws Exception {
        Instant ofEpochMilli = Instant.ofEpochMilli(0L);
        Instant ofEpochMilli2 = Instant.ofEpochMilli(1L);
        Instant ofEpochMilli3 = Instant.ofEpochMilli(2L);
        Instant ofEpochMilli4 = Instant.ofEpochMilli(3L);
        Instant ofEpochMilli5 = Instant.ofEpochMilli(4L);
        Instant ofEpochMilli6 = Instant.ofEpochMilli(5L);
        Instant ofEpochMilli7 = Instant.ofEpochMilli(6L);
        Instant ofEpochMilli8 = Instant.ofEpochMilli(7L);
        Instant ofEpochMilli9 = Instant.ofEpochMilli(8L);
        Instant ofEpochMilli10 = Instant.ofEpochMilli(9L);
        Instant ofEpochMilli11 = Instant.ofEpochMilli(10L);
        Write newWrite = FirestoreProtoHelpers.newWrite(0L);
        Write newWrite2 = FirestoreProtoHelpers.newWrite(1L);
        Write newWrite3 = FirestoreProtoHelpers.newWrite(2L);
        Write newWrite4 = FirestoreProtoHelpers.newWrite(3L);
        Write newWrite5 = FirestoreProtoHelpers.newWrite(4L);
        Write newWrite6 = FirestoreProtoHelpers.newWrite(5L);
        BatchWriteRequest.Builder database = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)");
        BatchWriteRequest build = database.build().toBuilder().addWrites(newWrite).addWrites(newWrite2).addWrites(newWrite3).addWrites(newWrite4).addWrites(newWrite5).build();
        BatchWriteRequest build2 = database.build().toBuilder().addWrites(newWrite6).build();
        BatchWriteResponse build3 = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).addStatus(STATUS_OK).addStatus(STATUS_OK).addStatus(STATUS_OK).addStatus(STATUS_OK).build();
        BatchWriteResponse build4 = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).build();
        RpcQosOptions build5 = this.rpcQosOptions.toBuilder().withBatchMaxCount(5).build();
        RpcQos.RpcWriteAttempt.FlushBuffer flushBuffer = (RpcQos.RpcWriteAttempt.FlushBuffer) PowerMockito.spy(newFlushBuffer(build5));
        RpcQos.RpcWriteAttempt.FlushBuffer flushBuffer2 = (RpcQos.RpcWriteAttempt.FlushBuffer) PowerMockito.spy(newFlushBuffer(build5));
        Mockito.when((Write) this.processContext.element()).thenReturn(newWrite, new Write[]{newWrite2, newWrite3, newWrite4, newWrite5, newWrite6});
        Mockito.when(this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context) ArgumentMatchers.any())).thenReturn(this.attempt, new RpcQos.RpcWriteAttempt[]{this.attempt, this.attempt, this.attempt, this.attempt, this.attempt2, this.attempt2, this.attempt2}).thenThrow(new Throwable[]{new IllegalStateException("too many attempts")});
        Mockito.when(Boolean.valueOf(this.attempt.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.attempt2.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(this.attempt.newFlushBuffer(ofEpochMilli)).thenReturn(newFlushBuffer(build5));
        Mockito.when(this.attempt.newFlushBuffer(ofEpochMilli2)).thenReturn(newFlushBuffer(build5));
        Mockito.when(this.attempt.newFlushBuffer(ofEpochMilli3)).thenReturn(newFlushBuffer(build5));
        Mockito.when(this.attempt.newFlushBuffer(ofEpochMilli4)).thenReturn(newFlushBuffer(build5));
        Mockito.when(this.attempt.newFlushBuffer(ofEpochMilli5)).thenReturn(flushBuffer);
        Mockito.when((BatchWriteResponse) this.callable.call(build)).thenReturn(build3);
        Mockito.when(this.attempt2.newFlushBuffer(ofEpochMilli8)).thenReturn(newFlushBuffer(build5));
        Mockito.when(this.attempt2.newFlushBuffer(ofEpochMilli9)).thenReturn(flushBuffer2);
        Mockito.when((BatchWriteResponse) this.callable.call(build2)).thenReturn(build4);
        runFunction(getFn(this.clock, this.ff, build5, CounterFactory.DEFAULT, DistributionFactory.DEFAULT), 5 + 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordRequestStart(ofEpochMilli6, 5);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordWriteCounts(ofEpochMilli7, 5, 0);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).completeSuccess();
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt2, Mockito.times(1))).recordRequestStart(ofEpochMilli10, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt2, Mockito.times(1))).recordWriteCounts(ofEpochMilli11, 1, 0);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt2, Mockito.times(1))).completeSuccess();
        ((UnaryCallable) Mockito.verify(this.callable, Mockito.times(1))).call(build);
        ((UnaryCallable) Mockito.verify(this.callable, Mockito.times(1))).call(build2);
        Mockito.verifyNoMoreInteractions(new Object[]{this.callable});
        ((RpcQos.RpcWriteAttempt.FlushBuffer) Mockito.verify(flushBuffer, Mockito.times(5))).offer((RpcQos.RpcWriteAttempt.Element) ArgumentMatchers.any());
        ((RpcQos.RpcWriteAttempt.FlushBuffer) Mockito.verify(flushBuffer2, Mockito.times(1))).offer((RpcQos.RpcWriteAttempt.Element) ArgumentMatchers.any());
    }

    @Test
    public final void endToEnd_partialSuccessReturnsWritesToQueue() throws Exception {
        Write newWrite = FirestoreProtoHelpers.newWrite(0L);
        Write newWrite2 = FirestoreProtoHelpers.newWrite(1L);
        Write newWrite3 = FirestoreProtoHelpers.newWrite(2L);
        Write newWrite4 = FirestoreProtoHelpers.newWrite(3L);
        Write newWrite5 = FirestoreProtoHelpers.newWrite(4L);
        BatchWriteRequest build = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)").addWrites(newWrite).addWrites(newWrite2).addWrites(newWrite3).addWrites(newWrite4).addWrites(newWrite5).build();
        BatchWriteResponse build2 = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).addStatus(statusForCode(Code.INVALID_ARGUMENT)).addStatus(statusForCode(Code.FAILED_PRECONDITION)).addStatus(statusForCode(Code.UNAUTHENTICATED)).addStatus(STATUS_OK).build();
        BatchWriteRequest build3 = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)").addWrites(newWrite2).addWrites(newWrite3).addWrites(newWrite4).build();
        BatchWriteResponse build4 = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).addStatus(STATUS_OK).addStatus(STATUS_OK).build();
        RpcQosOptions build5 = this.rpcQosOptions.toBuilder().withMaxAttempts(1).withBatchMaxCount(5).build();
        Mockito.when((Write) this.processContext.element()).thenReturn(newWrite, new Write[]{newWrite2, newWrite3, newWrite4, newWrite5}).thenThrow(new Throwable[]{new IllegalStateException("too many calls")});
        Mockito.when(this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context) ArgumentMatchers.any())).thenReturn(this.attempt);
        Mockito.when(Boolean.valueOf(this.attempt.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(this.attempt.newFlushBuffer((Instant) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            return newFlushBuffer(build5);
        });
        Mockito.when(Boolean.valueOf(this.attempt.isCodeRetryable(Code.INVALID_ARGUMENT))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.attempt.isCodeRetryable(Code.FAILED_PRECONDITION))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.attempt.isCodeRetryable(Code.UNAUTHENTICATED))).thenReturn(true);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(BatchWriteRequest.class);
        Mockito.when((BatchWriteResponse) this.callable.call((BatchWriteRequest) forClass.capture())).thenReturn(build2);
        FnT fn = getFn(this.clock, this.ff, build5, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
        fn.setup();
        fn.startBundle(this.startBundleContext);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        Assert.assertEquals(build, forClass.getValue());
        Assert.assertEquals(Lists.newArrayList(new FirestoreV1WriteFn.WriteElement[]{new FirestoreV1WriteFn.WriteElement(1, newWrite2, this.window), new FirestoreV1WriteFn.WriteElement(2, newWrite3, this.window), new FirestoreV1WriteFn.WriteElement(3, newWrite4, this.window)}), new ArrayList(((FirestoreV1WriteFn.BaseBatchWriteFn) fn).writes));
        Assert.assertEquals(5L, ((FirestoreV1WriteFn.BaseBatchWriteFn) fn).queueNextEntryPriority);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(BatchWriteRequest.class);
        Mockito.when((BatchWriteResponse) this.callable.call((BatchWriteRequest) forClass2.capture())).thenReturn(build4);
        fn.finishBundle(this.finishBundleContext);
        Assert.assertEquals(build3, forClass2.getValue());
        Assert.assertEquals(0L, ((FirestoreV1WriteFn.BaseBatchWriteFn) fn).queueNextEntryPriority);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordRequestStart((Instant) ArgumentMatchers.any(), ArgumentMatchers.eq(5));
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordWriteCounts((Instant) ArgumentMatchers.any(), ArgumentMatchers.eq(2), ArgumentMatchers.eq(3));
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordRequestStart((Instant) ArgumentMatchers.any(), ArgumentMatchers.eq(3));
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).recordWriteCounts((Instant) ArgumentMatchers.any(), ArgumentMatchers.eq(3), ArgumentMatchers.eq(0));
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).completeSuccess();
        ((UnaryCallable) Mockito.verify(this.callable, Mockito.times(2))).call((BatchWriteRequest) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(new Object[]{this.callable});
    }

    @Test
    public final void writesRemainInQueueWhenFlushIsNotReadyAndThenFlushesInFinishBundle() throws Exception {
        RpcQosOptions build = this.rpcQosOptions.toBuilder().withMaxAttempts(1).build();
        Write newWrite = FirestoreProtoHelpers.newWrite();
        BatchWriteRequest build2 = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)").addWrites(newWrite).build();
        BatchWriteResponse build3 = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).build();
        Mockito.when((Write) this.processContext.element()).thenReturn(newWrite).thenThrow(new Throwable[]{new IllegalStateException("too many element calls")});
        Mockito.when(this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context) ArgumentMatchers.any())).thenReturn(this.attempt, new RpcQos.RpcWriteAttempt[]{this.attempt2}).thenThrow(new Throwable[]{new IllegalStateException("too many attempt calls")});
        Mockito.when(Boolean.valueOf(this.attempt.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.attempt2.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(this.attempt.newFlushBuffer((Instant) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            return newFlushBuffer(build);
        });
        Mockito.when(this.attempt2.newFlushBuffer((Instant) ArgumentMatchers.any())).thenAnswer(invocationOnMock2 -> {
            return newFlushBuffer(build);
        });
        FnT fn = getFn(this.clock, this.ff, build, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
        fn.populateDisplayData(this.displayDataBuilder);
        fn.setup();
        fn.startBundle(this.startBundleContext);
        fn.processElement(this.processContext, this.window);
        Assert.assertEquals(1L, ((FirestoreV1WriteFn.BaseBatchWriteFn) fn).writes.size());
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.never())).recordWriteCounts((Instant) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.never())).checkCanRetry((Instant) ArgumentMatchers.any(), (RuntimeException) ArgumentMatchers.any());
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.never())).completeSuccess();
        Instant ofEpochMilli = Instant.ofEpochMilli(2L);
        Instant ofEpochMilli2 = Instant.ofEpochMilli(3L);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(BatchWriteRequest.class);
        Mockito.when((BatchWriteResponse) this.callable.call((BatchWriteRequest) forClass.capture())).thenReturn(build3);
        fn.finishBundle(this.finishBundleContext);
        Assert.assertEquals(0L, ((FirestoreV1WriteFn.BaseBatchWriteFn) fn).writes.size());
        Assert.assertEquals(build2, forClass.getValue());
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt2, Mockito.times(1))).recordRequestStart(ofEpochMilli, 1);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt2, Mockito.times(1))).recordWriteCounts(ofEpochMilli2, 1, 0);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt2, Mockito.never())).recordWriteCounts((Instant) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), AdditionalMatchers.gt(0));
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt2, Mockito.never())).checkCanRetry((Instant) ArgumentMatchers.any(), (RuntimeException) ArgumentMatchers.any());
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt2, Mockito.times(1))).completeSuccess();
    }

    @Test
    public final void queuedWritesMaintainPriorityIfNotFlushed() throws Exception {
        RpcQosOptions build = this.rpcQosOptions.toBuilder().withMaxAttempts(1).build();
        Write newWrite = FirestoreProtoHelpers.newWrite(0L);
        Write newWrite2 = FirestoreProtoHelpers.newWrite(1L);
        Write newWrite3 = FirestoreProtoHelpers.newWrite(2L);
        Write newWrite4 = FirestoreProtoHelpers.newWrite(3L);
        Write newWrite5 = FirestoreProtoHelpers.newWrite(4L);
        Instant ofEpochMilli = Instant.ofEpochMilli(4L);
        Mockito.when((Write) this.processContext.element()).thenReturn(newWrite, new Write[]{newWrite2, newWrite3, newWrite4, newWrite5}).thenThrow(new Throwable[]{new IllegalStateException("too many calls")});
        Mockito.when(this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context) ArgumentMatchers.any())).thenReturn(this.attempt);
        Mockito.when(Boolean.valueOf(this.attempt.awaitSafeToProceed((Instant) ArgumentMatchers.any()))).thenReturn(true);
        Mockito.when(this.attempt.newFlushBuffer((Instant) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            return newFlushBuffer(build);
        });
        FnT fn = getFn(this.clock, this.ff, build, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
        fn.setup();
        fn.startBundle(this.startBundleContext);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        Assert.assertEquals(Lists.newArrayList(new FirestoreV1WriteFn.WriteElement[]{new FirestoreV1WriteFn.WriteElement(0, newWrite, this.window), new FirestoreV1WriteFn.WriteElement(1, newWrite2, this.window), new FirestoreV1WriteFn.WriteElement(2, newWrite3, this.window), new FirestoreV1WriteFn.WriteElement(3, newWrite4, this.window), new FirestoreV1WriteFn.WriteElement(4, newWrite5, this.window)}), new ArrayList(((FirestoreV1WriteFn.BaseBatchWriteFn) fn).writes));
        Assert.assertEquals(5L, ((FirestoreV1WriteFn.BaseBatchWriteFn) fn).queueNextEntryPriority);
        ((RpcQos.RpcWriteAttempt) Mockito.verify(this.attempt, Mockito.times(1))).newFlushBuffer(ofEpochMilli);
        Mockito.verifyNoMoreInteractions(new Object[]{this.callable});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.sdk.io.gcp.firestore.BaseFirestoreFnTest
    /* renamed from: getFn, reason: merged with bridge method [inline-methods] */
    public final FnT mo78getFn() {
        return getFn(JodaClock.DEFAULT, FirestoreStatefulComponentFactory.INSTANCE, this.rpcQosOptions, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
    }

    protected abstract FnT getFn(JodaClock jodaClock, FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions, CounterFactory counterFactory, DistributionFactory distributionFactory);

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.sdk.io.gcp.firestore.BaseFirestoreFnTest
    public final void processElementsAndFinishBundle(FnT fnt, int i) throws Exception {
        for (int i2 = 0; i2 < i; i2++) {
            try {
                fnt.processElement(this.processContext, this.window);
            } finally {
                fnt.finishBundle(this.finishBundleContext);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RpcQosImpl.FlushBufferImpl<Write, RpcQos.RpcWriteAttempt.Element<Write>> newFlushBuffer(RpcQosOptions rpcQosOptions) {
        return new RpcQosImpl.FlushBufferImpl<>(rpcQosOptions.getBatchMaxCount(), rpcQosOptions.getBatchMaxBytes());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static com.google.rpc.Status statusForCode(Code code) {
        return com.google.rpc.Status.newBuilder().setCode(code.getNumber()).build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Function<Instant, Instant> advanceClockBy(Duration duration) {
        return instant -> {
            return instant.withDurationAdded(duration, 1);
        };
    }

    @Override // org.apache.beam.sdk.io.gcp.firestore.BaseFirestoreFnTest
    @Before
    public /* bridge */ /* synthetic */ void stubPipelineOptions() {
        super.stubPipelineOptions();
    }

    @Override // org.apache.beam.sdk.io.gcp.firestore.BaseFirestoreFnTest
    @Before
    public /* bridge */ /* synthetic */ void stubDisplayDataBuilderChains() {
        super.stubDisplayDataBuilderChains();
    }
}
