/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.data;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link GrpcDataService} that run the service behind an in-process gRPC server and
 * talk to it through in-process client channels.
 *
 * <p>Each test owns two thread pools (one for the simulated clients, one handed to the service)
 * and shuts both down in a {@code finally} block so no worker threads outlive the test.
 */
@RunWith(JUnit4.class)
public class GrpcDataServiceTest {
    private static final String PTRANSFORM_ID = "888";
    private static final Coder<WindowedValue<String>> CODER =
            LengthPrefixCoder.of(WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()));

    /**
     * Connects three clients, sends three logical streams (instruction ids "0".."2") through the
     * service, and checks that the elements observed across all clients are exactly the three
     * expected {@code Elements} messages, in any order.
     *
     * <p>All clients feed a single shared queue, so the assertion covers the union of what the
     * clients received rather than which individual client received it.
     */
    @Test
    public void testMessageReceivedBySingleClientWhenThereAreMultipleClients() throws Exception {
        BlockingQueue<BeamFnApi.Elements> clientInboundElements = new LinkedBlockingQueue<>();
        CountDownLatch waitForInboundElements = new CountDownLatch(1);
        ExecutorService clientExecutor = Executors.newCachedThreadPool();
        ExecutorService serviceExecutor = Executors.newCachedThreadPool();
        try {
            GrpcDataService service =
                    GrpcDataService.create(serviceExecutor, OutboundObserverFactory.serverDirect());
            try (GrpcFnServer<GrpcDataService> server =
                    GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create())) {
                Collection<Future<Void>> clientFutures = new ArrayList<>();
                for (int i = 0; i < 3; ++i) {
                    clientFutures.add(clientExecutor.submit(() -> {
                        ManagedChannel channel =
                                InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl())
                                        .directExecutor()
                                        .build();
                        try {
                            StreamObserver<BeamFnApi.Elements> outboundObserver =
                                    BeamFnDataGrpc.newStub(channel)
                                            .data(TestStreams.withOnNext(clientInboundElements::add).build());
                            // Keep the stream open until the test thread has finished sending.
                            waitForInboundElements.await();
                            outboundObserver.onCompleted();
                        } finally {
                            // Orderly shutdown: the already-open call is allowed to finish.
                            channel.shutdown();
                        }
                        return null;
                    }));
                }

                for (int i = 0; i < 3; ++i) {
                    CloseableFnDataReceiver<WindowedValue<String>> consumer =
                            service.send(LogicalEndpoint.of(Integer.toString(i), PTRANSFORM_ID), CODER);
                    consumer.accept(WindowedValue.valueInGlobalWindow("A" + i));
                    consumer.accept(WindowedValue.valueInGlobalWindow("B" + i));
                    consumer.accept(WindowedValue.valueInGlobalWindow("C" + i));
                    consumer.close();
                }

                waitForInboundElements.countDown();
                awaitAll(clientFutures);

                Assert.assertThat(
                        clientInboundElements,
                        Matchers.containsInAnyOrder(
                                elementsWithData("0"), elementsWithData("1"), elementsWithData("2")));
            }
        } finally {
            // The server is already closed here; interrupt anything still blocked (for example
            // clients parked on the latch after an assertion failure) so no threads leak.
            clientExecutor.shutdownNow();
            serviceExecutor.shutdownNow();
        }
    }

    /**
     * Has three clients each push one {@code Elements} message for their own instruction id, and
     * checks that the receiver registered for each instruction id sees exactly the values
     * "A&lt;id&gt;", "B&lt;id&gt;", "C&lt;id&gt;" in that order, while no client receives any
     * inbound elements.
     */
    @Test
    public void testMultipleClientsSendMessagesAreDirectedToProperConsumers() throws Exception {
        BlockingQueue<BeamFnApi.Elements> clientInboundElements = new LinkedBlockingQueue<>();
        CountDownLatch waitForInboundElements = new CountDownLatch(1);
        ExecutorService clientExecutor = Executors.newCachedThreadPool();
        ExecutorService serviceExecutor = Executors.newCachedThreadPool();
        try {
            GrpcDataService service =
                    GrpcDataService.create(serviceExecutor, OutboundObserverFactory.serverDirect());
            try (GrpcFnServer<GrpcDataService> server =
                    GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create())) {
                Collection<Future<Void>> clientFutures = new ArrayList<>();
                for (int i = 0; i < 3; ++i) {
                    String instructionReference = Integer.toString(i);
                    clientFutures.add(clientExecutor.submit(() -> {
                        ManagedChannel channel =
                                InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl())
                                        .build();
                        try {
                            StreamObserver<BeamFnApi.Elements> outboundObserver =
                                    BeamFnDataGrpc.newStub(channel)
                                            .data(TestStreams.withOnNext(clientInboundElements::add).build());
                            outboundObserver.onNext(elementsWithData(instructionReference));
                            // Hold the stream open until the server side has consumed everything.
                            waitForInboundElements.await();
                            outboundObserver.onCompleted();
                        } finally {
                            // Orderly shutdown: the already-open call is allowed to finish.
                            channel.shutdown();
                        }
                        return null;
                    }));
                }

                List<Collection<WindowedValue<String>>> serverInboundValues = new ArrayList<>();
                Collection<InboundDataClient> readFutures = new ArrayList<>();
                for (int i = 0; i < 3; ++i) {
                    Collection<WindowedValue<String>> serverInboundValue = new ArrayList<>();
                    serverInboundValues.add(serverInboundValue);
                    readFutures.add(
                            service.receive(
                                    LogicalEndpoint.of(Integer.toString(i), PTRANSFORM_ID),
                                    CODER,
                                    serverInboundValue::add));
                }
                for (InboundDataClient readFuture : readFutures) {
                    readFuture.awaitCompletion();
                }

                waitForInboundElements.countDown();
                awaitAll(clientFutures);

                for (int i = 0; i < 3; ++i) {
                    Assert.assertThat(
                            serverInboundValues.get(i),
                            Matchers.contains(
                                    WindowedValue.valueInGlobalWindow("A" + i),
                                    WindowedValue.valueInGlobalWindow("B" + i),
                                    WindowedValue.valueInGlobalWindow("C" + i)));
                }
                Assert.assertThat(clientInboundElements, Matchers.empty());
            }
        } finally {
            // The server is already closed here; interrupt anything still blocked (for example
            // clients parked on the latch after an assertion failure) so no threads leak.
            clientExecutor.shutdownNow();
            serviceExecutor.shutdownNow();
        }
    }

    /** Blocks until every future has completed, rethrowing the first failure encountered. */
    private static void awaitAll(Collection<Future<Void>> futures) throws Exception {
        for (Future<Void> future : futures) {
            future.get();
        }
    }

    /** Encodes {@code value} as a globally-windowed string using {@link #CODER}. */
    private static ByteString encode(String value) throws CoderException {
        return ByteString.copyFrom(
                CoderUtils.encodeToByteArray(CODER, WindowedValue.valueInGlobalWindow(value)));
    }

    /**
     * Builds the {@code Elements} message expected for instruction {@code id}: one {@code Data}
     * entry whose payload is the concatenated encodings of "A"+id, "B"+id and "C"+id, followed by
     * a second {@code Data} entry for the same instruction and transform that carries no payload
     * (presumably the end-of-stream marker; confirm against the Fn API data protocol).
     *
     * @param id instruction reference to stamp on both entries
     * @throws CoderException if encoding any of the values fails
     */
    private BeamFnApi.Elements elementsWithData(String id) throws CoderException {
        ByteString payload = encode("A" + id).concat(encode("B" + id)).concat(encode("C" + id));
        return BeamFnApi.Elements.newBuilder()
                .addData(
                        BeamFnApi.Elements.Data.newBuilder()
                                .setInstructionReference(id)
                                .setPtransformId(PTRANSFORM_ID)
                                .setData(payload))
                .addData(
                        BeamFnApi.Elements.Data.newBuilder()
                                .setInstructionReference(id)
                                .setPtransformId(PTRANSFORM_ID))
                .build();
    }
}

