package org.apache.beam.sdk.io.gcp.firestore;

import java.util.Arrays;
import java.util.Random;
import org.apache.beam.sdk.io.gcp.firestore.RpcQos;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Simulation-style test that drives an {@link RpcQosImpl} through a scripted timeline of write
 * attempts and verifies, at each point in time, whether the attempt is allowed to proceed and how
 * large the next batch may be.
 *
 * <p>All collaborators (sleeper, counters, distributions) are lenient Mockito mocks and the
 * {@link Random} is seeded with a fixed value so each run uses the same pseudo-random sequence.
 */
@RunWith(MockitoJUnitRunner.class)
public final class RpcQosSimulationTest {

    @Mock(lenient = true)
    private Sleeper sleeper;

    @Mock(lenient = true)
    private CounterFactory counterFactory;

    @Mock(lenient = true)
    private DistributionFactory distributionFactory;

    @Mock(lenient = true)
    private Counter counterThrottlingMs;

    @Mock(lenient = true)
    private Counter counterRpcFailures;

    @Mock(lenient = true)
    private Counter counterRpcSuccesses;

    @Mock(lenient = true)
    private Counter counterRpcStreamValueReceived;

    /** Supplies the name of the currently running test method; used to build the metrics namespace. */
    @Rule
    public final TestName testName = new TestName();

    /** Fixed seed so every run hands the same pseudo-random sequence to the QoS implementation. */
    private final Random random = new Random(1234567890);

    /** Attempt context whose namespace is {@code <test class name>.<test method name>}. */
    private RpcQos.RpcAttempt.Context rpcAttemptContext;

    /**
     * Builds the attempt context for the current test and wires the mocked factories so that the
     * counters looked up under that namespace resolve to this class's counter mocks, and every
     * distribution lookup yields a fresh mock named after the requested distribution.
     */
    @Before
    public void setUp() {
        this.rpcAttemptContext =
                () -> String.format("%s.%s", getClass().getName(), this.testName.getMethodName());

        // The namespace is stable for the duration of a test method, so resolve it once.
        final String namespace = this.rpcAttemptContext.getNamespace();
        Mockito.when(this.counterFactory.get(namespace, "throttlingMs"))
                .thenReturn(this.counterThrottlingMs);
        Mockito.when(this.counterFactory.get(namespace, "rpc_failures"))
                .thenReturn(this.counterRpcFailures);
        Mockito.when(this.counterFactory.get(namespace, "rpc_successes"))
                .thenReturn(this.counterRpcSuccesses);
        Mockito.when(this.counterFactory.get(namespace, "rpc_streamValueReceived"))
                .thenReturn(this.counterRpcStreamValueReceived);

        // Argument 1 is the distribution name; use it as the mock's name.
        Mockito.when(this.distributionFactory.get(
                        ArgumentMatchers.<String>any(), ArgumentMatchers.<String>any()))
                .thenAnswer(invocation ->
                        Mockito.mock(Distribution.class, invocation.getArgument(1, String.class)));
    }

    /**
     * Walks a single-worker QoS instance through roughly twenty simulated minutes of writes.
     *
     * <p>The scripted expectations are: 500 writes per one-second window at the start and at the
     * 5 minute mark, then 750 at 10 minutes, 1,125 at 15 minutes and 1,687 at 20 minutes. Once a
     * window's budget is used up, further attempts in that window must be refused.
     *
     * @throws InterruptedException declared because {@code awaitSafeToProceed} may throw it
     */
    @Test
    public void writeRampUp_shouldScaleAlongTheExpectedLine() throws InterruptedException {
        RpcQosOptions options = RpcQosOptions.newBuilder()
                .withHintMaxNumWorkers(1)
                .withThrottleDuration(Duration.standardSeconds(5L))
                .withBatchInitialCount(200)
                .withSamplePeriod(Duration.standardMinutes(10L))
                .withSamplePeriodBucketSize(Duration.standardMinutes(2L))
                .build();
        RpcQosImpl qos = new RpcQosImpl(
                options, this.random, this.sleeper, this.counterFactory, this.distributionFactory);

        // Timeline of simulated instants, expressed as offsets from the epoch.
        Instant t1 = t(Duration.ZERO);
        Instant t2 = t(Duration.millis(10L));
        Instant t3 = t(Duration.millis(20L));
        Instant t4 = t(Duration.millis(999L));
        Instant t5 = t(seconds(1));
        Instant t6 = t(seconds(1), Duration.millis(1L));
        Instant t7 = t(minutes(5));
        Instant t8 = t(minutes(5), Duration.millis(1L));
        Instant t9 = t(minutes(5), Duration.millis(2L));
        Instant t10 = t(minutes(10));
        Instant t11 = t(minutes(10), Duration.millis(1L));
        Instant t12 = t(minutes(10), Duration.millis(2L));
        Instant t13 = t(minutes(15), Duration.millis(1L));
        Instant t14 = t(minutes(20));
        Instant t15 = t(minutes(20), seconds(1));

        // First second: 200 + 300 uses the whole 500 budget.
        safeToProceedAndWithBudgetAndWrite(qos, t1, 200, 200, "write 200");
        safeToProceedAndWithBudgetAndWrite(qos, t2, 300, 300, "write 300");
        unsafeToProceed(qos, t3);
        unsafeToProceed(qos, t4);

        // A new one-second window restores the budget.
        safeToProceedAndWithBudgetAndWrite(qos, t5, 500, 500, "wait 1 second for budget to refill, write 500");
        unsafeToProceed(qos, t6);

        // 5 minutes: 100 + 400 uses the whole 500 budget.
        safeToProceedAndWithBudgetAndWrite(qos, t7, 500, 100, "jump ahead to next ramp up interval and write 100");
        safeToProceedAndWithBudgetAndWrite(qos, t8, 400, 400, "write another 400 exhausting budget");
        unsafeToProceed(qos, t9);

        // 10 minutes: 500 + 250 = 750.
        safeToProceedAndWithBudgetAndWrite(qos, t10, 500, 500, "after 10 minutes the ramp up should allow 750 writes, write 500");
        safeToProceedAndWithBudgetAndWrite(qos, t11, 250, 250, "write 250 more");
        unsafeToProceed(qos, t12);

        // 15 minutes: 500 + 500 + 125 = 1,125.
        safeToProceedAndWithBudgetAndWrite(qos, t13, 500, 500, "after 15 minutes the ramp up should allow 1,125 writes, write 500");
        safeToProceedAndWithBudgetAndWrite(qos, t13, 500, 500, "write 500 more");
        safeToProceedAndWithBudgetAndWrite(qos, t13, 125, 125, "write 125 more");
        unsafeToProceed(qos, t13);

        // 20 minutes: 500 + 500 + 500 + 187 = 1,687.
        safeToProceedAndWithBudgetAndWrite(qos, t14, 500, 500, "after 20 minutes the ramp up should allow 1,687 writes, write 500");
        safeToProceedAndWithBudgetAndWrite(qos, t14, 500, 500, "write 500 more");
        safeToProceedAndWithBudgetAndWrite(qos, t14, 500, 500, "write 500 more");
        safeToProceedAndWithBudgetAndWrite(qos, t14, 187, 187, "write 187 more");
        unsafeToProceed(qos, t14);

        // One second later the same 1,687 budget is available again.
        safeToProceedAndWithBudgetAndWrite(qos, t15, 500, 500, "wait 1 second for the budget to refill, write 500");
        safeToProceedAndWithBudgetAndWrite(qos, t15, 500, 500, "write 500 more");
        safeToProceedAndWithBudgetAndWrite(qos, t15, 500, 500, "write 500 more");
        safeToProceedAndWithBudgetAndWrite(qos, t15, 187, 187, "write 187 more");
        unsafeToProceed(qos, t15);
    }

