/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.PerServerPublisherCache;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.PubsubLiteSink;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.verification.VerificationMode;

@RunWith(value=JUnit4.class)
public class PubsubLiteSinkTest {
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    @Spy
    private PublisherFakeService publisher;
    private final PubsubLiteSink sink = new PubsubLiteSink(this.defaultOptions());
    @Captor
    final ArgumentCaptor<Message> publishedMessageCaptor = ArgumentCaptor.forClass(Message.class);

    private PublisherOptions defaultOptions() {
        return PublisherOptions.newBuilder().setTopicPath(((TopicPath.Builder)((TopicPath.Builder)TopicPath.newBuilder().setProject(ProjectNumber.of((long)9L))).setName(TopicName.of((String)"abc")).setLocation(CloudZone.of((CloudRegion)CloudRegion.of((String)"us-east1"), (char)'a'))).build()).build();
    }

    private void runWith(Message ... messages) {
        ((PCollection)this.pipeline.apply((PTransform)Create.of((Iterable)Arrays.stream(messages).map(Message::toProto).collect(Collectors.toList())))).apply((PTransform)ParDo.of((DoFn)this.sink));
        this.pipeline.run();
    }

    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks((Object)this);
        PerServerPublisherCache.PUBLISHER_CACHE.set(this.defaultOptions(), (Publisher)this.publisher);
    }

    @Test
    public void singleMessagePublishes() throws Exception {
        Mockito.when((Object)this.publisher.publish(Message.builder().build())).thenReturn((Object)ApiFutures.immediateFuture((Object)MessageMetadata.of((Partition)Partition.of((long)1L), (Offset)Offset.of((long)2L))));
        this.runWith(Message.builder().build());
        ((PublisherFakeService)((Object)Mockito.verify((Object)((Object)this.publisher)))).publish(Message.builder().build());
    }

    @Test
    public void manyMessagePublishes() throws Exception {
        Message message1 = Message.builder().build();
        Message message2 = Message.builder().setKey(ByteString.copyFromUtf8((String)"abc")).build();
        Mockito.when((Object)this.publisher.publish(message1)).thenReturn((Object)ApiFutures.immediateFuture((Object)MessageMetadata.of((Partition)Partition.of((long)1L), (Offset)Offset.of((long)2L))));
        Mockito.when((Object)this.publisher.publish(message2)).thenReturn((Object)ApiFutures.immediateFuture((Object)MessageMetadata.of((Partition)Partition.of((long)85L), (Offset)Offset.of((long)3L))));
        this.runWith(message1, message2);
        ((PublisherFakeService)((Object)Mockito.verify((Object)((Object)this.publisher), (VerificationMode)Mockito.times((int)2)))).publish((Message)this.publishedMessageCaptor.capture());
        MatcherAssert.assertThat((Object)this.publishedMessageCaptor.getAllValues(), (Matcher)Matchers.containsInAnyOrder((Object[])new Message[]{message1, message2}));
    }

    @Test
    public void singleExceptionWhenProcessing() {
        Message message1 = Message.builder().build();
        Mockito.when((Object)this.publisher.publish(message1)).thenReturn((Object)ApiFutures.immediateFailedFuture((Throwable)new CheckedApiException((StatusCode.Code)StatusCode.Code.INTERNAL).underlying));
        Pipeline.PipelineExecutionException e = (Pipeline.PipelineExecutionException)Assert.assertThrows(Pipeline.PipelineExecutionException.class, () -> this.runWith(message1));
        ((PublisherFakeService)((Object)Mockito.verify((Object)((Object)this.publisher)))).publish(message1);
        Optional statusOr = ExtractStatus.extract((Throwable)e.getCause());
        Assert.assertTrue((boolean)statusOr.isPresent());
        MatcherAssert.assertThat((Object)((CheckedApiException)statusOr.get()).code(), (Matcher)Matchers.equalTo((Object)StatusCode.Code.INTERNAL));
    }

    @Test
    public void exceptionMixedWithOK() throws Exception {
        Message message1 = Message.builder().build();
        Message message2 = Message.builder().setKey(ByteString.copyFromUtf8((String)"abc")).build();
        Message message3 = Message.builder().setKey(ByteString.copyFromUtf8((String)"def")).build();
        SettableApiFuture future1 = SettableApiFuture.create();
        SettableApiFuture future2 = SettableApiFuture.create();
        SettableApiFuture future3 = SettableApiFuture.create();
        CountDownLatch startedLatch = new CountDownLatch(3);
        Mockito.when((Object)this.publisher.publish(message1)).then(invocation -> {
            startedLatch.countDown();
            return future1;
        });
        Mockito.when((Object)this.publisher.publish(message2)).then(invocation -> {
            startedLatch.countDown();
            return future2;
        });
        Mockito.when((Object)this.publisher.publish(message3)).then(invocation -> {
            startedLatch.countDown();
            return future3;
        });
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(() -> {
            try {
                startedLatch.await();
                future1.set((Object)MessageMetadata.of((Partition)Partition.of((long)1L), (Offset)Offset.of((long)2L)));
                future2.setException((Throwable)new CheckedApiException((StatusCode.Code)StatusCode.Code.INTERNAL).underlying);
                future3.set((Object)MessageMetadata.of((Partition)Partition.of((long)1L), (Offset)Offset.of((long)3L)));
            }
            catch (InterruptedException e) {
                Assert.fail();
                throw new RuntimeException(e);
            }
        });
        Pipeline.PipelineExecutionException e = (Pipeline.PipelineExecutionException)Assert.assertThrows(Pipeline.PipelineExecutionException.class, () -> this.runWith(message1, message2, message3));
        ((PublisherFakeService)((Object)Mockito.verify((Object)((Object)this.publisher), (VerificationMode)Mockito.times((int)3)))).publish((Message)this.publishedMessageCaptor.capture());
        MatcherAssert.assertThat((Object)this.publishedMessageCaptor.getAllValues(), (Matcher)Matchers.containsInAnyOrder((Object[])new Message[]{message1, message2, message3}));
        Optional statusOr = ExtractStatus.extract((Throwable)e.getCause());
        Assert.assertTrue((boolean)statusOr.isPresent());
        MatcherAssert.assertThat((Object)((CheckedApiException)statusOr.get()).code(), (Matcher)Matchers.equalTo((Object)StatusCode.Code.INTERNAL));
        exec.shutdownNow();
    }

    static abstract class PublisherFakeService
    extends FakeApiService
    implements Publisher<MessageMetadata> {
        PublisherFakeService() {
        }
    }
}

