package com.google.cloud.bigquery.storage.v1beta2;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.bigquery.storage.test.Test;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
import com.google.common.base.Strings;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.function.ThrowingRunnable;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
/* loaded from: input_file:com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.class */
public class StreamWriterV2Test {
    /** Logger used to trace test lifecycle events (see {@code tearDown}). */
    private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName());
    /** Write stream name handed to every {@code StreamWriterV2} built by these tests. */
    private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
    /** Trace id passed to {@code setTraceId} by the tests that configure one. */
    private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
    /** Executor installed on the fake service in {@code setUp}. */
    private FakeScheduledExecutorService fakeExecutor;
    /** Fake write service that the tests preload with responses and exceptions. */
    private FakeBigQueryWrite testBigQueryWrite;
    /** Mock service helper built around the fake; static, yet reassigned by {@code setUp} before each test. */
    private static MockServiceHelper serviceHelper;
    /** Client created in {@code setUp} from the helper's channel provider and closed in {@code tearDown}. */
    private BigQueryWriteClient client;

    /**
     * Starts a fresh mock service around a new {@link FakeBigQueryWrite} and creates a
     * {@link BigQueryWriteClient} that talks to it without credentials.
     *
     * @throws Exception if the client cannot be created
     */
    @Before
    public void setUp() throws Exception {
        FakeBigQueryWrite fakeService = new FakeBigQueryWrite();
        this.testBigQueryWrite = fakeService;

        // A random name keeps each test's mock server distinct from the previous one.
        String serverName = UUID.randomUUID().toString();
        serviceHelper = new MockServiceHelper(serverName, Arrays.asList(this.testBigQueryWrite));
        serviceHelper.start();

        FakeScheduledExecutorService executor = new FakeScheduledExecutorService();
        this.fakeExecutor = executor;
        fakeService.setExecutor(executor);

        BigQueryWriteSettings settings =
                BigQueryWriteSettings.newBuilder()
                        .setCredentialsProvider(NoCredentialsProvider.create())
                        .setTransportChannelProvider(serviceHelper.createChannelProvider())
                        .build();
        this.client = BigQueryWriteClient.create(settings);
    }

    /**
     * Closes the client and stops the mock service.
     *
     * <p>The service is stopped in a {@code finally} block so that a failure while closing the
     * client (or a client that was never created because {@code setUp} failed part-way) does not
     * leave the mock server running.
     *
     * @throws Exception if closing the client fails
     */
    @After
    public void tearDown() throws Exception {
        log.info("tearDown called");
        try {
            // client is assigned last in setUp, so it is null when setUp threw earlier.
            if (this.client != null) {
                this.client.close();
            }
        } finally {
            if (serviceHelper != null) {
                serviceHelper.stop();
            }
        }
    }

    /**
     * Builds a writer for {@code TEST_STREAM} on the shared test client, tagged with
     * {@code TEST_TRACE_ID} and without a writer schema.
     *
     * @return the newly built writer
     * @throws IOException if building the writer fails
     */
    private StreamWriterV2 getTestStreamWriterV2() throws IOException {
        StreamWriterV2 writer =
                StreamWriterV2.newBuilder(TEST_STREAM, this.client)
                        .setTraceId(TEST_TRACE_ID)
                        .build();
        return writer;
    }

    /**
     * Builds the schema used by the tests: a message named {@code "Message"} with a single
     * string field {@code foo} at field number 1.
     *
     * @return the proto schema wrapping that descriptor
     */
    private ProtoSchema createProtoSchema() {
        DescriptorProtos.FieldDescriptorProto fooField =
                DescriptorProtos.FieldDescriptorProto.newBuilder()
                        .setName("foo")
                        .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
                        .setNumber(1)
                        .build();
        DescriptorProtos.DescriptorProto messageDescriptor =
                DescriptorProtos.DescriptorProto.newBuilder()
                        .setName("Message")
                        .addField(fooField)
                        .build();
        return ProtoSchema.newBuilder().setProtoDescriptor(messageDescriptor).build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Serializes each string as a {@code Test.FooType} message and collects the results.
     *
     * @param strArr values for the {@code foo} field, one row per element
     * @return rows containing one serialized message per input string, in input order
     */
    public ProtoRows createProtoRows(String[] strArr) {
        ProtoRows.Builder rows = ProtoRows.newBuilder();
        for (int idx = 0; idx < strArr.length; idx++) {
            Test.FooType message = Test.FooType.newBuilder().setFoo(strArr[idx]).build();
            rows.addSerializedRows(message.toByteString());
        }
        return rows.build();
    }

    /**
     * Builds an append request for {@code TEST_STREAM} carrying the test schema and the given rows.
     *
     * @param strArr row values, see {@link #createProtoRows(String[])}
     * @param j offset to attach; an offset is only set when this is strictly positive
     * @return the assembled request
     */
    private AppendRowsRequest createAppendRequest(String[] strArr, long j) {
        AppendRowsRequest.ProtoData protoData =
                AppendRowsRequest.ProtoData.newBuilder()
                        .setWriterSchema(createProtoSchema())
                        .setRows(createProtoRows(strArr))
                        .build();
        AppendRowsRequest.Builder request =
                AppendRowsRequest.newBuilder()
                        .setProtoRows(protoData)
                        .setWriteStream(TEST_STREAM);
        // Zero and negative offsets leave the offset field unset.
        if (j > 0) {
            request.setOffset(Int64Value.of(j));
        }
        return request.build();
    }

    /**
     * Builds a successful append response.
     *
     * @param j offset to report in the append result
     * @return a response whose append result carries that offset
     */
    private AppendRowsResponse createAppendResponse(long j) {
        AppendRowsResponse.AppendResult result =
                AppendRowsResponse.AppendResult.newBuilder()
                        .setOffset(Int64Value.of(j))
                        .build();
        return AppendRowsResponse.newBuilder().setAppendResult(result).build();
    }

    /**
     * Builds an append response that carries an error status instead of a result.
     *
     * @param code gRPC status code whose numeric value is stored in the error
     * @param str error message
     * @return a response with the error field set
     */
    private AppendRowsResponse createAppendResponseWithError(Status.Code code, String str) {
        com.google.rpc.Status error =
                com.google.rpc.Status.newBuilder()
                        .setCode(code.value())
                        .setMessage(str)
                        .build();
        return AppendRowsResponse.newBuilder().setError(error).build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Appends one request built from the given rows, with no offset attached.
     *
     * @param streamWriterV2 writer to append through
     * @param strArr row values for the request
     * @return the future returned by the writer's {@code append}
     */
    public ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriterV2 streamWriterV2, String[] strArr) {
        // -1 makes createAppendRequest skip the offset field.
        AppendRowsRequest request = createAppendRequest(strArr, -1L);
        return streamWriterV2.append(request);
    }

    /**
     * Asserts that waiting on the future fails with a cause of the expected type.
     *
     * @param cls expected type of the failure cause
     * @param future future expected to complete exceptionally
     * @return the exception caught by {@code Assert.assertThrows}
     */
    private static <T extends Throwable> T assertFutureException(Class<T> cls, final Future<?> future) {
        ThrowingRunnable awaitAndUnwrap = new ThrowingRunnable() {
            @Override
            public void run() throws Throwable {
                try {
                    future.get();
                } catch (ExecutionException wrapped) {
                    // Surface the underlying failure rather than the ExecutionException wrapper.
                    throw wrapped.getCause();
                }
            }
        };
        return Assert.assertThrows(cls, awaitAndUnwrap);
    }

    /**
     * Starts an append on a helper thread and asserts the call has still not returned two seconds
     * later; the helper thread is then interrupted.
     *
     * @param streamWriterV2 writer whose append is expected to block
     * @throws Exception if the calling thread is interrupted while waiting
     */
    private void verifyAppendIsBlocked(final StreamWriterV2 streamWriterV2) throws Exception {
        Runnable blockedAppend = new Runnable() {
            @Override
            public void run() {
                sendTestMessage(streamWriterV2, new String[]{"A"});
            }
        };
        Thread appender = new Thread(blockedAppend);
        appender.start();
        TimeUnit.SECONDS.sleep(2L);
        // A thread that is still alive means the append call never returned.
        boolean stillBlocked = appender.isAlive();
        Assert.assertTrue(stillBlocked);
        appender.interrupt();
    }

    /**
     * Checks the requests recorded by the fake service.
     *
     * <p>Exactly {@code j} requests must have been received. Each must carry at least one row and
     * an offset equal to its position. Only the first request may carry the writer schema, the
     * stream name and the trace id; all later requests must leave those empty.
     *
     * @param j expected number of recorded append requests
     */
    private void verifyAppendRequests(long j) {
        Assert.assertEquals(j, this.testBigQueryWrite.getAppendRequests().size());
        for (int i = 0; i < j; i++) {
            AppendRowsRequest request = this.testBigQueryWrite.getAppendRequests().get(i);
            Assert.assertTrue(request.getProtoRows().getRows().getSerializedRowsCount() > 0);
            Assert.assertEquals(i, request.getOffset().getValue());
            // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
            if (i == 0) {
                Assert.assertTrue(request.getProtoRows().hasWriterSchema());
                Assert.assertEquals(TEST_STREAM, request.getWriteStream());
                Assert.assertEquals(TEST_TRACE_ID, request.getTraceId());
            } else {
                Assert.assertFalse(request.getProtoRows().hasWriterSchema());
                Assert.assertEquals("", request.getWriteStream());
                Assert.assertEquals("", request.getTraceId());
            }
        }
    }

    /**
     * A writer built without an explicit client, from a credentials provider and a channel
     * provider only, can append and receives the queued response.
     */
    @org.junit.Test
    public void testBuildBigQueryWriteClientInWriter() throws Exception {
        StreamWriterV2 writer =
                StreamWriterV2.newBuilder(TEST_STREAM)
                        .setCredentialsProvider(NoCredentialsProvider.create())
                        .setChannelProvider(serviceHelper.createChannelProvider())
                        .build();
        this.testBigQueryWrite.addResponse(createAppendResponse(0L));

        ApiFuture<AppendRowsResponse> pending = sendTestMessage(writer, new String[]{"A"});
        AppendRowsResponse response = pending.get();
        Assert.assertEquals(0L, response.getAppendResult().getOffset().getValue());

        writer.close();
    }

    /**
     * Appends 100 row batches through a writer configured with a schema and checks that every
     * future resolves to the matching offset and that the recorded requests are well formed.
     */
    @org.junit.Test
    public void testAppendWithRowsSuccess() throws Exception {
        final int appendCount = 100;
        StreamWriterV2 writer =
                StreamWriterV2.newBuilder(TEST_STREAM, this.client)
                        .setWriterSchema(createProtoSchema())
                        .setTraceId(TEST_TRACE_ID)
                        .build();
        for (int i = 0; i < appendCount; i++) {
            this.testBigQueryWrite.addResponse(createAppendResponse(i));
        }

        // Parameterized list: no raw type, no unchecked casts when reading the futures back.
        ArrayList<ApiFuture<AppendRowsResponse>> futures =
                new ArrayList<ApiFuture<AppendRowsResponse>>(appendCount);
        for (int i = 0; i < appendCount; i++) {
            futures.add(writer.append(createProtoRows(new String[]{String.valueOf(i)}), i));
        }
        for (int i = 0; i < appendCount; i++) {
            AppendRowsResponse response = futures.get(i).get();
            Assert.assertEquals(i, response.getAppendResult().getOffset().getValue());
        }

        verifyAppendRequests(appendCount);
        writer.close();
    }

    /**
     * Appends 1000 fully formed requests and checks that every future resolves to the matching
     * offset and that the recorded requests are well formed.
     */
    @org.junit.Test
    public void testAppendWithMessageSuccess() throws Exception {
        final int appendCount = 1000;
        StreamWriterV2 writer = getTestStreamWriterV2();
        for (int i = 0; i < appendCount; i++) {
            this.testBigQueryWrite.addResponse(createAppendResponse(i));
        }

        // Parameterized list: no raw type, no unchecked casts when reading the futures back.
        ArrayList<ApiFuture<AppendRowsResponse>> futures =
                new ArrayList<ApiFuture<AppendRowsResponse>>(appendCount);
        for (int i = 0; i < appendCount; i++) {
            futures.add(writer.append(createAppendRequest(new String[]{String.valueOf(i)}, i)));
        }
        for (int i = 0; i < appendCount; i++) {
            AppendRowsResponse response = futures.get(i).get();
            Assert.assertEquals(i, response.getAppendResult().getOffset().getValue());
        }

        verifyAppendRequests(appendCount);
        writer.close();
    }

    /**
     * Appending bare rows through a writer that has no writer schema must throw an
     * {@code INVALID_ARGUMENT} {@link StatusRuntimeException} mentioning the missing schema.
     */
    @org.junit.Test
    public void testAppendWithRowsNoSchema() throws Exception {
        final StreamWriterV2 writer = getTestStreamWriterV2();
        try {
            StatusRuntimeException thrown =
                    Assert.assertThrows(StatusRuntimeException.class, new ThrowingRunnable() {
                        @Override
                        public void run() throws Throwable {
                            writer.append(createProtoRows(new String[]{"A"}), -1L);
                        }
                    });
            // assertEquals takes (expected, actual).
            Assert.assertEquals(Status.INVALID_ARGUMENT.getCode(), thrown.getStatus().getCode());
            Assert.assertTrue(thrown.getStatus().getDescription().contains("Writer schema must be provided"));
        } finally {
            // Close the writer like every other test does, even when an assertion fails.
            writer.close();
        }
    }

    /**
     * The builder must reject each of the trace ids {@code "abc"}, {@code "abc:"} and
     * {@code ":abc"} with an {@link IllegalArgumentException}.
     */
    @org.junit.Test
    public void testInvalidTraceId() throws Exception {
        String[] rejectedTraceIds = {"abc", "abc:", ":abc"};
        for (final String traceId : rejectedTraceIds) {
            Assert.assertThrows(IllegalArgumentException.class, new ThrowingRunnable() {
                @Override
                public void run() throws Throwable {
                    StreamWriterV2.newBuilder(TEST_STREAM).setTraceId(traceId);
                }
            });
        }
    }

    /**
     * With one response followed by an INTERNAL exception queued on the fake, the first append
     * succeeds and the second fails with an {@link ApiException} whose code is INTERNAL.
     */
    @org.junit.Test
    public void testAppendSuccessAndConnectionError() throws Exception {
        StreamWriterV2 writer = getTestStreamWriterV2();
        this.testBigQueryWrite.addResponse(createAppendResponse(0L));
        this.testBigQueryWrite.addException(Status.INTERNAL.asException());

        ApiFuture<AppendRowsResponse> firstAppend = sendTestMessage(writer, new String[]{"A"});
        ApiFuture<AppendRowsResponse> secondAppend = sendTestMessage(writer, new String[]{"B"});

        AppendRowsResponse firstResponse = firstAppend.get();
        Assert.assertEquals(0L, firstResponse.getAppendResult().getOffset().getValue());

        ApiException failure = assertFutureException(ApiException.class, secondAppend);
        Assert.assertEquals(StatusCode.Code.INTERNAL, failure.getStatusCode().getCode());

        writer.close();
    }

    /**
     * An error carried inside a response fails only the matching append: the appends before and
     * after it still resolve with their offsets.
     */
    @org.junit.Test
    public void testAppendSuccessAndInStreamError() throws Exception {
        StreamWriterV2 writer = getTestStreamWriterV2();
        this.testBigQueryWrite.addResponse(createAppendResponse(0L));
        this.testBigQueryWrite.addResponse(
                createAppendResponseWithError(Status.INVALID_ARGUMENT.getCode(), "test message"));
        this.testBigQueryWrite.addResponse(createAppendResponse(1L));

        ApiFuture<AppendRowsResponse> firstAppend = sendTestMessage(writer, new String[]{"A"});
        ApiFuture<AppendRowsResponse> failingAppend = sendTestMessage(writer, new String[]{"B"});
        ApiFuture<AppendRowsResponse> thirdAppend = sendTestMessage(writer, new String[]{"C"});

        AppendRowsResponse firstResponse = firstAppend.get();
        Assert.assertEquals(0L, firstResponse.getAppendResult().getOffset().getValue());

        StatusRuntimeException failure = assertFutureException(StatusRuntimeException.class, failingAppend);
        Status failureStatus = failure.getStatus();
        Assert.assertEquals(Status.Code.INVALID_ARGUMENT, failureStatus.getCode());
        Assert.assertEquals("test message", failure.getStatus().getDescription());

        AppendRowsResponse thirdResponse = thirdAppend.get();
        Assert.assertEquals(1L, thirdResponse.getAppendResult().getOffset().getValue());

        writer.close();
    }

    /**
     * A second append issued three seconds after the first one still succeeds on the same writer.
     */
    @org.junit.Test
    public void longIdleBetweenAppends() throws Exception {
        StreamWriterV2 writer = getTestStreamWriterV2();
        this.testBigQueryWrite.addResponse(createAppendResponse(0L));
        this.testBigQueryWrite.addResponse(createAppendResponse(1L));

        AppendRowsResponse beforeIdle = sendTestMessage(writer, new String[]{"A"}).get();
        Assert.assertEquals(0L, beforeIdle.getAppendResult().getOffset().getValue());

        // Leave the writer idle between the two appends.
        TimeUnit.SECONDS.sleep(3L);

        AppendRowsResponse afterIdle = sendTestMessage(writer, new String[]{"B"}).get();
        Assert.assertEquals(1L, afterIdle.getAppendResult().getOffset().getValue());

        writer.close();
    }

    /**
     * An append issued after {@code close()} returns an already completed future that fails with
     * FAILED_PRECONDITION, while the append issued before the close still succeeds.
     */
    @org.junit.Test
    public void testAppendAfterUserClose() throws Exception {
        StreamWriterV2 writer = getTestStreamWriterV2();
        this.testBigQueryWrite.addResponse(createAppendResponse(0L));

        ApiFuture<AppendRowsResponse> beforeClose = sendTestMessage(writer, new String[]{"A"});
        writer.close();
        ApiFuture<AppendRowsResponse> afterClose = sendTestMessage(writer, new String[]{"B"});

        AppendRowsResponse accepted = beforeClose.get();
        Assert.assertEquals(0L, accepted.getAppendResult().getOffset().getValue());

        Assert.assertTrue(afterClose.isDone());
        StatusRuntimeException rejection = assertFutureException(StatusRuntimeException.class, afterClose);
        Assert.assertEquals(Status.Code.FAILED_PRECONDITION, rejection.getStatus().getCode());
    }

    /**
     * After the fake fails the first append with INTERNAL, a further append returns an already
     * completed future that fails with FAILED_PRECONDITION.
     */
    @org.junit.Test
    public void testAppendAfterServerClose() throws Exception {
        StreamWriterV2 writer = getTestStreamWriterV2();
        this.testBigQueryWrite.addException(Status.INTERNAL.asException());

        ApiFuture<AppendRowsResponse> failedByServer = sendTestMessage(writer, new String[]{"A"});
        ApiException serverFailure = assertFutureException(ApiException.class, failedByServer);
        Assert.assertEquals(StatusCode.Code.INTERNAL, serverFailure.getStatusCode().getCode());

        ApiFuture<AppendRowsResponse> afterFailure = sendTestMessage(writer, new String[]{"B"});
        Assert.assertTrue(afterFailure.isDone());
        StatusRuntimeException rejection = assertFutureException(StatusRuntimeException.class, afterFailure);
        Assert.assertEquals(Status.Code.FAILED_PRECONDITION, rejection.getStatus().getCode());

        writer.close();
    }

    /**
     * Closing the writer on another thread while a response is still pending (the fake delays it
     * by two seconds) must not fail the in-flight append: it times out after one second, but is
     * done with the expected offset once the closing thread has been joined.
     */
    @org.junit.Test
    public void userCloseWhileRequestInflight() throws Exception {
        final StreamWriterV2 writer = getTestStreamWriterV2();
        this.testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2L));
        this.testBigQueryWrite.addResponse(createAppendResponse(0L));
        final ApiFuture<AppendRowsResponse> inflight = sendTestMessage(writer, new String[]{"A"});

        Runnable closeWriter = new Runnable() {
            @Override
            public void run() {
                writer.close();
            }
        };
        Thread closer = new Thread(closeWriter);
        closer.start();

        // One second in, the delayed response cannot have arrived yet.
        ThrowingRunnable shortWait = new ThrowingRunnable() {
            @Override
            public void run() throws Throwable {
                inflight.get(1L, TimeUnit.SECONDS);
            }
        };
        Assert.assertThrows(TimeoutException.class, shortWait);

        closer.join(2000L);
        Assert.assertTrue(inflight.isDone());
        AppendRowsResponse response = inflight.get();
        Assert.assertEquals(0L, response.getAppendResult().getOffset().getValue());
    }

    /**
     * When the fake answers with an INTERNAL exception after a two second delay, all ten appends
     * issued in the meantime must fail with an {@link ApiException} whose code is INTERNAL.
     */
    @org.junit.Test
    public void serverCloseWhileRequestsInflight() throws Exception {
        final int appendCount = 10;
        StreamWriterV2 writer = getTestStreamWriterV2();
        this.testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2L));
        this.testBigQueryWrite.addException(Status.INTERNAL.asException());

        // Parameterized list: no raw type, no cast back to Future when reading.
        ArrayList<ApiFuture<AppendRowsResponse>> futures =
                new ArrayList<ApiFuture<AppendRowsResponse>>(appendCount);
        for (int i = 0; i < appendCount; i++) {
            futures.add(sendTestMessage(writer, new String[]{String.valueOf(i)}));
        }
        for (int i = 0; i < appendCount; i++) {
            ApiException failure = assertFutureException(ApiException.class, futures.get(i));
            Assert.assertEquals(StatusCode.Code.INTERNAL, failure.getStatusCode().getCode());
        }

        writer.close();
    }

    /** With a maximum of zero in-flight requests, an append call is expected to block. */
    @org.junit.Test
    public void testZeroMaxInflightRequests() throws Exception {
        StreamWriterV2 writer =
                StreamWriterV2.newBuilder(TEST_STREAM, this.client)
                        .setMaxInflightRequests(0L)
                        .build();
        this.testBigQueryWrite.addResponse(createAppendResponse(0L));

        verifyAppendIsBlocked(writer);

        writer.close();
    }

    /** With a maximum of zero in-flight bytes, an append call is expected to block. */
    @org.junit.Test
    public void testZeroMaxInflightBytes() throws Exception {
        StreamWriterV2 writer =
                StreamWriterV2.newBuilder(TEST_STREAM, this.client)
                        .setMaxInflightBytes(0L)
                        .build();
        this.testBigQueryWrite.addResponse(createAppendResponse(0L));

        verifyAppendIsBlocked(writer);

        writer.close();
    }

    /**
     * With at most one in-flight request and a fake that delays its response by one second, the
     * append call itself must take at least one second before returning a future that resolves
     * to offset 0.
     */
    @org.junit.Test
    public void testOneMaxInflightRequests() throws Exception {
        StreamWriterV2 writer =
                StreamWriterV2.newBuilder(TEST_STREAM, this.client)
                        .setMaxInflightRequests(1L)
                        .build();
        this.testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1L));
        this.testBigQueryWrite.addResponse(createAppendResponse(0L));

        long startedAtMs = System.currentTimeMillis();
        ApiFuture<AppendRowsResponse> pending = sendTestMessage(writer, new String[]{"A"});
        long elapsedMs = System.currentTimeMillis() - startedAtMs;
        Assert.assertTrue(elapsedMs >= 1000);

        AppendRowsResponse response = pending.get();
        Assert.assertEquals(0L, response.getAppendResult().getOffset().getValue());

        writer.close();
    }

    /**
     * With a one byte in-flight limit and a fake that delays each response by 100 ms, issuing ten
     * appends must take at least one second overall, and both the responses and the recorded
     * requests must carry offsets 0 through 9 in order.
     */
    @org.junit.Test
    public void testAppendsWithTinyMaxInflightBytes() throws Exception {
        final int appendCount = 10;
        StreamWriterV2 writer =
                StreamWriterV2.newBuilder(TEST_STREAM, this.client)
                        .setMaxInflightBytes(1L)
                        .build();
        this.testBigQueryWrite.setResponseSleep(Duration.ofMillis(100L));
        for (int i = 0; i < appendCount; i++) {
            this.testBigQueryWrite.addResponse(createAppendResponse(i));
        }

        // Parameterized list: no raw type, no unchecked casts when reading the futures back.
        ArrayList<ApiFuture<AppendRowsResponse>> futures =
                new ArrayList<ApiFuture<AppendRowsResponse>>(appendCount);
        long startedAtMs = System.currentTimeMillis();
        for (int i = 0; i < appendCount; i++) {
            futures.add(writer.append(createAppendRequest(new String[]{String.valueOf(i)}, i)));
        }
        long elapsedMs = System.currentTimeMillis() - startedAtMs;
        Assert.assertTrue(elapsedMs >= 1000);

        for (int i = 0; i < appendCount; i++) {
            AppendRowsResponse response = futures.get(i).get();
            Assert.assertEquals(i, response.getAppendResult().getOffset().getValue());
        }
        Assert.assertEquals(10L, this.testBigQueryWrite.getAppendRequests().size());
        for (int i = 0; i < appendCount; i++) {
            Assert.assertEquals(i, this.testBigQueryWrite.getAppendRequests().get(i).getOffset().getValue());
        }

        writer.close();
    }

    /**
     * A row whose payload is one character longer than {@code StreamWriterV2.getApiMaxRequestBytes()}
     * yields an already completed future failing with INVALID_ARGUMENT and a message that
     * mentions the size.
     */
    @org.junit.Test
    public void testMessageTooLarge() throws Exception {
        StreamWriterV2 writer = getTestStreamWriterV2();

        String oversizedValue = Strings.repeat("a", (int) (StreamWriterV2.getApiMaxRequestBytes() + 1));
        ApiFuture<AppendRowsResponse> rejected = sendTestMessage(writer, new String[]{oversizedValue});

        Assert.assertTrue(rejected.isDone());
        StatusRuntimeException failure = assertFutureException(StatusRuntimeException.class, rejected);
        Status failureStatus = failure.getStatus();
        Assert.assertEquals(Status.Code.INVALID_ARGUMENT, failureStatus.getCode());
        Assert.assertTrue(failure.getStatus().getDescription().contains("MessageSize is too large"));

        writer.close();
    }
}
