/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.firestore;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.firestore.v1.stub.FirestoreStub;
import com.google.firestore.v1.BatchWriteRequest;
import com.google.firestore.v1.BatchWriteResponse;
import com.google.firestore.v1.Write;
import com.google.firestore.v1.WriteResult;
import com.google.rpc.Code;
import com.google.rpc.Status;
import io.grpc.Status;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.beam.sdk.io.gcp.firestore.BaseFirestoreV1FnTest;
import org.apache.beam.sdk.io.gcp.firestore.CounterFactory;
import org.apache.beam.sdk.io.gcp.firestore.DistributionFactory;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreProtoHelpers;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreStatefulComponentFactory;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1RpcAttemptContexts;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn;
import org.apache.beam.sdk.io.gcp.firestore.JodaClock;
import org.apache.beam.sdk.io.gcp.firestore.RpcQos;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseFirestoreV1WriteFnTest<OutT, FnT extends FirestoreV1WriteFn.BaseBatchWriteFn<OutT>>
extends BaseFirestoreV1FnTest<Write, OutT, FnT> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseFirestoreV1WriteFnTest.class);
    protected static final Status STATUS_OK = Status.newBuilder().setCode(Code.OK.getNumber()).build();
    protected static final Status STATUS_DEADLINE_EXCEEDED = Status.newBuilder().setCode(Code.DEADLINE_EXCEEDED.getNumber()).build();
    @Mock(lenient=true)
    protected BoundedWindow window;
    @Mock
    protected DoFn.FinishBundleContext finishBundleContext;
    @Mock
    protected UnaryCallable<BatchWriteRequest, BatchWriteResponse> callable;
    @Mock
    protected RpcQos.RpcWriteAttempt attempt;
    @Mock
    protected RpcQos.RpcWriteAttempt attempt2;
    protected MetricsFixture metricsFixture;
    private static final String METRIC_MARKER = "XXX";

    @Before
    public final void setUp() {
        Mockito.when((Object)this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context)ArgumentMatchers.any())).thenReturn((Object)this.attempt, (Object[])new RpcQos.RpcWriteAttempt[]{this.attempt2});
        Mockito.when((Object)this.ff.getRpcQos((RpcQosOptions)ArgumentMatchers.any())).thenReturn((Object)this.rpcQos);
        Mockito.when((Object)this.ff.getFirestoreStub(this.pipelineOptions)).thenReturn((Object)this.stub);
        Mockito.when((Object)this.stub.batchWriteCallable()).thenReturn(this.callable);
        this.metricsFixture = new MetricsFixture();
    }

    @Override
    @Test
    public final void attemptsExhaustedForRetryableError() throws Exception {
        Instant attemptStart = Instant.ofEpochMilli((long)0L);
        Instant rpc1Start = Instant.ofEpochMilli((long)1L);
        Instant rpc1End = Instant.ofEpochMilli((long)2L);
        Instant rpc2Start = Instant.ofEpochMilli((long)3L);
        Instant rpc2End = Instant.ofEpochMilli((long)4L);
        Instant rpc3Start = Instant.ofEpochMilli((long)5L);
        Instant rpc3End = Instant.ofEpochMilli((long)6L);
        Write write = FirestoreProtoHelpers.newWrite();
        FirestoreV1WriteFn.WriteElement element1 = new FirestoreV1WriteFn.WriteElement(0, write, this.window);
        Mockito.when((Object)this.ff.getFirestoreStub((PipelineOptions)ArgumentMatchers.any())).thenReturn((Object)this.stub);
        Mockito.when((Object)this.ff.getRpcQos((RpcQosOptions)ArgumentMatchers.any())).thenReturn((Object)this.rpcQos);
        Mockito.when((Object)this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context)FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite)).thenReturn((Object)this.attempt);
        Mockito.when((Object)this.stub.batchWriteCallable()).thenReturn(this.callable);
        RpcQos.RpcWriteAttempt.FlushBuffer flushBuffer = (RpcQos.RpcWriteAttempt.FlushBuffer)PowerMockito.spy(this.newFlushBuffer(this.rpcQosOptions));
        Mockito.when((Object)this.attempt.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)this.attempt.newFlushBuffer(attemptStart)).thenReturn((Object)flushBuffer);
        Mockito.when((Object)flushBuffer.offer((RpcQos.RpcWriteAttempt.Element)element1)).thenReturn((Object)true);
        Mockito.when((Object)flushBuffer.iterator()).thenReturn(Lists.newArrayList((Object[])new RpcQos.RpcWriteAttempt.Element[]{element1}).iterator());
        Mockito.when((Object)flushBuffer.getBufferedElementsCount()).thenReturn((Object)1);
        Mockito.when((Object)flushBuffer.isFull()).thenReturn((Object)true);
        Mockito.when((Object)((BatchWriteResponse)this.callable.call((Object)((BatchWriteRequest)ArgumentMatchers.any())))).thenThrow(new Throwable[]{RETRYABLE_ERROR, RETRYABLE_ERROR, RETRYABLE_ERROR});
        ((RpcQos.RpcWriteAttempt)Mockito.doNothing().when((Object)this.attempt)).recordWriteCounts((Instant)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());
        ((RpcQos.RpcWriteAttempt)Mockito.doNothing().doNothing().doThrow(new Throwable[]{RETRYABLE_ERROR}).when((Object)this.attempt)).checkCanRetry((Instant)ArgumentMatchers.any(), (RuntimeException)ArgumentMatchers.eq((Object)((Object)RETRYABLE_ERROR)));
        Mockito.when((Object)((Write)this.processContext.element())).thenReturn((Object)write);
        try {
            this.runFunction(this.getFn(this.clock, this.ff, this.rpcQosOptions, CounterFactory.DEFAULT, DistributionFactory.DEFAULT));
            Assert.fail((String)"Expected ApiException to be throw after exhausted attempts");
        }
        catch (ApiException e) {
            Assert.assertSame((Object)((Object)RETRYABLE_ERROR), (Object)((Object)e));
        }
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).awaitSafeToProceed(attemptStart);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart(rpc1Start, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(rpc1End, 0, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart(rpc2Start, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(rpc2End, 0, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart(rpc3Start, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(rpc3End, 0, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)0))).recordWriteCounts((Instant)ArgumentMatchers.any(), AdditionalMatchers.gt((int)0), ArgumentMatchers.anyInt());
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.never())).completeSuccess();
    }

    @Override
    @Test
    public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
        Mockito.when((Object)this.ff.getFirestoreStub((PipelineOptions)ArgumentMatchers.any())).thenReturn((Object)this.stub);
        Mockito.when((Object)this.ff.getRpcQos((RpcQosOptions)ArgumentMatchers.any())).thenReturn((Object)this.rpcQos);
        Mockito.when((Object)this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context)FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite)).thenReturn((Object)this.attempt);
        InterruptedException interruptedException = new InterruptedException();
        Mockito.when((Object)this.attempt.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)false).thenThrow(new Throwable[]{interruptedException});
        Mockito.when((Object)((Write)this.processContext.element())).thenReturn((Object)FirestoreProtoHelpers.newWrite());
        try {
            this.runFunction(this.getFn(this.clock, this.ff, this.rpcQosOptions, CounterFactory.DEFAULT, DistributionFactory.DEFAULT));
            Assert.fail((String)"Expected ApiException to be throw after exhausted attempts");
        }
        catch (InterruptedException e) {
            Assert.assertSame((Object)interruptedException, (Object)e);
        }
        ((FirestoreStub)Mockito.verify((Object)this.stub, (VerificationMode)Mockito.times((int)1))).close();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.stub});
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.callable});
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)0))).recordWriteCounts((Instant)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());
    }

    @Test
    public abstract void enqueueingWritesValidateBytesSize() throws Exception;

    @Test
    public final void endToEnd_success() throws Exception {
        Write write = FirestoreProtoHelpers.newWrite();
        BatchWriteRequest expectedRequest = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)").addWrites(write).build();
        BatchWriteResponse response = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).build();
        FirestoreV1WriteFn.WriteElement element1 = new FirestoreV1WriteFn.WriteElement(0, write, this.window);
        Instant attemptStart = Instant.ofEpochMilli((long)0L);
        Instant rpcStart = Instant.ofEpochMilli((long)1L);
        Instant rpcEnd = Instant.ofEpochMilli((long)2L);
        RpcQosOptions options = this.rpcQosOptions.toBuilder().withBatchMaxCount(1).build();
        RpcQos.RpcWriteAttempt.FlushBuffer flushBuffer = (RpcQos.RpcWriteAttempt.FlushBuffer)PowerMockito.spy(this.newFlushBuffer(options));
        Mockito.when((Object)((Write)this.processContext.element())).thenReturn((Object)write);
        Mockito.when((Object)this.attempt.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)this.attempt.newFlushBuffer(attemptStart)).thenReturn((Object)flushBuffer);
        ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(BatchWriteRequest.class);
        Mockito.when((Object)((BatchWriteResponse)this.callable.call((Object)((BatchWriteRequest)requestCaptor.capture())))).thenReturn((Object)response);
        this.runFunction(this.getFn(this.clock, this.ff, options, CounterFactory.DEFAULT, DistributionFactory.DEFAULT));
        Assert.assertEquals((Object)expectedRequest, (Object)requestCaptor.getValue());
        ((RpcQos.RpcWriteAttempt.FlushBuffer)Mockito.verify((Object)flushBuffer, (VerificationMode)Mockito.times((int)1))).offer((RpcQos.RpcWriteAttempt.Element)element1);
        ((RpcQos.RpcWriteAttempt.FlushBuffer)Mockito.verify((Object)flushBuffer, (VerificationMode)Mockito.times((int)1))).isFull();
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart(rpcStart, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(rpcEnd, 1, 0);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.never())).recordWriteCounts((Instant)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), AdditionalMatchers.gt((int)0));
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.never())).checkCanRetry((Instant)ArgumentMatchers.any(), (RuntimeException)ArgumentMatchers.any());
    }

    @Test
    public final void endToEnd_exhaustingAttemptsResultsInException() throws Exception {
        ApiException err1 = ApiExceptionFactory.createException((Throwable)new IOException("err1"), (StatusCode)GrpcStatusCode.of((Status.Code)Status.Code.ABORTED), (boolean)false);
        ApiException err2 = ApiExceptionFactory.createException((Throwable)new IOException("err2"), (StatusCode)GrpcStatusCode.of((Status.Code)Status.Code.ABORTED), (boolean)false);
        ApiException err3 = ApiExceptionFactory.createException((Throwable)new IOException("err3"), (StatusCode)GrpcStatusCode.of((Status.Code)Status.Code.ABORTED), (boolean)false);
        Instant attemptStart = Instant.ofEpochMilli((long)0L);
        Instant rpc1Start = Instant.ofEpochMilli((long)1L);
        Instant rpc1End = Instant.ofEpochMilli((long)2L);
        Instant rpc2Start = Instant.ofEpochMilli((long)3L);
        Instant rpc2End = Instant.ofEpochMilli((long)4L);
        Instant rpc3Start = Instant.ofEpochMilli((long)5L);
        Instant rpc3End = Instant.ofEpochMilli((long)6L);
        Write write = FirestoreProtoHelpers.newWrite();
        FirestoreV1WriteFn.WriteElement element1 = new FirestoreV1WriteFn.WriteElement(0, write, this.window);
        RpcQos.RpcWriteAttempt.FlushBuffer flushBuffer = (RpcQos.RpcWriteAttempt.FlushBuffer)PowerMockito.spy(this.newFlushBuffer(this.rpcQosOptions));
        Mockito.when((Object)((Write)this.processContext.element())).thenReturn((Object)write);
        Mockito.when((Object)this.attempt.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)this.attempt.newFlushBuffer(attemptStart)).thenReturn((Object)flushBuffer);
        Mockito.when((Object)flushBuffer.isFull()).thenReturn((Object)true);
        Mockito.when((Object)flushBuffer.offer((RpcQos.RpcWriteAttempt.Element)element1)).thenReturn((Object)true);
        Mockito.when((Object)flushBuffer.iterator()).thenReturn(Lists.newArrayList((Object[])new RpcQos.RpcWriteAttempt.Element[]{element1}).iterator());
        Mockito.when((Object)flushBuffer.getBufferedElementsCount()).thenReturn((Object)1);
        Mockito.when((Object)((BatchWriteResponse)this.callable.call((Object)((BatchWriteRequest)ArgumentMatchers.any())))).thenThrow(new Throwable[]{err1, err2, err3});
        ((RpcQos.RpcWriteAttempt)Mockito.doNothing().when((Object)this.attempt)).checkCanRetry((Instant)ArgumentMatchers.any(), (RuntimeException)ArgumentMatchers.eq((Object)((Object)err1)));
        ((RpcQos.RpcWriteAttempt)Mockito.doNothing().when((Object)this.attempt)).checkCanRetry((Instant)ArgumentMatchers.any(), (RuntimeException)ArgumentMatchers.eq((Object)((Object)err2)));
        ((RpcQos.RpcWriteAttempt)Mockito.doThrow((Throwable[])new Throwable[]{err3}).when((Object)this.attempt)).checkCanRetry((Instant)ArgumentMatchers.any(), (RuntimeException)ArgumentMatchers.eq((Object)((Object)err3)));
        try {
            FnT fn = this.getFn(this.clock, this.ff, this.rpcQosOptions, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
            this.runFunction(fn);
            Assert.fail((String)"Expected exception");
        }
        catch (ApiException e) {
            Assert.assertNotNull((Object)e.getMessage());
            Assert.assertTrue((boolean)e.getMessage().contains("err3"));
        }
        ((RpcQos.RpcWriteAttempt.FlushBuffer)Mockito.verify((Object)flushBuffer, (VerificationMode)Mockito.times((int)1))).offer((RpcQos.RpcWriteAttempt.Element)element1);
        ((RpcQos.RpcWriteAttempt.FlushBuffer)Mockito.verify((Object)flushBuffer, (VerificationMode)Mockito.atLeastOnce())).isFull();
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart(rpc1Start, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(rpc1End, 0, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart(rpc2Start, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(rpc2End, 0, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart(rpc3Start, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(rpc3End, 0, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.never())).recordWriteCounts((Instant)ArgumentMatchers.any(), AdditionalMatchers.gt((int)0), ArgumentMatchers.anyInt());
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.never())).completeSuccess();
    }

    @Test
    public final void endToEnd_awaitSafeToProceed_falseIsTerminalForAttempt() throws Exception {
        RpcQosOptions options = this.rpcQosOptions.toBuilder().withBatchMaxCount(2).build();
        Instant rpc1Start = Instant.ofEpochMilli((long)3L);
        Instant rpc1End = Instant.ofEpochMilli((long)4L);
        ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(BatchWriteRequest.class);
        Write write = FirestoreProtoHelpers.newWrite();
        BatchWriteRequest expectedRequest = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)").addWrites(write).build();
        BatchWriteResponse response = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).build();
        Mockito.when((Object)((Write)this.processContext.element())).thenReturn((Object)write);
        Mockito.when((Object)this.attempt.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)false).thenThrow(new Throwable[]{new IllegalStateException("too many attempt1#awaitSafeToProceed")});
        Mockito.when((Object)this.attempt2.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true).thenThrow(new Throwable[]{new IllegalStateException("too many attempt2#awaitSafeToProceed")});
        Mockito.when((Object)this.attempt2.newFlushBuffer((Instant)ArgumentMatchers.any())).thenAnswer(invocation -> this.newFlushBuffer(options));
        RpcQos.RpcWriteAttempt finishBundleAttempt = (RpcQos.RpcWriteAttempt)Mockito.mock(RpcQos.RpcWriteAttempt.class);
        Mockito.when((Object)finishBundleAttempt.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true, (Object[])new Boolean[]{true}).thenThrow(new Throwable[]{new IllegalStateException("too many finishBundleAttempt#awaitSafeToProceed")});
        Mockito.when((Object)finishBundleAttempt.newFlushBuffer((Instant)ArgumentMatchers.any())).thenAnswer(invocation -> this.newFlushBuffer(options));
        Mockito.when((Object)this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context)ArgumentMatchers.any())).thenReturn((Object)this.attempt, (Object[])new RpcQos.RpcWriteAttempt[]{this.attempt2, finishBundleAttempt});
        Mockito.when((Object)((BatchWriteResponse)this.callable.call((Object)((BatchWriteRequest)requestCaptor.capture())))).thenReturn((Object)response);
        FnT fn = this.getFn(this.clock, this.ff, options, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
        this.runFunction(fn);
        Assert.assertEquals((Object)expectedRequest, (Object)requestCaptor.getValue());
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).awaitSafeToProceed((Instant)ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.attempt});
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt2, (VerificationMode)Mockito.times((int)1))).awaitSafeToProceed((Instant)ArgumentMatchers.any());
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt2, (VerificationMode)Mockito.times((int)1))).newFlushBuffer((Instant)ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.attempt2});
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)finishBundleAttempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart(rpc1Start, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)finishBundleAttempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(rpc1End, 1, 0);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)finishBundleAttempt, (VerificationMode)Mockito.times((int)1))).completeSuccess();
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)finishBundleAttempt, (VerificationMode)Mockito.never())).checkCanRetry((Instant)ArgumentMatchers.any(), (RuntimeException)ArgumentMatchers.any());
    }

    @Test
    public final void endToEnd_deadlineExceededOnAnIndividualWriteResultsInThrottling() throws Exception {
        long totalDocCount = 1000000L;
        int numWorkers = 100;
        long docCount = 10000L;
        LOG.info("docCount = {}", (Object)10000L);
        RpcQosOptions options = this.rpcQosOptions.toBuilder().withHintMaxNumWorkers(100).withSamplePeriod(Duration.standardMinutes((long)10L)).withReportDiagnosticMetrics().build();
        LOG.debug("options = {}", (Object)options);
        FirestoreStatefulComponentFactory ff = (FirestoreStatefulComponentFactory)Mockito.mock(FirestoreStatefulComponentFactory.class);
        Mockito.when((Object)ff.getFirestoreStub((PipelineOptions)ArgumentMatchers.any())).thenReturn((Object)this.stub);
        Random random = new Random(12345L);
        final TestClock clock = new TestClock(Instant.EPOCH, Duration.standardSeconds((long)1L));
        Sleeper sleeper = millis -> clock.setNext(BaseFirestoreV1WriteFnTest.advanceClockBy(Duration.millis((long)millis)));
        RpcQosImpl qos = new RpcQosImpl(options, random, sleeper, this.metricsFixture.counterFactory, this.metricsFixture.distributionFactory);
        RpcQos qosSpy = (RpcQos)Mockito.mock(RpcQos.class, invocation -> {
            Method method = invocation.getMethod();
            LOG.debug("method = {}", (Object)method);
            Method actualMethod = qos.getClass().getMethod(method.getName(), method.getParameterTypes());
            return actualMethod.invoke((Object)qos, invocation.getArguments());
        });
        Mockito.when((Object)ff.getRpcQos(options)).thenReturn((Object)qosSpy);
        final int defaultDocumentWriteLatency = 30;
        AtomicLong writeCounter = new AtomicLong();
        Mockito.when((Object)((Write)this.processContext.element())).thenAnswer(invocation -> FirestoreProtoHelpers.newWrite(writeCounter.getAndIncrement()));
        Mockito.when((Object)((BatchWriteResponse)this.callable.call((Object)((BatchWriteRequest)ArgumentMatchers.any())))).thenAnswer((Answer)new Answer<BatchWriteResponse>(){
            private final Random rand = new Random(84572908L);
            private final Instant threshold = Instant.ofEpochMilli((long)Duration.standardMinutes((long)20L).getMillis());

            public BatchWriteResponse answer(InvocationOnMock invocation) throws Throwable {
                BatchWriteRequest request = (BatchWriteRequest)invocation.getArgument(0, BatchWriteRequest.class);
                LOG.debug("request = {}", (Object)request);
                long requestDurationMs = 0L;
                BatchWriteResponse.Builder builder = BatchWriteResponse.newBuilder();
                for (Write ignored : request.getWritesList()) {
                    builder.addWriteResults(WriteResult.newBuilder().build());
                    if (clock.prev.isBefore((ReadableInstant)this.threshold)) {
                        requestDurationMs += (long)defaultDocumentWriteLatency;
                        builder.addStatus(STATUS_OK);
                        continue;
                    }
                    int latency = this.rand.nextInt(1500);
                    LOG.debug("latency = {}", (Object)latency);
                    if (latency > 300) {
                        builder.addStatus(STATUS_DEADLINE_EXCEEDED);
                    } else {
                        builder.addStatus(STATUS_OK);
                    }
                    requestDurationMs += (long)latency;
                }
                clock.setNext(BaseFirestoreV1WriteFnTest.advanceClockBy(Duration.millis((long)requestDurationMs)));
                return builder.build();
            }
        });
        LOG.info("### parameters: {defaultDocumentWriteLatency: {}, rpcQosOptions: {}}", (Object)defaultDocumentWriteLatency, (Object)options);
        FnT fn = this.getFn(clock, ff, options, this.metricsFixture.counterFactory, this.metricsFixture.distributionFactory);
        fn.setup();
        fn.startBundle(this.startBundleContext);
        while (writeCounter.get() < 10000L) {
            fn.processElement(this.processContext, this.window);
        }
        fn.finishBundle(this.finishBundleContext);
        LOG.info("writeCounter = {}", (Object)writeCounter.get());
        LOG.info("clock.prev = {}", (Object)clock.prev);
        MyDistribution qosAdaptiveThrottlerThrottlingMs = this.metricsFixture.distributions.get("qos_adaptiveThrottler_throttlingMs");
        Assert.assertNotNull((Object)qosAdaptiveThrottlerThrottlingMs);
        List updateInvocations = qosAdaptiveThrottlerThrottlingMs.updateInvocations;
        Assert.assertFalse((boolean)updateInvocations.isEmpty());
    }

    @Test
    public final void endToEnd_maxBatchSizeRespected() throws Exception {
        Instant enqueue0 = Instant.ofEpochMilli((long)0L);
        Instant enqueue1 = Instant.ofEpochMilli((long)1L);
        Instant enqueue2 = Instant.ofEpochMilli((long)2L);
        Instant enqueue3 = Instant.ofEpochMilli((long)3L);
        Instant enqueue4 = Instant.ofEpochMilli((long)4L);
        Instant group1Rpc1Start = Instant.ofEpochMilli((long)5L);
        Instant group1Rpc1End = Instant.ofEpochMilli((long)6L);
        Instant enqueue5 = Instant.ofEpochMilli((long)7L);
        Instant finalFlush = Instant.ofEpochMilli((long)8L);
        Instant group2Rpc1Start = Instant.ofEpochMilli((long)9L);
        Instant group2Rpc1End = Instant.ofEpochMilli((long)10L);
        Write write0 = FirestoreProtoHelpers.newWrite(0L);
        Write write1 = FirestoreProtoHelpers.newWrite(1L);
        Write write2 = FirestoreProtoHelpers.newWrite(2L);
        Write write3 = FirestoreProtoHelpers.newWrite(3L);
        Write write4 = FirestoreProtoHelpers.newWrite(4L);
        Write write5 = FirestoreProtoHelpers.newWrite(5L);
        int maxValuesPerGroup = 5;
        BatchWriteRequest.Builder builder = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)");
        BatchWriteRequest expectedGroup1Request = builder.build().toBuilder().addWrites(write0).addWrites(write1).addWrites(write2).addWrites(write3).addWrites(write4).build();
        BatchWriteRequest expectedGroup2Request = builder.build().toBuilder().addWrites(write5).build();
        BatchWriteResponse group1Response = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).addStatus(STATUS_OK).addStatus(STATUS_OK).addStatus(STATUS_OK).addStatus(STATUS_OK).build();
        BatchWriteResponse group2Response = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).build();
        RpcQosOptions options = this.rpcQosOptions.toBuilder().withBatchMaxCount(maxValuesPerGroup).build();
        RpcQos.RpcWriteAttempt.FlushBuffer flushBuffer = (RpcQos.RpcWriteAttempt.FlushBuffer)PowerMockito.spy(this.newFlushBuffer(options));
        RpcQos.RpcWriteAttempt.FlushBuffer flushBuffer2 = (RpcQos.RpcWriteAttempt.FlushBuffer)PowerMockito.spy(this.newFlushBuffer(options));
        Mockito.when((Object)((Write)this.processContext.element())).thenReturn((Object)write0, (Object[])new Write[]{write1, write2, write3, write4, write5});
        Mockito.when((Object)this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context)ArgumentMatchers.any())).thenReturn((Object)this.attempt, (Object[])new RpcQos.RpcWriteAttempt[]{this.attempt, this.attempt, this.attempt, this.attempt, this.attempt2, this.attempt2, this.attempt2}).thenThrow(new Throwable[]{new IllegalStateException("too many attempts")});
        Mockito.when((Object)this.attempt.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)this.attempt2.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)this.attempt.newFlushBuffer(enqueue0)).thenReturn(this.newFlushBuffer(options));
        Mockito.when((Object)this.attempt.newFlushBuffer(enqueue1)).thenReturn(this.newFlushBuffer(options));
        Mockito.when((Object)this.attempt.newFlushBuffer(enqueue2)).thenReturn(this.newFlushBuffer(options));
        Mockito.when((Object)this.attempt.newFlushBuffer(enqueue3)).thenReturn(this.newFlushBuffer(options));
        Mockito.when((Object)this.attempt.newFlushBuffer(enqueue4)).thenReturn((Object)flushBuffer);
        Mockito.when((Object)((BatchWriteResponse)this.callable.call((Object)expectedGroup1Request))).thenReturn((Object)group1Response);
        Mockito.when((Object)this.attempt2.newFlushBuffer(enqueue5)).thenReturn(this.newFlushBuffer(options));
        Mockito.when((Object)this.attempt2.newFlushBuffer(finalFlush)).thenReturn((Object)flushBuffer2);
        Mockito.when((Object)((BatchWriteResponse)this.callable.call((Object)expectedGroup2Request))).thenReturn((Object)group2Response);
        this.runFunction(this.getFn(this.clock, this.ff, options, CounterFactory.DEFAULT, DistributionFactory.DEFAULT), maxValuesPerGroup + 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart(group1Rpc1Start, 5);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(group1Rpc1End, 5, 0);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).completeSuccess();
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt2, (VerificationMode)Mockito.times((int)1))).recordRequestStart(group2Rpc1Start, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt2, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(group2Rpc1End, 1, 0);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt2, (VerificationMode)Mockito.times((int)1))).completeSuccess();
        ((UnaryCallable)Mockito.verify(this.callable, (VerificationMode)Mockito.times((int)1))).call((Object)expectedGroup1Request);
        ((UnaryCallable)Mockito.verify(this.callable, (VerificationMode)Mockito.times((int)1))).call((Object)expectedGroup2Request);
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.callable});
        ((RpcQos.RpcWriteAttempt.FlushBuffer)Mockito.verify((Object)flushBuffer, (VerificationMode)Mockito.times((int)maxValuesPerGroup))).offer((RpcQos.RpcWriteAttempt.Element)ArgumentMatchers.any());
        ((RpcQos.RpcWriteAttempt.FlushBuffer)Mockito.verify((Object)flushBuffer2, (VerificationMode)Mockito.times((int)1))).offer((RpcQos.RpcWriteAttempt.Element)ArgumentMatchers.any());
    }

    @Test
    public final void endToEnd_partialSuccessReturnsWritesToQueue() throws Exception {
        Write write0 = FirestoreProtoHelpers.newWrite(0L);
        Write write1 = FirestoreProtoHelpers.newWrite(1L);
        Write write2 = FirestoreProtoHelpers.newWrite(2L);
        Write write3 = FirestoreProtoHelpers.newWrite(3L);
        Write write4 = FirestoreProtoHelpers.newWrite(4L);
        BatchWriteRequest expectedRequest1 = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)").addWrites(write0).addWrites(write1).addWrites(write2).addWrites(write3).addWrites(write4).build();
        BatchWriteResponse response1 = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).addStatus(BaseFirestoreV1WriteFnTest.statusForCode(Code.INVALID_ARGUMENT)).addStatus(BaseFirestoreV1WriteFnTest.statusForCode(Code.FAILED_PRECONDITION)).addStatus(BaseFirestoreV1WriteFnTest.statusForCode(Code.UNAUTHENTICATED)).addStatus(STATUS_OK).build();
        BatchWriteRequest expectedRequest2 = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)").addWrites(write1).addWrites(write2).addWrites(write3).build();
        BatchWriteResponse response2 = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).addStatus(STATUS_OK).addStatus(STATUS_OK).build();
        RpcQosOptions options = this.rpcQosOptions.toBuilder().withMaxAttempts(1).withBatchMaxCount(5).build();
        Mockito.when((Object)((Write)this.processContext.element())).thenReturn((Object)write0, (Object[])new Write[]{write1, write2, write3, write4}).thenThrow(new Throwable[]{new IllegalStateException("too many calls")});
        Mockito.when((Object)this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context)ArgumentMatchers.any())).thenReturn((Object)this.attempt);
        Mockito.when((Object)this.attempt.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)this.attempt.newFlushBuffer((Instant)ArgumentMatchers.any())).thenAnswer(invocation -> this.newFlushBuffer(options));
        Mockito.when((Object)this.attempt.isCodeRetryable(Code.INVALID_ARGUMENT)).thenReturn((Object)true);
        Mockito.when((Object)this.attempt.isCodeRetryable(Code.FAILED_PRECONDITION)).thenReturn((Object)true);
        Mockito.when((Object)this.attempt.isCodeRetryable(Code.UNAUTHENTICATED)).thenReturn((Object)true);
        ArgumentCaptor requestCaptor1 = ArgumentCaptor.forClass(BatchWriteRequest.class);
        Mockito.when((Object)((BatchWriteResponse)this.callable.call((Object)((BatchWriteRequest)requestCaptor1.capture())))).thenReturn((Object)response1);
        FnT fn = this.getFn(this.clock, this.ff, options, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
        fn.setup();
        fn.startBundle(this.startBundleContext);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        Assert.assertEquals((Object)expectedRequest1, (Object)requestCaptor1.getValue());
        ArrayList expectedRemainingWrites = Lists.newArrayList((Object[])new FirestoreV1WriteFn.WriteElement[]{new FirestoreV1WriteFn.WriteElement(1, write1, this.window), new FirestoreV1WriteFn.WriteElement(2, write2, this.window), new FirestoreV1WriteFn.WriteElement(3, write3, this.window)});
        ArrayList actualWrites = new ArrayList(((FirestoreV1WriteFn.BaseBatchWriteFn)fn).writes);
        Assert.assertEquals((Object)expectedRemainingWrites, actualWrites);
        Assert.assertEquals((long)5L, (long)((FirestoreV1WriteFn.BaseBatchWriteFn)fn).queueNextEntryPriority);
        ArgumentCaptor requestCaptor2 = ArgumentCaptor.forClass(BatchWriteRequest.class);
        Mockito.when((Object)((BatchWriteResponse)this.callable.call((Object)((BatchWriteRequest)requestCaptor2.capture())))).thenReturn((Object)response2);
        fn.finishBundle(this.finishBundleContext);
        Assert.assertEquals((Object)expectedRequest2, (Object)requestCaptor2.getValue());
        Assert.assertEquals((long)0L, (long)((FirestoreV1WriteFn.BaseBatchWriteFn)fn).queueNextEntryPriority);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart((Instant)ArgumentMatchers.any(), ArgumentMatchers.eq((int)5));
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts((Instant)ArgumentMatchers.any(), ArgumentMatchers.eq((int)2), ArgumentMatchers.eq((int)3));
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordRequestStart((Instant)ArgumentMatchers.any(), ArgumentMatchers.eq((int)3));
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).recordWriteCounts((Instant)ArgumentMatchers.any(), ArgumentMatchers.eq((int)3), ArgumentMatchers.eq((int)0));
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).completeSuccess();
        ((UnaryCallable)Mockito.verify(this.callable, (VerificationMode)Mockito.times((int)2))).call((Object)((BatchWriteRequest)ArgumentMatchers.any()));
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.callable});
    }

    @Test
    public final void writesRemainInQueueWhenFlushIsNotReadyAndThenFlushesInFinishBundle() throws Exception {
        RpcQosOptions options = this.rpcQosOptions.toBuilder().withMaxAttempts(1).build();
        Write write = FirestoreProtoHelpers.newWrite();
        BatchWriteRequest expectedRequest = BatchWriteRequest.newBuilder().setDatabase("projects/testing-project/databases/(default)").addWrites(write).build();
        BatchWriteResponse response = BatchWriteResponse.newBuilder().addStatus(STATUS_OK).build();
        Mockito.when((Object)((Write)this.processContext.element())).thenReturn((Object)write).thenThrow(new Throwable[]{new IllegalStateException("too many element calls")});
        Mockito.when((Object)this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context)ArgumentMatchers.any())).thenReturn((Object)this.attempt, (Object[])new RpcQos.RpcWriteAttempt[]{this.attempt2}).thenThrow(new Throwable[]{new IllegalStateException("too many attempt calls")});
        Mockito.when((Object)this.attempt.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)this.attempt2.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)this.attempt.newFlushBuffer((Instant)ArgumentMatchers.any())).thenAnswer(invocation -> this.newFlushBuffer(options));
        Mockito.when((Object)this.attempt2.newFlushBuffer((Instant)ArgumentMatchers.any())).thenAnswer(invocation -> this.newFlushBuffer(options));
        FnT fn = this.getFn(this.clock, this.ff, options, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
        fn.populateDisplayData(this.displayDataBuilder);
        fn.setup();
        fn.startBundle(this.startBundleContext);
        fn.processElement(this.processContext, this.window);
        Assert.assertEquals((long)1L, (long)((FirestoreV1WriteFn.BaseBatchWriteFn)fn).writes.size());
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.never())).recordWriteCounts((Instant)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.never())).checkCanRetry((Instant)ArgumentMatchers.any(), (RuntimeException)ArgumentMatchers.any());
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.never())).completeSuccess();
        Instant attempt2RpcStart = Instant.ofEpochMilli((long)2L);
        Instant attempt2RpcEnd = Instant.ofEpochMilli((long)3L);
        ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(BatchWriteRequest.class);
        Mockito.when((Object)((BatchWriteResponse)this.callable.call((Object)((BatchWriteRequest)requestCaptor.capture())))).thenReturn((Object)response);
        fn.finishBundle(this.finishBundleContext);
        Assert.assertEquals((long)0L, (long)((FirestoreV1WriteFn.BaseBatchWriteFn)fn).writes.size());
        Assert.assertEquals((Object)expectedRequest, (Object)requestCaptor.getValue());
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt2, (VerificationMode)Mockito.times((int)1))).recordRequestStart(attempt2RpcStart, 1);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt2, (VerificationMode)Mockito.times((int)1))).recordWriteCounts(attempt2RpcEnd, 1, 0);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt2, (VerificationMode)Mockito.never())).recordWriteCounts((Instant)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), AdditionalMatchers.gt((int)0));
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt2, (VerificationMode)Mockito.never())).checkCanRetry((Instant)ArgumentMatchers.any(), (RuntimeException)ArgumentMatchers.any());
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt2, (VerificationMode)Mockito.times((int)1))).completeSuccess();
    }

    @Test
    public final void queuedWritesMaintainPriorityIfNotFlushed() throws Exception {
        RpcQosOptions options = this.rpcQosOptions.toBuilder().withMaxAttempts(1).build();
        Write write0 = FirestoreProtoHelpers.newWrite(0L);
        Write write1 = FirestoreProtoHelpers.newWrite(1L);
        Write write2 = FirestoreProtoHelpers.newWrite(2L);
        Write write3 = FirestoreProtoHelpers.newWrite(3L);
        Write write4 = FirestoreProtoHelpers.newWrite(4L);
        Instant write4Start = Instant.ofEpochMilli((long)4L);
        Mockito.when((Object)((Write)this.processContext.element())).thenReturn((Object)write0, (Object[])new Write[]{write1, write2, write3, write4}).thenThrow(new Throwable[]{new IllegalStateException("too many calls")});
        Mockito.when((Object)this.rpcQos.newWriteAttempt((RpcQos.RpcAttempt.Context)ArgumentMatchers.any())).thenReturn((Object)this.attempt);
        Mockito.when((Object)this.attempt.awaitSafeToProceed((Instant)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)this.attempt.newFlushBuffer((Instant)ArgumentMatchers.any())).thenAnswer(invocation -> this.newFlushBuffer(options));
        FnT fn = this.getFn(this.clock, this.ff, options, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
        fn.setup();
        fn.startBundle(this.startBundleContext);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        fn.processElement(this.processContext, this.window);
        ArrayList expectedWrites = Lists.newArrayList((Object[])new FirestoreV1WriteFn.WriteElement[]{new FirestoreV1WriteFn.WriteElement(0, write0, this.window), new FirestoreV1WriteFn.WriteElement(1, write1, this.window), new FirestoreV1WriteFn.WriteElement(2, write2, this.window), new FirestoreV1WriteFn.WriteElement(3, write3, this.window), new FirestoreV1WriteFn.WriteElement(4, write4, this.window)});
        ArrayList actualWrites = new ArrayList(((FirestoreV1WriteFn.BaseBatchWriteFn)fn).writes);
        Assert.assertEquals((Object)expectedWrites, actualWrites);
        Assert.assertEquals((long)5L, (long)((FirestoreV1WriteFn.BaseBatchWriteFn)fn).queueNextEntryPriority);
        ((RpcQos.RpcWriteAttempt)Mockito.verify((Object)this.attempt, (VerificationMode)Mockito.times((int)1))).newFlushBuffer(write4Start);
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.callable});
    }

    @Override
    protected final FnT getFn() {
        return this.getFn(JodaClock.DEFAULT, FirestoreStatefulComponentFactory.INSTANCE, this.rpcQosOptions, CounterFactory.DEFAULT, DistributionFactory.DEFAULT);
    }

    protected abstract FnT getFn(JodaClock var1, FirestoreStatefulComponentFactory var2, RpcQosOptions var3, CounterFactory var4, DistributionFactory var5);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected final void processElementsAndFinishBundle(FnT fn, int processElementCount) throws Exception {
        try {
            for (int i = 0; i < processElementCount; ++i) {
                fn.processElement(this.processContext, this.window);
            }
        }
        finally {
            fn.finishBundle(this.finishBundleContext);
        }
    }

    protected RpcQosImpl.FlushBufferImpl<Write, RpcQos.RpcWriteAttempt.Element<Write>> newFlushBuffer(RpcQosOptions options) {
        return new RpcQosImpl.FlushBufferImpl(options.getBatchMaxCount(), options.getBatchMaxBytes());
    }

    protected static Status statusForCode(Code code) {
        return Status.newBuilder().setCode(code.getNumber()).build();
    }

    private static Function<Instant, Instant> advanceClockBy(Duration duration) {
        return i -> i.withDurationAdded((ReadableDuration)duration, 1);
    }

    private static class MyDistribution
    implements Distribution {
        private final MetricName name;
        private final List<Long> updateInvocations;

        public MyDistribution(String namespace, String name) {
            this.name = MetricName.named((String)namespace, (String)name);
            this.updateInvocations = new ArrayList<Long>();
        }

        public void update(long value) {
            LOG.trace("{} {}:update(value = {})", new Object[]{BaseFirestoreV1WriteFnTest.METRIC_MARKER, this.name, value});
            this.updateInvocations.add(value);
        }

        public void update(long sum, long count, long min, long max) {
            throw new IllegalStateException("not implemented");
        }

        public MetricName getName() {
            return this.name;
        }
    }

    private static class MyCounter
    implements Counter {
        private final MetricName named;
        private final List<Long> incInvocations;

        public MyCounter(String namespace, String name) {
            this.named = MetricName.named((String)namespace, (String)name);
            this.incInvocations = new ArrayList<Long>();
        }

        public void inc() {
            LOG.trace("{} {}:inc()", (Object)BaseFirestoreV1WriteFnTest.METRIC_MARKER, (Object)this.named);
        }

        public void inc(long n) {
            LOG.trace("{} {}:inc(n = {})", new Object[]{BaseFirestoreV1WriteFnTest.METRIC_MARKER, this.named, n});
            this.incInvocations.add(n);
        }

        public void dec() {
            this.dec(1L);
        }

        public void dec(long n) {
            throw new IllegalStateException("not implemented");
        }

        public MetricName getName() {
            return this.named;
        }
    }

    public static final class MetricsFixture {
        final Map<String, MyCounter> counters = new HashMap<String, MyCounter>();
        final Map<String, MyDistribution> distributions = new HashMap<String, MyDistribution>();
        final CounterFactory counterFactory = (CounterFactory & Serializable)(namespace, name) -> this.counters.computeIfAbsent(name, k -> new MyCounter(namespace, name));
        final DistributionFactory distributionFactory = (DistributionFactory & Serializable)(namespace, name) -> this.distributions.computeIfAbsent(name, k -> new MyDistribution(namespace, name));

        public Map<String, MyCounter> getCounters() {
            return ImmutableMap.copyOf(this.counters);
        }

        public Map<String, MyDistribution> getDistributions() {
            return ImmutableMap.copyOf(this.distributions);
        }
    }

    private static class TestClock
    implements JodaClock {
        private final Function<Instant, Instant> defaultNext;
        private Function<Instant, Instant> next;
        private Instant prev;

        public TestClock(Instant start, Duration defaultInterval) {
            this.prev = start;
            this.defaultNext = BaseFirestoreV1WriteFnTest.advanceClockBy(defaultInterval);
        }

        public TestClock setNext(Function<Instant, Instant> next) {
            this.next = next;
            return this;
        }

        public Instant instant() {
            Instant ret;
            if (this.next != null) {
                ret = this.next.apply(this.prev);
                this.next = null;
            } else {
                ret = this.defaultNext.apply(this.prev);
            }
            this.prev = ret;
            LOG.trace("{} testClock:instant:{}", (Object)BaseFirestoreV1WriteFnTest.METRIC_MARKER, (Object)ret.toString());
            return ret;
        }
    }
}

