package org.apache.beam.fn.harness.data;

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.class */
public class QueueingBeamFnDataClientTest {

    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from((Supplier<ExecutorService>) Executors::newCachedThreadPool);
    private static final BeamFnApi.Elements ELEMENTS_A_1;
    private static final BeamFnApi.Elements ELEMENTS_A_2;
    private static final BeamFnApi.Elements ELEMENTS_B_1;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) QueueingBeamFnDataClientTest.class);
    private static final Coder<WindowedValue<String>> CODER = LengthPrefixCoder.of(WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
    private static final LogicalEndpoint ENDPOINT_A = LogicalEndpoint.data("12L", "34L");
    private static final LogicalEndpoint ENDPOINT_B = LogicalEndpoint.data("56L", "78L");

    @Test(timeout = 10000)
    public void testBasicInboundConsumerBehaviour() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(3);
        CountDownLatch countDownLatch3 = new CountDownLatch(2);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue3 = new ConcurrentLinkedQueue();
        final AtomicReference atomicReference = new AtomicReference();
        Objects.requireNonNull(concurrentLinkedQueue3);
        final CallStreamObserver build = TestStreams.withOnNext((v1) -> {
            r0.add(v1);
        }).build();
        Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + "-" + UUID.randomUUID().toString()).build();
        Server build3 = InProcessServerBuilder.forName(build2.getUrl()).addService((BindableService) new BeamFnDataGrpc.BeamFnDataImplBase() { // from class: org.apache.beam.fn.harness.data.QueueingBeamFnDataClientTest.1
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc.BeamFnDataImplBase
            public StreamObserver<BeamFnApi.Elements> data(StreamObserver<BeamFnApi.Elements> streamObserver) {
                atomicReference.set(streamObserver);
                countDownLatch.countDown();
                return build;
            }
        }).build();
        build3.start();
        try {
            ManagedChannel build4 = InProcessChannelBuilder.forName(build2.getUrl()).build();
            QueueingBeamFnDataClient queueingBeamFnDataClient = new QueueingBeamFnDataClient(new BeamFnDataGrpcClient(PipelineOptionsFactory.create(), apiServiceDescriptor -> {
                return build4;
            }, OutboundObserverFactory.trivial()));
            queueingBeamFnDataClient.receive(build2, ENDPOINT_A, CODER, windowedValue -> {
                concurrentLinkedQueue.add(windowedValue);
                countDownLatch2.countDown();
            });
            countDownLatch.await();
            Future<?> submit = this.executor.submit(() -> {
                ((StreamObserver) atomicReference.get()).onNext(ELEMENTS_A_1);
                ((StreamObserver) atomicReference.get()).onNext(ELEMENTS_B_1);
            });
            queueingBeamFnDataClient.receive(build2, ENDPOINT_B, CODER, windowedValue2 -> {
                concurrentLinkedQueue2.add(windowedValue2);
                countDownLatch3.countDown();
            });
            Future<?> submit2 = this.executor.submit(() -> {
                try {
                    queueingBeamFnDataClient.drainAndBlock();
                } catch (Exception e) {
                    LOG.error("Failed ", (Throwable) e);
                    Assert.fail();
                }
            });
            countDownLatch3.await();
            Assert.assertThat(concurrentLinkedQueue2, Matchers.contains(WindowedValue.valueInGlobalWindow("JKL"), WindowedValue.valueInGlobalWindow("MNO")));
            ((StreamObserver) atomicReference.get()).onNext(ELEMENTS_A_2);
            countDownLatch2.await();
            Assert.assertThat(concurrentLinkedQueue, Matchers.contains(WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF"), WindowedValue.valueInGlobalWindow("GHI")));
            submit.get();
            submit2.get();
            build3.shutdownNow();
        } catch (Throwable th) {
            build3.shutdownNow();
            throw th;
        }
    }

    @Test(timeout = 100000)
    public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        final AtomicReference atomicReference = new AtomicReference();
        Objects.requireNonNull(concurrentLinkedQueue2);
        final CallStreamObserver build = TestStreams.withOnNext((v1) -> {
            r0.add(v1);
        }).build();
        Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + "-" + UUID.randomUUID().toString()).build();
        Server build3 = InProcessServerBuilder.forName(build2.getUrl()).addService((BindableService) new BeamFnDataGrpc.BeamFnDataImplBase() { // from class: org.apache.beam.fn.harness.data.QueueingBeamFnDataClientTest.2
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc.BeamFnDataImplBase
            public StreamObserver<BeamFnApi.Elements> data(StreamObserver<BeamFnApi.Elements> streamObserver) {
                atomicReference.set(streamObserver);
                countDownLatch.countDown();
                return build;
            }
        }).build();
        build3.start();
        try {
            ManagedChannel build4 = InProcessChannelBuilder.forName(build2.getUrl()).build();
            QueueingBeamFnDataClient queueingBeamFnDataClient = new QueueingBeamFnDataClient(new BeamFnDataGrpcClient(PipelineOptionsFactory.create(), apiServiceDescriptor -> {
                return build4;
            }, OutboundObserverFactory.trivial()));
            InboundDataClient receive = queueingBeamFnDataClient.receive(build2, ENDPOINT_A, CODER, windowedValue -> {
                throw new RuntimeException("Intentionally fail!");
            });
            countDownLatch.await();
            Future<?> submit = this.executor.submit(() -> {
                ((StreamObserver) atomicReference.get()).onNext(ELEMENTS_A_1);
                ((StreamObserver) atomicReference.get()).onNext(ELEMENTS_B_1);
            });
            InboundDataClient receive2 = queueingBeamFnDataClient.receive(build2, ENDPOINT_B, CODER, windowedValue2 -> {
                concurrentLinkedQueue.add(windowedValue2);
            });
            Future<?> submit2 = this.executor.submit(() -> {
                boolean z = false;
                try {
                    queueingBeamFnDataClient.drainAndBlock();
                } catch (RuntimeException e) {
                    z = true;
                } catch (Exception e2) {
                    LOG.error("Unintentional failure", (Throwable) e2);
                    Assert.fail();
                }
                Assert.assertTrue(z);
            });
            submit.get();
            submit2.get();
            boolean z = false;
            try {
                receive.awaitCompletion();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof RuntimeException) {
                    z = true;
                }
            }
            Assert.assertTrue(z);
            boolean z2 = false;
            try {
                receive2.awaitCompletion();
            } catch (ExecutionException e2) {
                if (e2.getCause() instanceof RuntimeException) {
                    z2 = true;
                }
            }
            Assert.assertTrue(z2);
            build3.shutdownNow();
        } catch (Throwable th) {
            build3.shutdownNow();
            throw th;
        }
    }

    static {
        try {
            ELEMENTS_A_1 = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(ENDPOINT_A.getInstructionId()).setTransformId(ENDPOINT_A.getTransformId()).setData(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("ABC"))).concat(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("DEF")))))).build();
            ELEMENTS_A_2 = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(ENDPOINT_A.getInstructionId()).setTransformId(ENDPOINT_A.getTransformId()).setData(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("GHI"))))).addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(ENDPOINT_A.getInstructionId()).setTransformId(ENDPOINT_A.getTransformId())).build();
            ELEMENTS_B_1 = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(ENDPOINT_B.getInstructionId()).setTransformId(ENDPOINT_B.getTransformId()).setData(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("JKL"))).concat(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("MNO")))))).addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(ENDPOINT_B.getInstructionId()).setTransformId(ENDPOINT_B.getTransformId())).build();
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
