/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import io.grpc.ManagedChannel;
import io.grpc.inprocess.InProcessChannelBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.LogWriter;
import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.runners.java.fn.execution.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.java.fn.execution.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(value=JUnit4.class)
public class SdkHarnessClientTest {
    @Mock
    public FnApiControlClient fnApiControlClient;
    @Mock
    public FnDataService dataService;
    private SdkHarnessClient sdkHarnessClient;

    /**
     * Initialises the {@code @Mock} fields of this test and builds the {@link SdkHarnessClient}
     * under test on top of the mocked control client and the mocked data service.
     */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        InstructionRequestHandler controlHandler = this.fnApiControlClient;
        FnDataService mockedDataService = this.dataService;
        this.sdkHarnessClient = SdkHarnessClient.usingFnApiClient(controlHandler, mockedDataService);
    }

    /**
     * Registers two process bundle descriptors that share one remote input destination and checks
     * that the map returned by {@code register} is keyed by exactly the two descriptor ids.
     *
     * <p>The future returned by the mocked control client is never completed in this test.
     */
    @Test
    public void testRegisterDoesNotCrash() throws Exception {
        // Every instruction request sent to the mocked control client gets this still-pending future.
        CompletableFuture<BeamFnApi.InstructionResponse> pendingResponse = new CompletableFuture<>();
        Mockito.when((Object)this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(pendingResponse);

        BeamFnApi.ProcessBundleDescriptor first = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
        BeamFnApi.ProcessBundleDescriptor second = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor2").build();

        // Both descriptors are registered with the same windowed var-int input destination.
        Coder inputCoder = WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
        SdkHarnessClient.RemoteInputDestination destination = SdkHarnessClient.RemoteInputDestination.of(inputCoder, BeamFnApi.Target.getDefaultInstance());
        Map descriptors = ImmutableMap.builder()
            .put(first, destination)
            .put(second, destination)
            .build();

        Map processors = this.sdkHarnessClient.register(descriptors);

        Assert.assertThat(processors.keySet(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new String[]{first.getId(), second.getId()}));
    }

    /**
     * Opens a bundle with no output receivers, completes the process-bundle response with a
     * default instance, and waits for the bundle response without sending any input element.
     */
    @Test
    public void testNewBundleNoDataDoesNotCrash() throws Exception {
        BeamFnApi.ProcessBundleDescriptor descriptor = BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);

        // The first handle() call is answered with a future that is never completed; the second
        // call is answered with the future this test completes further down.
        CompletableFuture<BeamFnApi.InstructionResponse> bundleResponse = new CompletableFuture<>();
        Mockito.when((Object)this.fnApiControlClient.handle(Matchers.any(BeamFnApi.InstructionRequest.class)))
            .thenReturn(new CompletableFuture())
            .thenReturn(bundleResponse);

        SdkHarnessClient.RemoteInputDestination destination = SdkHarnessClient.RemoteInputDestination.of(inputCoder, BeamFnApi.Target.getDefaultInstance());
        SdkHarnessClient.BundleProcessor processor = this.sdkHarnessClient.getProcessor(descriptor, destination);

        // The data service hands out a mock receiver for any endpoint paired with the coder above.
        Mockito.when((Object)this.dataService.send((LogicalEndpoint)Matchers.any(), (Coder)Matchers.eq(inputCoder))).thenReturn(Mockito.mock(CloseableFnDataReceiver.class));

        SdkHarnessClient.ActiveBundle activeBundle = processor.newBundle(Collections.emptyMap());

        BeamFnApi.InstructionResponse completed = BeamFnApi.InstructionResponse.newBuilder()
            .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
            .build();
        bundleResponse.complete(completed);

        MoreFutures.get((CompletionStage)activeBundle.getBundleResponse());
    }

    /**
     * Runs {@link FnHarness} on a background thread against in-process logging, data and control
     * servers, pushes three elements through a bundle, and checks the outputs produced by
     * {@link TestFn}.
     *
     * <p>The three servers are opened in a try-with-resources statement and the executor is shut
     * down in a {@code finally} block, so a failing assertion or an exception thrown part-way
     * through does not leave servers or pool threads behind.
     *
     * @throws Exception if a server cannot be created or closed, or if the bundle fails
     */
    @Test
    public void testNewBundleAndProcessElements() throws Exception {
        InProcessServerFactory serverFactory = InProcessServerFactory.create();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            GrpcDataService grpcDataService = GrpcDataService.create(executor);
            SynchronousQueue clientPool = new SynchronousQueue();
            FnApiControlClientPoolService clientPoolService = FnApiControlClientPoolService.offeringClientsToPool(clientPool);

            // Resources are closed in reverse order: control, then data, then logging.
            try (GrpcFnServer loggingServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcLoggingService.forWriter((LogWriter)Slf4jLogWriter.getDefault()), (ServerFactory)serverFactory);
                 GrpcFnServer dataServer = GrpcFnServer.allocatePortAndCreateFor((FnService)grpcDataService, (ServerFactory)serverFactory);
                 GrpcFnServer controlServer = GrpcFnServer.allocatePortAndCreateFor((FnService)clientPoolService, (ServerFactory)serverFactory)) {

                // Start the SDK harness in the background; the returned Future is not inspected.
                executor.submit(() -> {
                    FnHarness.main((PipelineOptions)PipelineOptionsFactory.create(), (Endpoints.ApiServiceDescriptor)loggingServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)controlServer.getApiServiceDescriptor(), (ManagedChannelFactory)new ManagedChannelFactory(){

                        @Override
                        public ManagedChannel forDescriptor(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
                            // Connect over the in-process transport using the descriptor URL as the server name.
                            return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
                        }
                    }, (StreamObserverFactory)StreamObserverFactory.direct());
                    return null;
                });

                BeamFnApi.ProcessBundleDescriptor processBundleDescriptor = this.getProcessBundleDescriptor(dataServer.getApiServiceDescriptor());
                BeamFnApi.Target sdkGrpcReadTarget = BeamFnApi.Target.newBuilder().setName((String)Iterables.getOnlyElement(processBundleDescriptor.getTransformsOrThrow("read").getOutputsMap().keySet())).setPrimitiveTransformReference("read").build();
                BeamFnApi.Target sdkGrpcWriteTarget = BeamFnApi.Target.newBuilder().setName((String)Iterables.getOnlyElement(processBundleDescriptor.getTransformsOrThrow("write").getInputsMap().keySet())).setPrimitiveTransformReference("write").build();

                // Blocks until the control service offers a client to the pool.
                SdkHarnessClient client = SdkHarnessClient.usingFnApiClient((InstructionRequestHandler)((InstructionRequestHandler)clientPool.take()), (FnDataService)grpcDataService);
                SdkHarnessClient.BundleProcessor processor = client.getProcessor(processBundleDescriptor, SdkHarnessClient.RemoteInputDestination.of((Coder)WindowedValue.FullWindowedValueCoder.of((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE), (BeamFnApi.Target)sdkGrpcReadTarget));

                ArrayList<Object> outputs = new ArrayList<>();
                SdkHarnessClient.ActiveBundle activeBundle = processor.newBundle(Collections.singletonMap(sdkGrpcWriteTarget, SdkHarnessClient.RemoteOutputReceiver.of((Coder)WindowedValue.FullWindowedValueCoder.of((Coder)LengthPrefixCoder.of((Coder)StringUtf8Coder.of()), (Coder)GlobalWindow.Coder.INSTANCE), outputs::add)));

                // Closing the input receiver at the end of this try block ends the bundle's input.
                try (CloseableFnDataReceiver bundleInputReceiver = activeBundle.getInputReceiver();){
                    bundleInputReceiver.accept((Object)WindowedValue.valueInGlobalWindow((Object)"foo"));
                    bundleInputReceiver.accept((Object)WindowedValue.valueInGlobalWindow((Object)"bar"));
                    bundleInputReceiver.accept((Object)WindowedValue.valueInGlobalWindow((Object)"baz"));
                }

                // Wait for the bundle response and for every output stream before asserting.
                MoreFutures.get((CompletionStage)activeBundle.getBundleResponse());
                for (InboundDataClient outputClient : activeBundle.getOutputClients().values()) {
                    outputClient.awaitCompletion();
                }
                Assert.assertThat(outputs, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)"spam"), WindowedValue.valueInGlobalWindow((Object)"ham"), WindowedValue.valueInGlobalWindow((Object)"eggs")}));
            }
        } finally {
            // Always stop the harness and data-service threads, even when the test fails.
            executor.shutdownNow();
        }
    }

    /**
     * Builds a process bundle descriptor around the {@code "proc"} ParDo of a small pipeline. The
     * descriptor also carries a {@code "read"} transform for the ParDo's input PCollection and a
     * {@code "write"} transform for its output PCollection, both built on a remote gRPC port that
     * points at {@code endpoint}.
     *
     * @param endpoint service descriptor embedded in the remote gRPC port
     * @return the assembled descriptor, with id {@code "my_id"}
     * @throws IOException presumably propagated from the proto translation helpers
     */
    private BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor(Endpoints.ApiServiceDescriptor endpoint) throws IOException {
        // Build the user pipeline: create -> proc (TestFn).
        Pipeline userPipeline = Pipeline.create();
        TupleTag outputTag = new TupleTag();
        PCollection created = (PCollection)userPipeline.apply("create", (PTransform)Create.of((Object)"foo", (Object[])new String[0]));
        created.apply("proc", (PTransform)ParDo.of((DoFn)new TestFn()).withOutputTags(outputTag, TupleTagList.empty()));

        RunnerApi.Components components = PipelineTranslation.toProto(userPipeline).getComponents();

        // Seed the descriptor with the environments, windowing strategies and coders of the pipeline.
        BeamFnApi.ProcessBundleDescriptor.Builder descriptor = BeamFnApi.ProcessBundleDescriptor.newBuilder();
        descriptor.setId("my_id");
        descriptor.putAllEnvironments(components.getEnvironmentsMap());
        descriptor.putAllWindowingStrategies(components.getWindowingStrategiesMap());
        descriptor.putAllCoders(components.getCodersMap());

        // Coder referenced by the remote gRPC port below.
        RunnerApi.Coder wireCoder = CoderTranslation.toProto(WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)).getCoder();
        descriptor.putCoders("wire_coder", wireCoder);

        RunnerApi.PTransform proc = components.getTransformsOrThrow("proc");
        BeamFnApi.RemoteGrpcPort port = BeamFnApi.RemoteGrpcPort.newBuilder()
            .setApiServiceDescriptor(endpoint)
            .setCoderId("wire_coder")
            .build();

        String inputPCollection = Iterables.getOnlyElement(proc.getInputsMap().values());
        RemoteGrpcPortRead readNode = RemoteGrpcPortRead.readFromPort(port, inputPCollection);
        String outputPCollection = Iterables.getOnlyElement(proc.getOutputsMap().values());
        RemoteGrpcPortWrite writeNode = RemoteGrpcPortWrite.writeToPort(outputPCollection, port);

        // Copy every PCollection that "proc" touches, inputs first and then outputs.
        for (String pc : Iterables.concat(proc.getInputsMap().values(), proc.getOutputsMap().values())) {
            descriptor.putPcollections(pc, components.getPcollectionsOrThrow(pc));
        }

        descriptor.putTransforms("proc", proc);
        descriptor.putTransforms("read", readNode.toPTransform());
        descriptor.putTransforms("write", writeNode.toPTransform());
        return descriptor.build();
    }

    /**
     * Test {@link DoFn} that emits one string per input element: {@code "foo"} becomes
     * {@code "spam"}, {@code "bar"} becomes {@code "ham"}, and any other element becomes
     * {@code "eggs"}.
     */
    private static class TestFn
    extends DoFn<String, String> {
        private TestFn() {
        }

        /**
         * Emits the replacement string for the current element.
         *
         * @param context process context supplying the element and receiving the output
         */
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) {
            String replacement;
            switch ((String)context.element()) {
                case "foo":
                    replacement = "spam";
                    break;
                case "bar":
                    replacement = "ham";
                    break;
                default:
                    replacement = "eggs";
                    break;
            }
            context.output(replacement);
        }
    }
}

