/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.firestore;

import java.util.Arrays;
import java.util.Random;
import org.apache.beam.sdk.io.gcp.firestore.CounterFactory;
import org.apache.beam.sdk.io.gcp.firestore.DistributionFactory;
import org.apache.beam.sdk.io.gcp.firestore.RpcQos;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Simulation-style test for {@link RpcQosImpl} write attempts.
 *
 * <p>The test drives a single {@link RpcQosImpl} instance with explicitly chosen {@link Instant}s
 * (all expressed as offsets from the epoch) and, at each step, asserts either that a write attempt
 * is refused or that it is allowed with a specific {@code nextBatchMaxCount}. The {@link Sleeper},
 * counters and distributions handed to the implementation are Mockito mocks, and the
 * {@link Random} uses a fixed seed.
 */
@RunWith(MockitoJUnitRunner.class)
public final class RpcQosSimulationTest {
    @Mock(lenient = true)
    private Sleeper sleeper;
    @Mock(lenient = true)
    private CounterFactory counterFactory;
    @Mock(lenient = true)
    private DistributionFactory distributionFactory;
    @Mock(lenient = true)
    private Counter counterThrottlingMs;
    @Mock(lenient = true)
    private Counter counterRpcFailures;
    @Mock(lenient = true)
    private Counter counterRpcSuccesses;
    @Mock(lenient = true)
    private Counter counterRpcStreamValueReceived;

    /** Supplies the running test method's name, used to build the metrics namespace. */
    @Rule
    public final TestName testName = new TestName();

    /** Fixed seed so every run hands the same random sequence to the implementation. */
    private final Random random = new Random(1234567890L);

    private RpcQos.RpcAttempt.Context rpcAttemptContext;

    /**
     * Creates the attempt context (namespace {@code <class name>.<test method name>}) and stubs the
     * counter and distribution factories to hand back mocks.
     */
    @Before
    public void setUp() {
        rpcAttemptContext = () -> String.format("%s.%s", getClass().getName(), testName.getMethodName());
        String namespace = rpcAttemptContext.getNamespace();

        Mockito.when(counterFactory.get(namespace, "throttlingMs")).thenReturn(counterThrottlingMs);
        Mockito.when(counterFactory.get(namespace, "rpc_failures")).thenReturn(counterRpcFailures);
        Mockito.when(counterFactory.get(namespace, "rpc_successes")).thenReturn(counterRpcSuccesses);
        Mockito.when(counterFactory.get(namespace, "rpc_streamValueReceived"))
            .thenReturn(counterRpcStreamValueReceived);

        // Any distribution request gets a fresh mock named after the second argument of the call.
        Mockito.when(distributionFactory.get(ArgumentMatchers.<String>any(), ArgumentMatchers.<String>any()))
            .thenAnswer(invocation -> Mockito.mock(Distribution.class, invocation.getArgument(1, String.class)));
    }

