package org.apache.beam.runners.direct.portable;

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PipelineTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionList;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.class */
public class RemoteStageEvaluatorFactoryTest implements Serializable {
    private transient RemoteStageEvaluatorFactory factory;
    private transient ExecutorService executor;
    private transient GrpcFnServer<GrpcDataService> dataServer;
    private transient GrpcFnServer<GrpcStateService> stateServer;
    private transient GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private transient GrpcFnServer<GrpcLoggingService> loggingServer;
    private transient BundleFactory bundleFactory;

    @Before
    public void setup() throws Exception {
        InProcessServerFactory create = InProcessServerFactory.create();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        this.controlServer = GrpcFnServer.allocatePortAndCreateFor(FnApiControlClientPoolService.offeringClientsToPool((str, instructionRequestHandler) -> {
            linkedBlockingQueue.put(instructionRequestHandler);
        }, GrpcContextHeaderAccessorProvider.getHeaderAccessor()), create);
        this.loggingServer = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), create);
        EnvironmentFactory create2 = EmbeddedEnvironmentFactory.create(PipelineOptionsFactory.create(), this.loggingServer, this.controlServer, (str2, duration) -> {
            return (InstructionRequestHandler) linkedBlockingQueue.take();
        });
        this.executor = Executors.newCachedThreadPool();
        this.dataServer = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(this.executor, OutboundObserverFactory.serverDirect()), create);
        this.stateServer = GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), create);
        this.bundleFactory = ImmutableListBundleFactory.create();
        this.factory = new RemoteStageEvaluatorFactory(this.bundleFactory, SingleEnvironmentInstanceJobBundleFactory.create(create2, this.dataServer, this.stateServer));
    }

    /* JADX WARN: Failed to calculate best type for var: r6v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r6v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r7v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r7v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 6, insn: 0x00aa: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r6 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:52:0x00aa */
    /* JADX WARN: Not initialized variable reg: 7, insn: 0x00ae: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r7 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:54:0x00ae */
    /* JADX WARN: Type inference failed for: r6v1, types: [java.lang.AutoCloseable] */
    /* JADX WARN: Type inference failed for: r7v0, types: [java.lang.Throwable] */
    @After
    public void teardown() throws Exception {
        ?? r6;
        ?? r7;
        GrpcFnServer<GrpcLoggingService> grpcFnServer = this.loggingServer;
        try {
            try {
                ExecutorService executorService = this.executor;
                Objects.requireNonNull(executorService);
                AutoCloseable autoCloseable = executorService::shutdownNow;
                GrpcFnServer<GrpcDataService> grpcFnServer2 = this.dataServer;
                try {
                    GrpcFnServer<GrpcStateService> grpcFnServer3 = this.stateServer;
                    Throwable th = null;
                    try {
                        try {
                            GrpcFnServer<FnApiControlClientPoolService> grpcFnServer4 = this.controlServer;
                            if (grpcFnServer4 != null) {
                                $closeResource(null, grpcFnServer4);
                            }
                            if (grpcFnServer3 != null) {
                                $closeResource(null, grpcFnServer3);
                            }
                            if (grpcFnServer2 != null) {
                                $closeResource(null, grpcFnServer2);
                            }
                            if (autoCloseable != null) {
                                $closeResource(null, autoCloseable);
                            }
                        } catch (Throwable th2) {
                            th = th2;
                            throw th2;
                        }
                    } catch (Throwable th3) {
                        if (grpcFnServer3 != null) {
                            $closeResource(th, grpcFnServer3);
                        }
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (grpcFnServer2 != null) {
                        $closeResource(null, grpcFnServer2);
                    }
                    throw th4;
                }
            } catch (Throwable th5) {
                if (r6 != 0) {
                    $closeResource(r7, r6);
                }
                throw th5;
            }
        } finally {
            if (grpcFnServer != null) {
                $closeResource(null, grpcFnServer);
            }
        }
    }

    @Test
    public void executesRemoteStage() throws Exception {
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("CreateInputs", ParDo.of(new DoFn<byte[], Integer>() { // from class: org.apache.beam.runners.direct.portable.RemoteStageEvaluatorFactoryTest.1
            @DoFn.ProcessElement
            public void create(DoFn<byte[], Integer>.ProcessContext processContext) {
                processContext.output(1);
                processContext.output(2);
                processContext.output(3);
            }
        })).apply("ParDo", ParDo.of(new DoFn<Integer, KV<String, Long>>() { // from class: org.apache.beam.runners.direct.portable.RemoteStageEvaluatorFactoryTest.2
            @DoFn.ProcessElement
            public void proc(DoFn<Integer, KV<String, Long>>.ProcessContext processContext) {
                processContext.output(KV.of("foo", Long.valueOf(((Integer) processContext.element()).longValue())));
            }
        })).apply(GroupByKey.create());
        RunnerApi.Pipeline pipeline = GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).toPipeline();
        QueryablePipeline forPipeline = QueryablePipeline.forPipeline(pipeline);
        PipelineNode.PCollectionNode pCollectionNode = (PipelineNode.PCollectionNode) Iterables.getOnlyElement(forPipeline.getOutputPCollections((PipelineNode.PTransformNode) Iterables.getOnlyElement(forPipeline.getRootTransforms())));
        PipelineNode.PTransformNode pTransformNode = (PipelineNode.PTransformNode) pipeline.getRootTransformIdsList().stream().map(str -> {
            return PipelineNode.pTransform(str, pipeline.getComponents().getTransformsOrThrow(str));
        }).filter(pTransformNode2 -> {
            return pTransformNode2.getTransform().getSpec().getUrn().equals(ExecutableStage.URN);
        }).findFirst().orElseThrow(IllegalArgumentException::new);
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(new byte[0]);
        TransformEvaluator forApplication = this.factory.forApplication(pTransformNode, this.bundleFactory.createBundle(pCollectionNode).add(valueInGlobalWindow).commit(Instant.now()));
        forApplication.processElement(valueInGlobalWindow);
        TransformResult finishBundle = forApplication.finishBundle();
        Assert.assertThat(Integer.valueOf(Iterables.size(finishBundle.getOutputBundles())), Matchers.equalTo(1));
        Assert.assertThat(Integer.valueOf(Iterables.size(((UncommittedBundle) Iterables.getOnlyElement(finishBundle.getOutputBundles())).commit(Instant.now()))), Matchers.equalTo(3));
    }

    @Test
    public void executesStageWithFlatten() throws Exception {
        ParDo.SingleOutput of = ParDo.of(new DoFn<byte[], KV<Integer, String>>() { // from class: org.apache.beam.runners.direct.portable.RemoteStageEvaluatorFactoryTest.3
            @DoFn.ProcessElement
            public void process(DoFn<byte[], KV<Integer, String>>.ProcessContext processContext) {
                processContext.output(KV.of(1, "foo"));
                processContext.output(KV.of(1, "bar"));
                processContext.output(KV.of(2, "foo"));
            }
        });
        Pipeline create = Pipeline.create();
        PCollectionList.of(create.apply("left", Impulse.create()).apply(of)).and(create.apply("right", Impulse.create()).apply(of)).apply(Flatten.pCollections()).apply(GroupByKey.create());
        RunnerApi.Pipeline pipeline = GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).toPipeline();
        PipelineNode.PTransformNode pTransformNode = null;
        PipelineNode.PTransformNode pTransformNode2 = null;
        for (PipelineNode.PTransformNode pTransformNode3 : QueryablePipeline.forPipeline(pipeline).getRootTransforms()) {
            if (pTransformNode3.getId().equals("left")) {
                pTransformNode = pTransformNode3;
            } else {
                pTransformNode2 = pTransformNode3;
            }
        }
        Preconditions.checkState(pTransformNode != null);
        Preconditions.checkState(pTransformNode2 != null);
        PipelineNode.PTransformNode pTransformNode4 = (PipelineNode.PTransformNode) pipeline.getRootTransformIdsList().stream().map(str -> {
            return PipelineNode.pTransform(str, pipeline.getComponents().getTransformsOrThrow(str));
        }).filter(pTransformNode5 -> {
            return pTransformNode5.getTransform().getSpec().getUrn().equals(ExecutableStage.URN);
        }).findFirst().orElseThrow(IllegalArgumentException::new);
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(new byte[0]);
        String str2 = (String) Iterables.getOnlyElement(pTransformNode4.getTransform().getInputsMap().values());
        TransformEvaluator forApplication = this.factory.forApplication(pTransformNode4, this.bundleFactory.createBundle(PipelineNode.pCollection(str2, pipeline.getComponents().getPcollectionsOrThrow(str2))).add(valueInGlobalWindow).commit(Instant.now()));
        forApplication.processElement(valueInGlobalWindow);
        TransformResult finishBundle = forApplication.finishBundle();
        Assert.assertThat(Integer.valueOf(Iterables.size(finishBundle.getOutputBundles())), Matchers.equalTo(1));
        Assert.assertThat(Integer.valueOf(Iterables.size(((UncommittedBundle) Iterables.getOnlyElement(finishBundle.getOutputBundles())).commit(Instant.now()))), Matchers.equalTo(3));
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
