package org.apache.beam.runners.fnexecution.data;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.class */
public class GrpcDataServiceTest {

    @Rule
    public transient Timeout globalTimeout = Timeout.seconds(600);
    private static final String TRANSFORM_ID = "888";
    private static final Coder<WindowedValue<String>> CODER = LengthPrefixCoder.of(WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()));

    @Test
    public void testMessageReceivedBySingleClientWhenThereAreMultipleClients() throws Exception {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        GrpcDataService create = GrpcDataService.create(PipelineOptionsFactory.create(), Executors.newCachedThreadPool(), OutboundObserverFactory.serverDirect());
        GrpcFnServer allocatePortAndCreateFor = GrpcFnServer.allocatePortAndCreateFor(create, InProcessServerFactory.create());
        try {
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < 3; i++) {
                arrayList.add(newCachedThreadPool.submit(() -> {
                    BeamFnDataGrpc.BeamFnDataStub newStub = BeamFnDataGrpc.newStub(InProcessChannelBuilder.forName(allocatePortAndCreateFor.getApiServiceDescriptor().getUrl()).directExecutor().build());
                    Objects.requireNonNull(linkedBlockingQueue);
                    StreamObserver data = newStub.data(TestStreams.withOnNext((v1) -> {
                        r1.add(v1);
                    }).build());
                    countDownLatch.await();
                    data.onCompleted();
                    return null;
                }));
            }
            for (int i2 = 0; i2 < 3; i2++) {
                String num = Integer.toString(i2);
                BeamFnDataOutboundAggregator createOutboundAggregator = create.createOutboundAggregator(() -> {
                    return num;
                }, false);
                createOutboundAggregator.start();
                FnDataReceiver registerOutputDataLocation = createOutboundAggregator.registerOutputDataLocation(TRANSFORM_ID, CODER);
                registerOutputDataLocation.accept(WindowedValue.valueInGlobalWindow("A" + i2));
                registerOutputDataLocation.accept(WindowedValue.valueInGlobalWindow("B" + i2));
                registerOutputDataLocation.accept(WindowedValue.valueInGlobalWindow("C" + i2));
                createOutboundAggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
            }
            countDownLatch.countDown();
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((Future) it.next()).get();
            }
            MatcherAssert.assertThat(linkedBlockingQueue, Matchers.containsInAnyOrder(new BeamFnApi.Elements[]{elementsWithData("0"), elementsWithData("1"), elementsWithData("2")}));
            if (allocatePortAndCreateFor != null) {
                $closeResource(null, allocatePortAndCreateFor);
            }
        } catch (Throwable th) {
            if (allocatePortAndCreateFor != null) {
                $closeResource(null, allocatePortAndCreateFor);
            }
            throw th;
        }
    }

    @Test
    public void testMultipleClientsSendMessagesAreDirectedToProperConsumers() throws Exception {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        GrpcDataService create = GrpcDataService.create(PipelineOptionsFactory.create(), Executors.newCachedThreadPool(), OutboundObserverFactory.serverDirect());
        GrpcFnServer allocatePortAndCreateFor = GrpcFnServer.allocatePortAndCreateFor(create, InProcessServerFactory.create());
        try {
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < 3; i++) {
                String num = Integer.toString(i);
                arrayList.add(newCachedThreadPool.submit(() -> {
                    BeamFnDataGrpc.BeamFnDataStub newStub = BeamFnDataGrpc.newStub(InProcessChannelBuilder.forName(allocatePortAndCreateFor.getApiServiceDescriptor().getUrl()).build());
                    Objects.requireNonNull(linkedBlockingQueue);
                    StreamObserver data = newStub.data(TestStreams.withOnNext((v1) -> {
                        r1.add(v1);
                    }).build());
                    data.onNext(elementsWithData(num));
                    countDownLatch.await();
                    data.onCompleted();
                    return null;
                }));
            }
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            for (int i2 = 0; i2 < 3; i2++) {
                ArrayList arrayList4 = new ArrayList();
                arrayList2.add(arrayList4);
                Coder<WindowedValue<String>> coder = CODER;
                Objects.requireNonNull(arrayList4);
                BeamFnDataInboundObserver forConsumers = BeamFnDataInboundObserver.forConsumers(Arrays.asList(DataEndpoint.create(TRANSFORM_ID, coder, (v1) -> {
                    r5.add(v1);
                })), Collections.emptyList());
                create.registerReceiver(Integer.toString(i2), forConsumers);
                arrayList3.add(forConsumers);
            }
            Iterator it = arrayList3.iterator();
            while (it.hasNext()) {
                ((BeamFnDataInboundObserver) it.next()).awaitCompletion();
            }
            countDownLatch.countDown();
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((Future) it2.next()).get();
            }
            for (int i3 = 0; i3 < 3; i3++) {
                MatcherAssert.assertThat((Collection) arrayList2.get(i3), Matchers.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("A" + i3), WindowedValue.valueInGlobalWindow("B" + i3), WindowedValue.valueInGlobalWindow("C" + i3)}));
            }
            MatcherAssert.assertThat(linkedBlockingQueue, Matchers.empty());
            if (allocatePortAndCreateFor != null) {
                $closeResource(null, allocatePortAndCreateFor);
            }
        } catch (Throwable th) {
            if (allocatePortAndCreateFor != null) {
                $closeResource(null, allocatePortAndCreateFor);
            }
            throw th;
        }
    }

    private BeamFnApi.Elements elementsWithData(String str) throws CoderException {
        return BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(str).setTransformId(TRANSFORM_ID).setData(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("A" + str))).concat(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("B" + str)))).concat(ByteString.copyFrom(CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow("C" + str)))))).addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(str).setTransformId(TRANSFORM_ID).setIsLast(true)).build();
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