    /**
     * Converts an offset into the instant that lies that far after the epoch.
     *
     * @param duration offset from the epoch
     * @return the instant at {@code duration.getMillis()} epoch milliseconds
     */
    private static Instant t(Duration duration) {
        return Instant.ofEpochMilli(duration.getMillis());
    }

    /**
     * Sums the given offsets and converts the total into an instant after the epoch.
     *
     * @param durationArr offsets to add together; an empty array yields the epoch itself
     * @return the instant at the summed offset
     */
    private static Instant t(Duration... durationArr) {
        Duration total = Arrays.stream(durationArr).reduce(Duration.ZERO, Duration::plus);
        // A single Duration argument binds to the non-varargs overload.
        return t(total);
    }

    /** Shorthand for {@link Duration#standardMinutes(long)}. */
    private static Duration minutes(int i) {
        return Duration.standardMinutes(i);
    }

    /** Shorthand for {@link Duration#standardSeconds(long)}. */
    private static Duration seconds(int i) {
        return Duration.standardSeconds(i);
    }

    /**
     * Asserts that a fresh write attempt started at {@code instant} is refused.
     *
     * @param rpcQosImpl QoS instance under test
     * @param instant simulated current time
     * @throws InterruptedException propagated from {@code awaitSafeToProceed}
     */
    private void unsafeToProceed(RpcQosImpl rpcQosImpl, Instant instant) throws InterruptedException {
        boolean safe = rpcQosImpl.newWriteAttempt(this.rpcAttemptContext).awaitSafeToProceed(instant);
        Assert.assertFalse(
                msg("verify budget depleted", instant, "awaitSafeToProceed was true, expected false"),
                safe);
    }

    /**
     * Asserts that a fresh write attempt started at {@code instant} may proceed and that its flush
     * buffer reports the expected maximum batch size, then records a fully successful write of
     * {@code writeCount} documents against the attempt.
     *
     * @param rpcQosImpl QoS instance under test
     * @param instant simulated current time
     * @param expectedBatchMaxCount expected {@code nextBatchMaxCount} of the new flush buffer
     * @param writeCount number of writes to record as started and as successful (zero failures)
     * @param description human readable step description used in assertion messages
     * @throws InterruptedException propagated from {@code awaitSafeToProceed}
     */
    private void safeToProceedAndWithBudgetAndWrite(
            RpcQosImpl rpcQosImpl,
            Instant instant,
            int expectedBatchMaxCount,
            int writeCount,
            String description)
            throws InterruptedException {
        RpcQosImpl.RpcWriteAttemptImpl attempt = rpcQosImpl.newWriteAttempt(this.rpcAttemptContext);
        Assert.assertTrue(
                msg(description, instant, "awaitSafeToProceed was false, expected true"),
                attempt.awaitSafeToProceed(instant));
        Assert.assertEquals(
                msg(description, instant, "unexpected batchMaxCount"),
                expectedBatchMaxCount,
                attempt.newFlushBuffer(instant).nextBatchMaxCount);
        attempt.recordRequestStart(instant, writeCount);
        attempt.recordWriteCounts(instant, writeCount, 0);
    }

    /**
     * Formats an assertion message as {@code [<step> @ t = <instant>] <detail>}.
     *
     * @param str step description
     * @param instant simulated time of the step
     * @param str2 detail describing the failed expectation
     * @return the formatted message
     */
    private static String msg(String str, Instant instant, String str2) {
        return String.format("[%s @ t = %s] %s", str, instant, str2);
    }
}