    /**
     * Walks a write workload through a fixed timeline and checks, step by step, the batch size
     * offered to each attempt and the points at which further attempts are refused.
     *
     * @throws InterruptedException declared because the attempt API under test declares it
     */
    @Test
    public void writeRampUp_shouldScaleAlongTheExpectedLine() throws InterruptedException {
        RpcQosOptions options = RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(1)
            .withThrottleDuration(Duration.standardSeconds(5))
            .withBatchInitialCount(200)
            .withSamplePeriod(Duration.standardMinutes(10))
            .withSamplePeriodBucketSize(Duration.standardMinutes(2))
            .build();
        RpcQosImpl qos = new RpcQosImpl(options, random, sleeper, counterFactory, distributionFactory);

        // Timeline instants; each name encodes the offset from the epoch.
        Instant t00m00s000ms = t(Duration.ZERO);
        Instant t00m00s010ms = t(Duration.millis(10));
        Instant t00m00s020ms = t(Duration.millis(20));
        Instant t00m00s999ms = t(Duration.millis(999));
        Instant t00m01s000ms = t(seconds(1));
        Instant t00m01s001ms = t(seconds(1), Duration.millis(1));
        Instant t05m00s000ms = t(minutes(5));
        Instant t05m00s001ms = t(minutes(5), Duration.millis(1));
        Instant t05m00s002ms = t(minutes(5), Duration.millis(2));
        Instant t10m00s000ms = t(minutes(10));
        Instant t10m00s001ms = t(minutes(10), Duration.millis(1));
        Instant t10m00s002ms = t(minutes(10), Duration.millis(2));
        // NOTE(review): this instant is 15 minutes plus 1 ms, unlike the 20-minute mark below which
        // has no millisecond offset; the value is kept as-is, confirm whether the 1 ms is intended.
        Instant t15m00s001ms = t(minutes(15), Duration.millis(1));
        Instant t20m00s000ms = t(minutes(20));
        Instant t20m01s000ms = t(minutes(20), seconds(1));

        // First second: 200 + 300 writes are accepted, after which attempts are refused.
        safeToProceedAndWithBudgetAndWrite(qos, t00m00s000ms, 200, 200, "write 200");
        safeToProceedAndWithBudgetAndWrite(qos, t00m00s010ms, 300, 300, "write 300");
        unsafeToProceed(qos, t00m00s020ms);
        unsafeToProceed(qos, t00m00s999ms);

        // One second later a full batch of 500 is accepted again.
        safeToProceedAndWithBudgetAndWrite(qos, t00m01s000ms, 500, 500, "wait 1 second for budget to refill, write 500");
        unsafeToProceed(qos, t00m01s001ms);

        // 5 minute mark: 100 + 400 writes.
        safeToProceedAndWithBudgetAndWrite(qos, t05m00s000ms, 500, 100, "jump ahead to next ramp up interval and write 100");
        safeToProceedAndWithBudgetAndWrite(qos, t05m00s001ms, 400, 400, "write another 400 exhausting budget");
        unsafeToProceed(qos, t05m00s002ms);

        // 10 minute mark: 500 + 250 = 750 writes.
        safeToProceedAndWithBudgetAndWrite(qos, t10m00s000ms, 500, 500, "after 10 minutes the ramp up should allow 750 writes, write 500");
        safeToProceedAndWithBudgetAndWrite(qos, t10m00s001ms, 250, 250, "write 250 more");
        unsafeToProceed(qos, t10m00s002ms);

        // 15 minute mark: 500 + 500 + 125 = 1,125 writes.
        safeToProceedAndWithBudgetAndWrite(qos, t15m00s001ms, 500, 500, "after 15 minutes the ramp up should allow 1,125 writes, write 500");
        safeToProceedAndWithBudgetAndWrite(qos, t15m00s001ms, 500, 500, "write 500 more");
        safeToProceedAndWithBudgetAndWrite(qos, t15m00s001ms, 125, 125, "write 125 more");
        unsafeToProceed(qos, t15m00s001ms);

        // 20 minute mark: 500 + 500 + 500 + 187 = 1,687 writes.
        safeToProceedAndWithBudgetAndWrite(qos, t20m00s000ms, 500, 500, "after 20 minutes the ramp up should allow 1,687 writes, write 500");
        safeToProceedAndWithBudgetAndWrite(qos, t20m00s000ms, 500, 500, "write 500 more");
        safeToProceedAndWithBudgetAndWrite(qos, t20m00s000ms, 500, 500, "write 500 more");
        safeToProceedAndWithBudgetAndWrite(qos, t20m00s000ms, 187, 187, "write 187 more");
        unsafeToProceed(qos, t20m00s000ms);

        // One second after the 20 minute mark the same 1,687 writes are accepted again.
        safeToProceedAndWithBudgetAndWrite(qos, t20m01s000ms, 500, 500, "wait 1 second for the budget to refill, write 500");
        safeToProceedAndWithBudgetAndWrite(qos, t20m01s000ms, 500, 500, "write 500 more");
        safeToProceedAndWithBudgetAndWrite(qos, t20m01s000ms, 500, 500, "write 500 more");
        safeToProceedAndWithBudgetAndWrite(qos, t20m01s000ms, 187, 187, "write 187 more");
        unsafeToProceed(qos, t20m01s000ms);
    }

