package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSinkTest.class */
public class PubsubLiteSinkTest {

    @Spy
    private PublisherFakeService publisher;
    private ApiService.Listener listener;

    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    private final PubsubLiteSink sink = new PubsubLiteSink(defaultOptions());

    @Captor
    final ArgumentCaptor<Message> publishedMessageCaptor = ArgumentCaptor.forClass(Message.class);

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSinkTest$PublisherFakeService.class */
    static abstract class PublisherFakeService extends FakeApiService implements Publisher<PublishMetadata> {
        PublisherFakeService() {
        }
    }

    private PublisherOptions defaultOptions() {
        return PublisherOptions.newBuilder().setTopicPath(TopicPath.newBuilder().setProject(ProjectNumber.of(9L)).setName(TopicName.of("abc")).setLocation(CloudZone.of(CloudRegion.of("us-east1"), 'a')).build()).build();
    }

    private void runWith(Message... messageArr) {
        this.pipeline.apply(Create.of((Iterable) Arrays.stream(messageArr).map((v0) -> {
            return v0.toProto();
        }).collect(Collectors.toList()))).apply(ParDo.of(this.sink));
        this.pipeline.run();
    }

    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks(this);
        PerServerPublisherCache.PUBLISHER_CACHE.set(defaultOptions(), this.publisher);
        ((PublisherFakeService) Mockito.doAnswer(invocationOnMock -> {
            this.listener = (ApiService.Listener) invocationOnMock.getArgument(0);
            return null;
        }).when(this.publisher)).addListener((ApiService.Listener) ArgumentMatchers.any(), (Executor) ArgumentMatchers.any());
        this.sink.setup();
        ((PublisherFakeService) Mockito.verify(this.publisher)).addListener((ApiService.Listener) ArgumentMatchers.any(), (Executor) ArgumentMatchers.any());
    }

    @Test
    public void singleMessagePublishes() throws Exception {
        Mockito.when(this.publisher.publish(Message.builder().build())).thenReturn(ApiFutures.immediateFuture(PublishMetadata.of(Partition.of(1L), Offset.of(2L))));
        runWith(Message.builder().build());
        ((PublisherFakeService) Mockito.verify(this.publisher)).publish(Message.builder().build());
    }

    @Test
    public void manyMessagePublishes() throws Exception {
        Message build = Message.builder().build();
        Message build2 = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build();
        Mockito.when(this.publisher.publish(build)).thenReturn(ApiFutures.immediateFuture(PublishMetadata.of(Partition.of(1L), Offset.of(2L))));
        Mockito.when(this.publisher.publish(build2)).thenReturn(ApiFutures.immediateFuture(PublishMetadata.of(Partition.of(85L), Offset.of(3L))));
        runWith(build, build2);
        ((PublisherFakeService) Mockito.verify(this.publisher, Mockito.times(2))).publish((Message) this.publishedMessageCaptor.capture());
        MatcherAssert.assertThat(this.publishedMessageCaptor.getAllValues(), Matchers.containsInAnyOrder(new Message[]{build, build2}));
    }

    @Test
    public void singleExceptionWhenProcessing() {
        Message build = Message.builder().build();
        Mockito.when(this.publisher.publish(build)).thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(StatusCode.Code.INTERNAL).underlying));
        Pipeline.PipelineExecutionException assertThrows = Assert.assertThrows(Pipeline.PipelineExecutionException.class, () -> {
            runWith(build);
        });
        ((PublisherFakeService) Mockito.verify(this.publisher)).publish(build);
        Optional extract = ExtractStatus.extract(assertThrows.getCause());
        Assert.assertTrue(extract.isPresent());
        MatcherAssert.assertThat(((CheckedApiException) extract.get()).code(), Matchers.equalTo(StatusCode.Code.INTERNAL));
    }

    @Test
    public void exceptionMixedWithOK() throws Exception {
        Message build = Message.builder().build();
        Message build2 = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build();
        Message build3 = Message.builder().setKey(ByteString.copyFromUtf8("def")).build();
        SettableApiFuture create = SettableApiFuture.create();
        SettableApiFuture create2 = SettableApiFuture.create();
        SettableApiFuture create3 = SettableApiFuture.create();
        CountDownLatch countDownLatch = new CountDownLatch(3);
        Mockito.when(this.publisher.publish(build)).then(invocationOnMock -> {
            countDownLatch.countDown();
            return create;
        });
        Mockito.when(this.publisher.publish(build2)).then(invocationOnMock2 -> {
            countDownLatch.countDown();
            return create2;
        });
        Mockito.when(this.publisher.publish(build3)).then(invocationOnMock3 -> {
            countDownLatch.countDown();
            return create3;
        });
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        newCachedThreadPool.execute(() -> {
            try {
                countDownLatch.await();
                create.set(PublishMetadata.of(Partition.of(1L), Offset.of(2L)));
                create2.setException(new CheckedApiException(StatusCode.Code.INTERNAL).underlying);
                create3.set(PublishMetadata.of(Partition.of(1L), Offset.of(3L)));
            } catch (InterruptedException e) {
                Assert.fail();
                throw new RuntimeException(e);
            }
        });
        Pipeline.PipelineExecutionException assertThrows = Assert.assertThrows(Pipeline.PipelineExecutionException.class, () -> {
            runWith(build, build2, build3);
        });
        ((PublisherFakeService) Mockito.verify(this.publisher, Mockito.times(3))).publish((Message) this.publishedMessageCaptor.capture());
        MatcherAssert.assertThat(this.publishedMessageCaptor.getAllValues(), Matchers.containsInAnyOrder(new Message[]{build, build2, build3}));
        Optional extract = ExtractStatus.extract(assertThrows.getCause());
        Assert.assertTrue(extract.isPresent());
        MatcherAssert.assertThat(((CheckedApiException) extract.get()).code(), Matchers.equalTo(StatusCode.Code.INTERNAL));
        newCachedThreadPool.shutdownNow();
    }

    @Test
    public void listenerExceptionOnBundleFinish() throws Exception {
        Message build = Message.builder().build();
        SettableApiFuture create = SettableApiFuture.create();
        SettableApiFuture create2 = SettableApiFuture.create();
        Mockito.when(this.publisher.publish(build)).thenAnswer(invocationOnMock -> {
            create2.set((Object) null);
            return create;
        });
        Future<?> submit = Executors.newSingleThreadExecutor().submit(() -> {
            Optional extract = ExtractStatus.extract(Assert.assertThrows(Pipeline.PipelineExecutionException.class, () -> {
                runWith(build);
            }).getCause());
            Assert.assertTrue(extract.isPresent());
            MatcherAssert.assertThat(((CheckedApiException) extract.get()).code(), Matchers.equalTo(StatusCode.Code.INTERNAL));
        });
        create2.get();
        this.listener.failed((ApiService.State) null, new CheckedApiException(StatusCode.Code.INTERNAL).underlying);
        create.set(PublishMetadata.of(Partition.of(1L), Offset.of(2L)));
        submit.get();
    }
}
