package com.google.cloud.bigquery.storage.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.test.Test;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool;
import com.google.cloud.bigquery.storage.v1.FakeBigQueryWriteImpl;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
/* loaded from: input_file:com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.class */
public class ConnectionWorkerPoolTest {
    /** Fake write service; a fresh instance is created and registered with the mock server in setUp. */
    private FakeBigQueryWrite testBigQueryWrite;
    /** Executor handed to the fake write service in setUp. */
    private FakeScheduledExecutorService fakeExecutor;
    /** Mock server hosting the fake service; static, but re-created by every setUp call. */
    private static MockServiceHelper serviceHelper;
    /** Client settings wired to the mock server's channel with no credentials. */
    private BigQueryWriteSettings clientSettings;
    private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
    private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/_default";
    private static final String TEST_STREAM_2 = "projects/p1/datasets/d1/tables/t2/streams/_default";
    private static final int MAX_RETRY_NUM_ATTEMPTS = 3;
    private static final int MAX_RETRY_DELAY_MINUTES = 5;
    private static final long INITIAL_RETRY_MILLIS = 500;
    private static final double RETRY_MULTIPLIER = 1.3d;
    /**
     * Retry policy shared by the pools and writers built in this class. Every knob is taken from
     * the named constants above so the declared values and the effective values cannot drift apart.
     */
    private static final RetrySettings retrySettings = RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS))
            .setRetryDelayMultiplier(RETRY_MULTIPLIER)
            .setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS)
            .setMaxRetryDelay(Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES))
            .build();

    /**
     * Response supplier for the fake write service: the first N calls to {@link #get()} produce an
     * error response, every later call produces a success response carrying a fixed offset.
     *
     * <p>Kept as an inner (non-static) class because it builds its responses through the enclosing
     * test instance's helper methods.
     */
    private class DummySupplierWillFailNTimesThenSucceed implements Supplier<FakeBigQueryWriteImpl.Response> {
        /** Number of error responses still to be handed out before switching to success. */
        private int remainingFailures;
        private final Status.Code errorCode;
        private final String errorMessage;
        private final int successOffset;

        /**
         * @param i how many calls should fail before the supplier starts succeeding
         * @param code status code placed in each error response
         * @param str message placed in each error response
         * @param i2 offset reported by the success response
         */
        DummySupplierWillFailNTimesThenSucceed(int i, Status.Code code, String str, int i2) {
            this.remainingFailures = i;
            this.errorCode = code;
            this.errorMessage = str;
            this.successOffset = i2;
        }

        /** Returns an error response while failures remain (consuming one), otherwise the success response. */
        @Override // java.util.function.Supplier
        public FakeBigQueryWriteImpl.Response get() {
            final AppendRowsResponse payload;
            if (this.remainingFailures > 0) {
                this.remainingFailures--;
                payload = ConnectionWorkerPoolTest.this.createAppendResponseWithError(this.errorCode, this.errorMessage);
            } else {
                payload = ConnectionWorkerPoolTest.this.createAppendResponse(this.successOffset);
            }
            return new FakeBigQueryWriteImpl.Response(payload);
        }
    }

    /**
     * Starts a fresh mock service backed by a new fake write implementation, builds client
     * settings that talk to it without credentials, and sets the ConnectionWorker load thresholds
     * used by the tests in this class.
     *
     * @throws Exception if the mock service or the client settings cannot be created
     */
    @Before
    public void setUp() throws Exception {
        this.testBigQueryWrite = new FakeBigQueryWrite();
        serviceHelper = new MockServiceHelper(UUID.randomUUID().toString(), Arrays.asList(this.testBigQueryWrite));
        serviceHelper.start();
        this.fakeExecutor = new FakeScheduledExecutorService();
        this.testBigQueryWrite.setExecutor(this.fakeExecutor);
        this.clientSettings = BigQueryWriteSettings.newBuilder().setCredentialsProvider(NoCredentialsProvider.create()).setTransportChannelProvider(serviceHelper.createChannelProvider()).build();
        ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.5d);
        ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.6d);
    }

    /**
     * Stops the mock service started by {@link #setUp()}. Without this, every test would leave its
     * server running because the static helper reference is simply overwritten by the next setUp.
     *
     * @throws Exception if stopping the mock service fails
     */
    @After
    public void tearDown() throws Exception {
        // Guard against setUp having failed before the helper was assigned.
        if (serviceHelper != null) {
            serviceHelper.stop();
            serviceHelper = null;
        }
    }

    /** One table with a large request limit: a single connection is expected to be created. */
    @Test
    public void testSingleTableConnection_noOverwhelmedConnection() throws Exception {
        final int requestCount = 100;
        // First limit handed to the pool (presumably max in-flight requests - confirm).
        final int requestLimit = 100000;
        final int maxConnectionsPerRegion = 8;
        final int expectedCreatedConnections = 1;
        final int tableCount = 1;
        testSendRequestsToMultiTable(requestCount, requestLimit, maxConnectionsPerRegion, expectedCreatedConnections, tableCount);
    }

    /** One table with a small request limit: the pool is expected to create all 8 allowed connections. */
    @Test
    public void testSingleTableConnections_overwhelmed() throws Exception {
        final int requestCount = 100;
        // First limit handed to the pool (presumably max in-flight requests - confirm).
        final int requestLimit = 10;
        final int maxConnectionsPerRegion = 8;
        final int expectedCreatedConnections = 8;
        final int tableCount = 1;
        testSendRequestsToMultiTable(requestCount, requestLimit, maxConnectionsPerRegion, expectedCreatedConnections, tableCount);
    }

    /** Four tables with a large request limit: two connections are expected to be created. */
    @Test
    public void testMultiTableConnection_noOverwhelmedConnection() throws Exception {
        final int requestCount = 100;
        // First limit handed to the pool (presumably max in-flight requests - confirm).
        final int requestLimit = 100000;
        final int maxConnectionsPerRegion = 8;
        final int expectedCreatedConnections = 2;
        final int tableCount = 4;
        testSendRequestsToMultiTable(requestCount, requestLimit, maxConnectionsPerRegion, expectedCreatedConnections, tableCount);
    }

    /** Four tables with a small request limit: the pool is expected to create all 8 allowed connections. */
    @Test
    public void testMultiTableConnections_overwhelmed_reachingMaximum() throws Exception {
        final int requestCount = 100;
        // First limit handed to the pool (presumably max in-flight requests - confirm).
        final int requestLimit = 10;
        final int maxConnectionsPerRegion = 8;
        final int expectedCreatedConnections = 8;
        final int tableCount = 4;
        testSendRequestsToMultiTable(requestCount, requestLimit, maxConnectionsPerRegion, expectedCreatedConnections, tableCount);
    }

    /** Ten tables and 200 requests with a small request limit: created connections are expected to stop at 8. */
    @Test
    public void testMultiTableConnections_overwhelmed_overTotalLimit() throws Exception {
        final int requestCount = 200;
        // First limit handed to the pool (presumably max in-flight requests - confirm).
        final int requestLimit = 10;
        final int maxConnectionsPerRegion = 8;
        final int expectedCreatedConnections = 8;
        final int tableCount = 10;
        testSendRequestsToMultiTable(requestCount, requestLimit, maxConnectionsPerRegion, expectedCreatedConnections, tableCount);
    }

    /** Four tables and only 20 requests with a small request limit: 4 of the 8 allowed connections are expected. */
    @Test
    public void testMultiTableConnections_overwhelmed_notReachingMaximum() throws Exception {
        final int requestCount = 20;
        // First limit handed to the pool (presumably max in-flight requests - confirm).
        final int requestLimit = 10;
        final int maxConnectionsPerRegion = 8;
        final int expectedCreatedConnections = 4;
        final int tableCount = 4;
        testSendRequestsToMultiTable(requestCount, requestLimit, maxConnectionsPerRegion, expectedCreatedConnections, tableCount);
    }

    /**
     * Shared scenario: queues one fake response per request, spreads the appends round-robin over
     * a set of stream writers, waits for every append, then checks the returned offsets, the
     * number of connections the pool created, and the requests recorded by the fake service.
     *
     * @param i number of append requests to send (and fake responses to queue)
     * @param i2 first limit handed to createConnectionWorkerPool (presumably max in-flight
     *     requests - confirm against ConnectionWorkerPool)
     * @param i3 value applied through setMaxConnectionsPerRegion
     * @param i4 expected value of getCreateConnectionCount() once all appends have completed
     * @param i5 number of stream writers to create, one per table name t0, t1, ...
     */
    private void testSendRequestsToMultiTable(int i, int i2, int i3, int i4, int i5) throws IOException, ExecutionException, InterruptedException {
        final long requestCount = i;

        ConnectionWorkerPool.setOptions(ConnectionWorkerPool.Settings.builder().setMinConnectionsPerRegion(2).setMaxConnectionsPerRegion(i3).build());
        ConnectionWorkerPool pool = createConnectionWorkerPool(i2, 100000L, java.time.Duration.ofSeconds(5L));
        this.testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));

        // Queue one success response per request, carrying offsets 0..requestCount-1.
        for (long offset = 0; offset < requestCount; offset++) {
            this.testBigQueryWrite.addResponse((AbstractMessage) createAppendResponse(offset));
        }

        ArrayList<StreamWriter> writers = new ArrayList<>();
        for (int table = 0; table < i5; table++) {
            writers.add(getTestStreamWriter(String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", table)));
        }

        // Round-robin the appends across the writers.
        ArrayList<ApiFuture<AppendRowsResponse>> pending = new ArrayList<>();
        for (long offset = 0; offset < requestCount; offset++) {
            StreamWriter writer = writers.get((int) (offset % writers.size()));
            pending.add(sendFooStringTestMessage(writer, pool, new String[]{String.valueOf(offset)}, offset));
        }

        // The n-th append is expected to come back with offset n.
        for (int idx = 0; idx < requestCount; idx++) {
            long ackedOffset = pending.get(idx).get().getAppendResult().getOffset().getValue();
            Truth.assertThat(ackedOffset).isEqualTo(idx);
        }

        Truth.assertThat(pool.getCreateConnectionCount()).isEqualTo(i4);
        Truth.assertThat(this.testBigQueryWrite.getAppendRequests().size()).isEqualTo(requestCount);

        // Every recorded request must carry rows, and all request offsets must be distinct.
        HashSet<Long> seenOffsets = new HashSet<>();
        for (int idx = 0; idx < requestCount; idx++) {
            AppendRowsRequest recorded = this.testBigQueryWrite.getAppendRequests().get(idx);
            Truth.assertThat(recorded.getProtoRows().getRows().getSerializedRowsCount()).isGreaterThan(0);
            seenOffsets.add(recorded.getOffset().getValue());
        }
        Truth.assertThat(seenOffsets.size()).isEqualTo(requestCount);
    }

    /**
     * Sends 20 appends alternating between two stream writers, waits for all of them, and then
     * checks the pool's connection counts: 10 created and 10 total, 8 total after closing the
     * first writer, and 0 after closing the second.
     */
    @Test
    public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
        ConnectionWorkerPool.setOptions(ConnectionWorkerPool.Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
        ConnectionWorkerPool pool = createConnectionWorkerPool(3L, 1000L, java.time.Duration.ofSeconds(5L));
        this.testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
        StreamWriter firstWriter = getTestStreamWriter(TEST_STREAM_1);
        StreamWriter secondWriter = getTestStreamWriter(TEST_STREAM_2);

        for (long offset = 0; offset < 20; offset++) {
            this.testBigQueryWrite.addResponse((AbstractMessage) createAppendResponse(offset));
        }

        // Even-numbered appends go to the first writer, odd-numbered ones to the second.
        ArrayList<ApiFuture<AppendRowsResponse>> pending = new ArrayList<>();
        for (int n = 0; n < 20; n++) {
            StreamWriter target;
            if (n % 2 == 0) {
                target = firstWriter;
            } else {
                target = secondWriter;
            }
            pending.add(sendFooStringTestMessage(target, pool, new String[]{String.valueOf(n)}, n));
        }
        for (ApiFuture<AppendRowsResponse> future : pending) {
            future.get();
        }

        Truth.assertThat(pool.getCreateConnectionCount()).isEqualTo(10);
        Truth.assertThat(pool.getTotalConnectionCount()).isEqualTo(10);

        pool.close(firstWriter);
        Truth.assertThat(pool.getTotalConnectionCount()).isEqualTo(8);

        pool.close(secondWriter);
        Truth.assertThat(pool.getTotalConnectionCount()).isEqualTo(0);
    }

    /**
     * Appends through two writers, closes the first writer on the pool, sleeps one second, appends
     * again through both writers, and checks the pool's total connection count at each stage
     * (5, 3, 5, then 3 and 0 after the final closes).
     */
    @Test
    public void testMultiStreamAppend_appendWhileClosing() throws Exception {
        ConnectionWorkerPool.setOptions(ConnectionWorkerPool.Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
        ConnectionWorkerPool pool = createConnectionWorkerPool(3L, 100000L, java.time.Duration.ofSeconds(5L));
        this.testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
        StreamWriter firstWriter = getTestStreamWriter(TEST_STREAM_1);
        StreamWriter secondWriter = getTestStreamWriter(TEST_STREAM_2);

        for (long offset = 0; offset < 10; offset++) {
            this.testBigQueryWrite.addResponse((AbstractMessage) createAppendResponse(offset));
        }

        // First round: 10 appends alternating between the two writers; not awaited yet.
        ArrayList<ApiFuture<AppendRowsResponse>> pending = new ArrayList<>();
        for (int n = 0; n < 10; n++) {
            StreamWriter target;
            if (n % 2 == 0) {
                target = firstWriter;
            } else {
                target = secondWriter;
            }
            pending.add(sendFooStringTestMessage(target, pool, new String[]{String.valueOf(n)}, n));
        }
        Truth.assertThat(pool.getCreateConnectionCount()).isEqualTo(5);
        Truth.assertThat(pool.getTotalConnectionCount()).isEqualTo(5);

        pool.close(firstWriter);
        Truth.assertThat(pool.getTotalConnectionCount()).isEqualTo(3);
        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);

        // Second round: same 10 appends again, including through the writer that was just closed.
        for (int n = 0; n < 10; n++) {
            StreamWriter target;
            if (n % 2 == 0) {
                target = firstWriter;
            } else {
                target = secondWriter;
            }
            pending.add(sendFooStringTestMessage(target, pool, new String[]{String.valueOf(n)}, n));
        }
        Truth.assertThat(pool.getTotalConnectionCount()).isEqualTo(5);

        for (ApiFuture<AppendRowsResponse> future : pending) {
            future.get();
        }

        pool.close(firstWriter);
        Truth.assertThat(pool.getTotalConnectionCount()).isEqualTo(3);
        pool.close(secondWriter);
        Truth.assertThat(pool.getTotalConnectionCount()).isEqualTo(0);
    }

    /**
     * Submits 500 append calls from a thread pool, closes the writer on the connection pool while
     * those calls may still be running, and then waits for every submitted task. The test passes
     * when all tasks complete, i.e. closing concurrently with appending does not hang.
     *
     * <p>The executor is shut down in a finally block so its threads are released even when a
     * task fails.
     */
    @Test
    public void testCloseWhileAppending_noDeadlockHappen() throws Exception {
        ConnectionWorkerPool.setOptions(ConnectionWorkerPool.Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
        ConnectionWorkerPool pool = createConnectionWorkerPool(1500L, 100000L, java.time.Duration.ofSeconds(5L));
        this.testBigQueryWrite.setResponseSleep(Duration.ofMillis(20L));
        StreamWriter writer = getTestStreamWriter(TEST_STREAM_1);
        ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AsyncStreamReadThread").build()));
        try {
            // Only 10 responses are queued; the append futures are never awaited, only the tasks.
            for (long offset = 0; offset < 10; offset++) {
                this.testBigQueryWrite.addResponse((AbstractMessage) createAppendResponse(offset));
            }

            ArrayList<Future<?>> submitted = new ArrayList<>();
            for (int n = 0; n < 500; n++) {
                // Block-bodied lambda on purpose: it must bind to submit(Runnable).
                submitted.add(executor.submit(() -> {
                    sendFooStringTestMessage(writer, pool, new String[]{String.valueOf(0)}, 0L);
                }));
            }

            pool.close(writer);

            for (Future<?> task : submitted) {
                task.get();
            }
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Queues two suppliers that return RESOURCE_EXHAUSTED errors a few times before succeeding
     * (3 failures then offset 0, 2 failures then offset 1) plus one plain success (offset 2),
     * sends three appends through one writer, and waits for all three futures to complete.
     */
    @Test
    public void testAppendWithRetry() throws Exception {
        ConnectionWorkerPool pool = createConnectionWorkerPool(1500L, 100000L, java.time.Duration.ofSeconds(5L));
        StreamWriter writer = getTestStreamWriter(TEST_STREAM_1);

        final Status.Code quotaError = Status.RESOURCE_EXHAUSTED.getCode();
        this.testBigQueryWrite.addResponse(new DummySupplierWillFailNTimesThenSucceed(3, quotaError, "test quota error A", 0));
        this.testBigQueryWrite.addResponse(new DummySupplierWillFailNTimesThenSucceed(2, quotaError, "test quota error B", 1));
        this.testBigQueryWrite.addResponse((AbstractMessage) createAppendResponse(2L));

        ArrayList<ApiFuture<AppendRowsResponse>> pending = new ArrayList<>();
        for (int n = 0; n < 3; n++) {
            pending.add(sendFooStringTestMessage(writer, pool, new String[]{String.valueOf(n)}, n));
        }
        for (ApiFuture<AppendRowsResponse> future : pending) {
            future.get();
        }

        pool.close(writer);
    }

    /** Checks that toTableName drops the trailing "/streams/..." segment of a stream name. */
    @Test
    public void testToTableName() {
        final String streamName = "projects/p/datasets/d/tables/t/streams/s";
        final String expectedTableName = "projects/p/datasets/d/tables/t";
        Truth.assertThat(ConnectionWorkerPool.toTableName(streamName)).isEqualTo(expectedTableName);
    }

    /**
     * Builds four pooled stream writers on top of an externally created client, sends 100 appends,
     * closes the external client, sends another 100 appends through the same writers, and checks
     * that all 200 appends return their expected offsets and were recorded by the fake service.
     */
    @Test
    public void testCloseExternalClient() throws IOException, InterruptedException, ExecutionException {
        final long appendsPerPhase = 100;
        final long totalAppends = appendsPerPhase * 2;

        StreamWriter.clearConnectionPool();

        for (long offset = 0; offset < totalAppends; offset++) {
            this.testBigQueryWrite.addResponse((AbstractMessage) createAppendResponse(offset));
        }
        this.testBigQueryWrite.addResponse((AbstractMessage) WriteStream.newBuilder().setLocation("us").build());

        ArrayList<ApiFuture<AppendRowsResponse>> pending = new ArrayList<>();
        BigQueryWriteClient externalClient = BigQueryWriteClient.create(BigQueryWriteSettings.newBuilder().setCredentialsProvider(NoCredentialsProvider.create()).setTransportChannelProvider(serviceHelper.createChannelProvider()).build());

        ArrayList<StreamWriter> writers = new ArrayList<>();
        for (int table = 0; table < 4; table++) {
            String streamName = String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", table);
            writers.add(StreamWriter.newBuilder(streamName, externalClient).setEnableConnectionPool(true).setWriterSchema(createProtoSchema()).setTraceId(TEST_TRACE_ID).setLocation("us").setRetrySettings(retrySettings).build());
        }

        // Phase 1: appends while the external client is still open.
        for (long offset = 0; offset < appendsPerPhase; offset++) {
            StreamWriter writer = writers.get((int) (offset % writers.size()));
            pending.add(writer.append(createProtoRows(new String[]{String.valueOf(offset)}), offset));
        }

        externalClient.close();
        externalClient.awaitTermination(1L, TimeUnit.MINUTES);

        // Phase 2: appends after the external client has been closed.
        for (long offset = appendsPerPhase; offset < totalAppends; offset++) {
            StreamWriter writer = writers.get((int) (offset % writers.size()));
            pending.add(writer.append(createProtoRows(new String[]{String.valueOf(offset)}), offset));
        }

        for (int idx = 0; idx < totalAppends; idx++) {
            long ackedOffset = pending.get(idx).get().getAppendResult().getOffset().getValue();
            Truth.assertThat(ackedOffset).isEqualTo(idx);
        }
        Truth.assertThat(this.testBigQueryWrite.getAppendRequests().size()).isEqualTo(totalAppends);

        for (StreamWriter writer : writers) {
            writer.close();
        }
        StreamWriter.clearConnectionPool();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a successful append response.
     *
     * @param j offset to report in the response's append result
     * @return a response whose append result carries the given offset
     */
    public AppendRowsResponse createAppendResponse(long j) {
        AppendRowsResponse.AppendResult result = AppendRowsResponse.AppendResult.newBuilder()
                .setOffset(Int64Value.of(j))
                .build();
        return AppendRowsResponse.newBuilder()
                .setAppendResult(result)
                .build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds an append response that carries an error status instead of a result.
     *
     * @param code gRPC status code; its numeric value is copied into the error
     * @param str error message
     * @return a response whose error field holds the given code and message
     */
    public AppendRowsResponse createAppendResponseWithError(Status.Code code, String str) {
        com.google.rpc.Status.Builder error = com.google.rpc.Status.newBuilder()
                .setCode(code.value())
                .setMessage(str);
        return AppendRowsResponse.newBuilder()
                .setError(error)
                .build();
    }

    /**
     * Creates a stream writer for the given stream name, configured with the test schema, the
     * test trace id, location "us", no credentials, and a channel to the mock service.
     *
     * @param str stream name to write to
     * @return the configured writer
     * @throws IOException if the writer cannot be built
     */
    private StreamWriter getTestStreamWriter(String str) throws IOException {
        final ProtoSchema schema = createProtoSchema();
        return StreamWriter.newBuilder(str)
                .setWriterSchema(schema)
                .setTraceId(TEST_TRACE_ID)
                .setLocation("us")
                .setCredentialsProvider(NoCredentialsProvider.create())
                .setChannelProvider(serviceHelper.createChannelProvider())
                .build();
    }

    /**
     * Builds the writer schema used by the tests: a message named "Message" with a single string
     * field "foo" at field number 1.
     */
    private ProtoSchema createProtoSchema() {
        DescriptorProtos.FieldDescriptorProto fooField = DescriptorProtos.FieldDescriptorProto.newBuilder()
                .setName("foo")
                .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
                .setNumber(1)
                .build();
        DescriptorProtos.DescriptorProto messageDescriptor = DescriptorProtos.DescriptorProto.newBuilder()
                .setName("Message")
                .addField(fooField)
                .build();
        return ProtoSchema.newBuilder()
                .setProtoDescriptor(messageDescriptor)
                .build();
    }

    /**
     * Appends one batch of "foo" rows through the pool on behalf of the given writer.
     *
     * @param streamWriter writer the append is issued for
     * @param connectionWorkerPool pool that performs the append
     * @param strArr values to send, one row per element
     * @param j offset passed to the append; also used to form the request id "request_&lt;j&gt;"
     * @return the future returned by the pool's append call
     */
    private ApiFuture<AppendRowsResponse> sendFooStringTestMessage(StreamWriter streamWriter, ConnectionWorkerPool connectionWorkerPool, String[] strArr, long j) {
        final ProtoRows rows = createProtoRows(strArr);
        final String requestId = "request_" + j;
        return connectionWorkerPool.append(streamWriter, rows, j, requestId);
    }

    /**
     * Serializes each string into a FooType message and collects the results as proto rows.
     *
     * <p>The proto outer class is referenced by its fully qualified name because this file also
     * imports org.junit.Test, which makes the simple name {@code Test} ambiguous.
     *
     * @param strArr values to encode, one row per element
     * @return rows containing one serialized FooType message per input string, in input order
     */
    private ProtoRows createProtoRows(String[] strArr) {
        ProtoRows.Builder rows = ProtoRows.newBuilder();
        for (String foo : strArr) {
            com.google.cloud.bigquery.storage.test.Test.FooType message =
                    com.google.cloud.bigquery.storage.test.Test.FooType.newBuilder().setFoo(foo).build();
            rows.addSerializedRows(message.toByteString());
        }
        return rows.build();
    }

    /**
     * Enables the pool's testing logic and creates a pool that blocks when a flow-control limit is
     * exceeded, using the test trace id, the client settings from setUp and the shared retry
     * settings.
     *
     * @param j first limit forwarded to the pool constructor (presumably max in-flight requests -
     *     confirm against ConnectionWorkerPool)
     * @param j2 second limit forwarded to the pool constructor (presumably max in-flight bytes -
     *     confirm against ConnectionWorkerPool)
     * @param duration duration forwarded to the pool constructor
     * @return the new pool
     */
    ConnectionWorkerPool createConnectionWorkerPool(long j, long j2, java.time.Duration duration) {
        ConnectionWorkerPool.enableTestingLogic();
        final FlowController.LimitExceededBehavior onLimitExceeded = FlowController.LimitExceededBehavior.Block;
        // Typed as String so the same constructor overload is selected as with a cast null.
        final String absentStringArgument = null;
        return new ConnectionWorkerPool(
                j,
                j2,
                duration,
                onLimitExceeded,
                TEST_TRACE_ID,
                absentStringArgument,
                this.clientSettings,
                retrySettings,
                false,
                false);
    }
}
