package org.apache.beam.fn.harness.data;

import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.class */
public class BeamFnDataGrpcClientTest {
    private static final BeamFnApi.Elements ELEMENTS_A_1;
    private static final BeamFnApi.Elements ELEMENTS_A_2;
    private static final BeamFnApi.Elements ELEMENTS_B_1;
    private static final Coder<WindowedValue<String>> CODER = LengthPrefixCoder.of(WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
    private static final String INSTRUCTION_ID_A = "12L";
    private static final String TRANSFORM_ID_A = "34L";
    private static final LogicalEndpoint ENDPOINT_A = LogicalEndpoint.data(INSTRUCTION_ID_A, TRANSFORM_ID_A);
    private static final String INSTRUCTION_ID_B = "56L";
    private static final String TRANSFORM_ID_B = "78L";
    private static final LogicalEndpoint ENDPOINT_B = LogicalEndpoint.data(INSTRUCTION_ID_B, TRANSFORM_ID_B);

    @Test
    public void testForInboundConsumer() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue3 = new ConcurrentLinkedQueue();
        final AtomicReference atomicReference = new AtomicReference();
        Objects.requireNonNull(concurrentLinkedQueue3);
        final CallStreamObserver build = TestStreams.withOnNext((v1) -> {
            r0.add(v1);
        }).build();
        Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + "-" + UUID.randomUUID()).build();
        Server build3 = InProcessServerBuilder.forName(build2.getUrl()).addService((BindableService) new BeamFnDataGrpc.BeamFnDataImplBase() { // from class: org.apache.beam.fn.harness.data.BeamFnDataGrpcClientTest.1
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc.BeamFnDataImplBase
            public StreamObserver<BeamFnApi.Elements> data(StreamObserver<BeamFnApi.Elements> streamObserver) {
                atomicReference.set(streamObserver);
                countDownLatch.countDown();
                return build;
            }
        }).build();
        build3.start();
        try {
            ManagedChannel build4 = InProcessChannelBuilder.forName(build2.getUrl()).build();
            BeamFnDataGrpcClient beamFnDataGrpcClient = new BeamFnDataGrpcClient(PipelineOptionsFactory.create(), apiServiceDescriptor -> {
                return build4;
            }, OutboundObserverFactory.trivial());
            Coder<WindowedValue<String>> coder = CODER;
            Objects.requireNonNull(concurrentLinkedQueue);
            BeamFnDataInboundObserver2 forConsumers = BeamFnDataInboundObserver2.forConsumers(Arrays.asList(DataEndpoint.create(TRANSFORM_ID_A, coder, (v1) -> {
                r5.add(v1);
            })), Collections.emptyList());
            Coder<WindowedValue<String>> coder2 = CODER;
            Objects.requireNonNull(concurrentLinkedQueue2);
            BeamFnDataInboundObserver2 forConsumers2 = BeamFnDataInboundObserver2.forConsumers(Arrays.asList(DataEndpoint.create(TRANSFORM_ID_B, coder2, (v1) -> {
                r5.add(v1);
            })), Collections.emptyList());
            beamFnDataGrpcClient.registerReceiver(INSTRUCTION_ID_A, Arrays.asList(build2), forConsumers);
            countDownLatch.await();
            ((StreamObserver) atomicReference.get()).onNext(ELEMENTS_A_1);
            ((StreamObserver) atomicReference.get()).onNext(ELEMENTS_B_1);
            Thread.sleep(100L);
            beamFnDataGrpcClient.registerReceiver(INSTRUCTION_ID_B, Arrays.asList(build2), forConsumers2);
            forConsumers2.awaitCompletion();
            MatcherAssert.assertThat(concurrentLinkedQueue2, (Matcher<? super ConcurrentLinkedQueue>) Matchers.contains(WindowedValue.valueInGlobalWindow("JKL"), WindowedValue.valueInGlobalWindow("MNO")));
            ((StreamObserver) atomicReference.get()).onNext(ELEMENTS_A_2);
            forConsumers.awaitCompletion();
            MatcherAssert.assertThat(concurrentLinkedQueue, (Matcher<? super ConcurrentLinkedQueue>) Matchers.contains(WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF"), WindowedValue.valueInGlobalWindow("GHI")));
            build3.shutdownNow();
        } catch (Throwable th) {
            build3.shutdownNow();
            throw th;
        }
    }

    @Test
    public void testForInboundConsumerThatThrows() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        final AtomicReference atomicReference = new AtomicReference();
        Objects.requireNonNull(concurrentLinkedQueue);
        final CallStreamObserver build = TestStreams.withOnNext((v1) -> {
            r0.add(v1);
        }).build();
        Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + "-" + UUID.randomUUID()).build();
        Server build3 = InProcessServerBuilder.forName(build2.getUrl()).addService((BindableService) new BeamFnDataGrpc.BeamFnDataImplBase() { // from class: org.apache.beam.fn.harness.data.BeamFnDataGrpcClientTest.2
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc.BeamFnDataImplBase
            public StreamObserver<BeamFnApi.Elements> data(StreamObserver<BeamFnApi.Elements> streamObserver) {
                atomicReference.set(streamObserver);
                countDownLatch.countDown();
                return build;
            }
        }).build();
        build3.start();
        RuntimeException runtimeException = new RuntimeException("TestFailure");
        try {
            ManagedChannel build4 = InProcessChannelBuilder.forName(build2.getUrl()).build();
            BeamFnDataGrpcClient beamFnDataGrpcClient = new BeamFnDataGrpcClient(PipelineOptionsFactory.create(), apiServiceDescriptor -> {
                return build4;
            }, OutboundObserverFactory.trivial());
            BeamFnDataInboundObserver2 forConsumers = BeamFnDataInboundObserver2.forConsumers(Arrays.asList(DataEndpoint.create(TRANSFORM_ID_A, CODER, windowedValue -> {
                atomicInteger.incrementAndGet();
                throw runtimeException;
            })), Collections.emptyList());
            beamFnDataGrpcClient.registerReceiver(INSTRUCTION_ID_A, Arrays.asList(build2), forConsumers);
            countDownLatch.await();
            ((StreamObserver) atomicReference.get()).onNext(ELEMENTS_A_1);
            ((StreamObserver) atomicReference.get()).onNext(ELEMENTS_A_2);
            try {
                forConsumers.awaitCompletion();
                Assert.fail("Expected channel to fail");
            } catch (Exception e) {
                Assert.assertEquals(runtimeException, e);
            }
            MatcherAssert.assertThat(concurrentLinkedQueue, (Matcher<? super ConcurrentLinkedQueue>) Matchers.empty());
            Assert.assertEquals(1L, atomicInteger.get());
            build3.shutdownNow();
        } catch (Throwable th) {
            build3.shutdownNow();
            throw th;
        }
    }

    @Test
    public void testForOutboundConsumer() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        final CallStreamObserver build = TestStreams.withOnNext(elements -> {
            concurrentLinkedQueue.add(elements);
            countDownLatch.countDown();
        }).build();
        Endpoints.ApiServiceDescriptor build2 = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getClass().getName() + "-" + UUID.randomUUID()).build();
        Server build3 = InProcessServerBuilder.forName(build2.getUrl()).addService((BindableService) new BeamFnDataGrpc.BeamFnDataImplBase() { // from class: org.apache.beam.fn.harness.data.BeamFnDataGrpcClientTest.3
            @Override // org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc.BeamFnDataImplBase
            public StreamObserver<BeamFnApi.Elements> data(StreamObserver<BeamFnApi.Elements> streamObserver) {
                return build;
            }
        }).build();
        build3.start();
        try {
            ManagedChannel build4 = InProcessChannelBuilder.forName(build2.getUrl()).build();
            BeamFnDataOutboundAggregator createOutboundAggregator = new BeamFnDataGrpcClient(PipelineOptionsFactory.fromArgs("--experiments=data_buffer_size_limit=20").create(), apiServiceDescriptor -> {
                return build4;
            }, OutboundObserverFactory.trivial()).createOutboundAggregator(build2, () -> {
                return INSTRUCTION_ID_A;
            }, false);
            FnDataReceiver registerOutputDataLocation = createOutboundAggregator.registerOutputDataLocation(TRANSFORM_ID_A, CODER);
            registerOutputDataLocation.accept(WindowedValue.valueInGlobalWindow("ABC"));
            registerOutputDataLocation.accept(WindowedValue.valueInGlobalWindow("DEF"));
            registerOutputDataLocation.accept(WindowedValue.valueInGlobalWindow("GHI"));
            createOutboundAggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
            countDownLatch.await();
            MatcherAssert.assertThat(concurrentLinkedQueue, (Matcher<? super ConcurrentLinkedQueue>) Matchers.contains(ELEMENTS_A_1, ELEMENTS_A_2));
            build3.shutdownNow();
        } catch (Throwable th) {
            build3.shutdownNow();
            throw th;
        }
    }

    static {
        try {
            ELEMENTS_A_1 = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(ENDPOINT_A.getInstructionId()).setTransformId(ENDPOINT_A.getTransformId()).setData(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("ABC"))).concat(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("DEF")))))).build();
            ELEMENTS_A_2 = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(ENDPOINT_A.getInstructionId()).setTransformId(ENDPOINT_A.getTransformId()).setData(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("GHI"))))).addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(ENDPOINT_A.getInstructionId()).setTransformId(ENDPOINT_A.getTransformId()).setIsLast(true)).build();
            ELEMENTS_B_1 = BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(ENDPOINT_B.getInstructionId()).setTransformId(ENDPOINT_B.getTransformId()).setData(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("JKL"))).concat(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("MNO")))))).addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(ENDPOINT_B.getInstructionId()).setTransformId(ENDPOINT_B.getTransformId()).setIsLast(true)).build();
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