    /**
     * Converts an offset into the instant that many milliseconds after the epoch.
     *
     * @param d offset from the epoch
     * @return the instant at {@code d.getMillis()} epoch milliseconds
     */
    private static Instant t(Duration d) {
        return Instant.ofEpochMilli(d.getMillis());
    }

    /**
     * Sums the given offsets and converts the total into an instant relative to the epoch.
     *
     * @param ds offsets to add together; an empty array yields the epoch itself
     * @return the instant at the summed offset
     */
    private static Instant t(Duration... ds) {
        Duration sum = Arrays.stream(ds).reduce(Duration.ZERO, Duration::plus, Duration::plus);
        return t(sum);
    }

    /** Shorthand for a duration of {@code i} standard minutes. */
    private static Duration minutes(int i) {
        return Duration.standardMinutes(i);
    }

    /** Shorthand for a duration of {@code i} standard seconds. */
    private static Duration seconds(int i) {
        return Duration.standardSeconds(i);
    }

    /**
     * Starts a new write attempt and asserts that {@code awaitSafeToProceed(t)} returns
     * {@code false}.
     *
     * @param qos instance under test
     * @param t instant passed to the attempt
     * @throws InterruptedException if {@code awaitSafeToProceed} throws it
     */
    private void unsafeToProceed(RpcQosImpl qos, Instant t) throws InterruptedException {
        RpcQosImpl.RpcWriteAttemptImpl attempt = qos.newWriteAttempt(rpcAttemptContext);
        Assert.assertFalse(
            msg("verify budget depleted", t, "awaitSafeToProceed was true, expected false"),
            attempt.awaitSafeToProceed(t));
    }

    /**
     * Starts a new write attempt, asserts it may proceed at {@code t}, asserts the batch size
     * offered by a new flush buffer, and then records the request on the attempt.
     *
     * @param qos instance under test
     * @param t instant used for every call made on the attempt
     * @param expectedBatchMaxCount expected value of the flush buffer's {@code nextBatchMaxCount}
     * @param writeCount count passed to {@code recordRequestStart} and {@code recordWriteCounts}
     * @param description step label included in assertion failure messages
     * @throws InterruptedException if {@code awaitSafeToProceed} throws it
     */
    private void safeToProceedAndWithBudgetAndWrite(RpcQosImpl qos, Instant t, int expectedBatchMaxCount, int writeCount, String description) throws InterruptedException {
        RpcQosImpl.RpcWriteAttemptImpl attempt = qos.newWriteAttempt(rpcAttemptContext);
        Assert.assertTrue(
            msg(description, t, "awaitSafeToProceed was false, expected true"),
            attempt.awaitSafeToProceed(t));

        RpcQosImpl.FlushBufferImpl buffer = attempt.newFlushBuffer(t);
        // The explicit long casts keep overload resolution on assertEquals(String, long, long).
        Assert.assertEquals(
            msg(description, t, "unexpected batchMaxCount"),
            (long) expectedBatchMaxCount,
            (long) buffer.nextBatchMaxCount);

        attempt.recordRequestStart(t, writeCount);
        // Presumably (successful writes, failed writes); confirm against RpcWriteAttemptImpl.
        attempt.recordWriteCounts(t, writeCount, 0);
    }

    /**
     * Formats an assertion message as {@code [<description> @ t = <instant>] <message>}.
     *
     * @param description label of the simulation step
     * @param t instant of the simulation step
     * @param message the failure detail
     * @return the formatted message
     */
    private static String msg(String description, Instant t, String message) {
        return String.format("[%s @ t = %s] %s", description, t, message);
    }
}

