package org.apache.beam.runners.fnexecution.control;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.FusedPipeline;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Collections2;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyIterable;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.DateTimeUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.class */
public class RemoteExecutionTest implements Serializable {

    @Rule
    public transient ResetDateTimeProvider resetDateTimeProvider = new ResetDateTimeProvider();
    private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutionTest.class);
    private transient GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private transient GrpcFnServer<GrpcDataService> dataServer;
    private transient GrpcFnServer<GrpcStateService> stateServer;
    private transient GrpcFnServer<GrpcLoggingService> loggingServer;
    private transient GrpcStateService stateDelegator;
    private transient SdkHarnessClient controlClient;
    private transient ExecutorService serverExecutor;
    private transient ExecutorService sdkHarnessExecutor;
    private transient Future<?> sdkHarnessExecutorFuture;

    @Before
    public void setup() throws Exception {
        ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true).build();
        this.serverExecutor = Executors.newCachedThreadPool(build);
        InProcessServerFactory create = InProcessServerFactory.create();
        this.dataServer = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(this.serverExecutor, OutboundObserverFactory.serverDirect()), create);
        this.loggingServer = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), create);
        this.stateDelegator = GrpcStateService.create();
        this.stateServer = GrpcFnServer.allocatePortAndCreateFor(this.stateDelegator, create);
        MapControlClientPool create2 = MapControlClientPool.create();
        this.controlServer = GrpcFnServer.allocatePortAndCreateFor(FnApiControlClientPoolService.offeringClientsToPool(create2.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()), create);
        this.sdkHarnessExecutor = Executors.newSingleThreadExecutor(build);
        this.sdkHarnessExecutorFuture = this.sdkHarnessExecutor.submit(() -> {
            try {
                FnHarness.main("id", PipelineOptionsFactory.create(), this.loggingServer.getApiServiceDescriptor(), this.controlServer.getApiServiceDescriptor(), InProcessManagedChannelFactory.create(), OutboundObserverFactory.clientDirect());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        this.controlClient = SdkHarnessClient.usingFnApiClient(create2.getSource().take("", Duration.ofSeconds(2L)), this.dataServer.getService());
    }

    @After
    public void tearDown() throws Exception {
        this.controlServer.close();
        this.stateServer.close();
        this.dataServer.close();
        this.loggingServer.close();
        this.controlClient.close();
        this.sdkHarnessExecutor.shutdownNow();
        this.serverExecutor.shutdownNow();
        try {
            this.sdkHarnessExecutorFuture.get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof RuntimeException) || !(e.getCause().getCause() instanceof InterruptedException)) {
                throw e;
            }
        }
    }

    @Test
    public void testExecution() throws Exception {
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.1
            @DoFn.ProcessElement
            public void process(DoFn<byte[], String>.ProcessContext processContext) {
                processContext.output("zero");
                processContext.output("one");
                processContext.output("two");
            }
        })).apply("len", ParDo.of(new DoFn<String, Long>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.2
            @DoFn.ProcessElement
            public void process(DoFn<String, Long>.ProcessContext processContext) {
                processContext.output(Long.valueOf(((String) processContext.element()).length()));
            }
        })).apply("addKeys", WithKeys.of("foo")).setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())).apply("gbk", GroupByKey.create());
        FusedPipeline fuse = GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create));
        Preconditions.checkState(fuse.getFusedStages().size() == 1, "Expected exactly one fused stage");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("my_stage", (ExecutableStage) fuse.getFusedStages().iterator().next(), this.dataServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations());
        Map outputTargetCoders = fromExecutableStage.getOutputTargetCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : outputTargetCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((BeamFnApi.Target) entry.getKey(), synchronizedList);
            BeamFnApi.Target target = (BeamFnApi.Target) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(target, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        SdkHarnessClient.ActiveBundle newBundle = processor.newBundle(hashMap2, BundleProgressHandler.ignored());
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(new byte[0]));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                Iterator it = hashMap.values().iterator();
                while (it.hasNext()) {
                    Assert.assertThat((Collection) it.next(), Matchers.containsInAnyOrder(new Object[]{WindowedValue.valueInGlobalWindow(kvBytes("foo", 4L)), WindowedValue.valueInGlobalWindow(kvBytes("foo", 3L)), WindowedValue.valueInGlobalWindow(kvBytes("foo", 3L))}));
                }
            } finally {
            }
        } catch (Throwable th2) {
            if (newBundle != null) {
                $closeResource(th, newBundle);
            }
            throw th2;
        }
    }

    @Test
    public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() throws Exception {
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.3
            @DoFn.ProcessElement
            public void process(DoFn<byte[], KV<String, String>>.ProcessContext processContext) throws Exception {
                String str = (String) CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), (byte[]) processContext.element());
                if (str.equals("X")) {
                    throw new Exception("testBundleExecutionFailure");
                }
                processContext.output(KV.of(str, str));
            }
        })).apply("gbk", GroupByKey.create());
        FusedPipeline fuse = GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create));
        Preconditions.checkState(fuse.getFusedStages().size() == 1, "Expected exactly one fused stage");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("my_stage", (ExecutableStage) fuse.getFusedStages().iterator().next(), this.dataServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations());
        Map outputTargetCoders = fromExecutableStage.getOutputTargetCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : outputTargetCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((BeamFnApi.Target) entry.getKey(), synchronizedList);
            BeamFnApi.Target target = (BeamFnApi.Target) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(target, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        SdkHarnessClient.ActiveBundle newBundle = processor.newBundle(hashMap2, BundleProgressHandler.ignored());
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                try {
                    SdkHarnessClient.ActiveBundle newBundle2 = processor.newBundle(hashMap2, BundleProgressHandler.ignored());
                    Throwable th2 = null;
                    try {
                        try {
                            ((FnDataReceiver) Iterables.getOnlyElement(newBundle2.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
                            if (newBundle2 != null) {
                                $closeResource(null, newBundle2);
                            }
                            Assert.fail();
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (newBundle2 != null) {
                            $closeResource(th2, newBundle2);
                        }
                        throw th3;
                    }
                } catch (ExecutionException e) {
                    Assert.assertTrue(e.getMessage().contains("testBundleExecutionFailure"));
                }
                newBundle = processor.newBundle(hashMap2, BundleProgressHandler.ignored());
                Throwable th4 = null;
                try {
                    try {
                        ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Z")));
                        if (newBundle != null) {
                            $closeResource(null, newBundle);
                        }
                        Iterator it = hashMap.values().iterator();
                        while (it.hasNext()) {
                            Assert.assertThat((Collection) it.next(), Matchers.containsInAnyOrder(new Object[]{WindowedValue.valueInGlobalWindow(kvBytes("Y", "Y")), WindowedValue.valueInGlobalWindow(kvBytes("Z", "Z"))}));
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    /* JADX WARN: Type inference failed for: r0v42, types: [byte[], java.lang.Object[]] */
    @Test
    public void testExecutionWithSideInput() throws Exception {
        Pipeline create = Pipeline.create();
        PCollection coder = create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.4
            @DoFn.ProcessElement
            public void process(DoFn<byte[], String>.ProcessContext processContext) {
                processContext.output("zero");
                processContext.output("one");
                processContext.output("two");
            }
        })).setCoder(StringUtf8Coder.of());
        final PCollectionView apply = coder.apply("createSideInput", View.asIterable());
        coder.apply("readSideInput", ParDo.of(new DoFn<String, KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.5
            @DoFn.ProcessElement
            public void processElement(DoFn<String, KV<String, String>>.ProcessContext processContext) {
                Iterator it = ((Iterable) processContext.sideInput(apply)).iterator();
                while (it.hasNext()) {
                    processContext.output(KV.of((String) processContext.element(), (String) it.next()));
                }
            }
        }).withSideInputs(new PCollectionView[]{apply})).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("gbk", GroupByKey.create());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return !executableStage.getSideInputs().isEmpty();
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with side inputs.");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("test_stage", (ExecutableStage) tryFind.get(), this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
        Map outputTargetCoders = fromExecutableStage.getOutputTargetCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : outputTargetCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((BeamFnApi.Target) entry.getKey(), synchronizedList);
            BeamFnApi.Target target = (BeamFnApi.Target) entry.getKey();
            Coder coder2 = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(target, RemoteOutputReceiver.of(coder2, (v1) -> {
                r3.add(v1);
            }));
        }
        final List asList = Arrays.asList(new byte[]{CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"), CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"), CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C")});
        SdkHarnessClient.ActiveBundle newBundle = processor.newBundle(hashMap2, StateRequestHandlers.forSideInputHandlerFactory(fromExecutableStage.getSideInputSpecs(), new StateRequestHandlers.SideInputHandlerFactory() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.6
            public <T, V, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<V, W> forSideInput(String str, String str2, RunnerApi.FunctionSpec functionSpec, final Coder<T> coder3, Coder<W> coder4) {
                return (StateRequestHandlers.SideInputHandler<V, W>) new StateRequestHandlers.SideInputHandler<V, W>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.6.1
                    /* JADX WARN: Incorrect types in method signature: ([BTW;)Ljava/lang/Iterable<TV;>; */
                    public Iterable get(byte[] bArr, BoundedWindow boundedWindow) {
                        return asList;
                    }

                    public Coder<V> resultCoder() {
                        return coder3.getValueCoder();
                    }
                };
            }
        }), BundleProgressHandler.ignored());
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                Iterator it = hashMap.values().iterator();
                while (it.hasNext()) {
                    Assert.assertThat((Collection) it.next(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(kvBytes("X", "A")), WindowedValue.valueInGlobalWindow(kvBytes("X", "B")), WindowedValue.valueInGlobalWindow(kvBytes("X", "C")), WindowedValue.valueInGlobalWindow(kvBytes("Y", "A")), WindowedValue.valueInGlobalWindow(kvBytes("Y", "B")), WindowedValue.valueInGlobalWindow(kvBytes("Y", "C"))}));
                }
            } finally {
            }
        } catch (Throwable th2) {
            if (newBundle != null) {
                $closeResource(th, newBundle);
            }
            throw th2;
        }
    }

    /* JADX WARN: Type inference failed for: r0v35, types: [byte[], java.lang.Object[]] */
    @Test
    public void testMetrics() throws Exception {
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.7
            @DoFn.ProcessElement
            public void process(DoFn<byte[], String>.ProcessContext processContext) {
                Metrics.counter(RemoteExecutionTest.class, "counterMetric").inc();
            }
        })).setCoder(StringUtf8Coder.of());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return true;
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with side inputs.");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("test_stage", (ExecutableStage) tryFind.get(), this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
        Map outputTargetCoders = fromExecutableStage.getOutputTargetCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : outputTargetCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((BeamFnApi.Target) entry.getKey(), synchronizedList);
            BeamFnApi.Target target = (BeamFnApi.Target) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(target, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        final List asList = Arrays.asList(new byte[]{CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"), CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"), CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C")});
        SdkHarnessClient.ActiveBundle newBundle = processor.newBundle(hashMap2, StateRequestHandlers.forSideInputHandlerFactory(fromExecutableStage.getSideInputSpecs(), new StateRequestHandlers.SideInputHandlerFactory() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.8
            public <T, V, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<V, W> forSideInput(String str, String str2, RunnerApi.FunctionSpec functionSpec, final Coder<T> coder2, Coder<W> coder3) {
                return (StateRequestHandlers.SideInputHandler<V, W>) new StateRequestHandlers.SideInputHandler<V, W>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.8.1
                    /* JADX WARN: Incorrect types in method signature: ([BTW;)Ljava/lang/Iterable<TV;>; */
                    public Iterable get(byte[] bArr, BoundedWindow boundedWindow) {
                        return asList;
                    }

                    public Coder<V> resultCoder() {
                        return coder2.getValueCoder();
                    }
                };
            }
        }), new BundleProgressHandler() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.9
            public void onProgress(BeamFnApi.ProcessBundleProgressResponse processBundleProgressResponse) {
            }

            public void onCompleted(BeamFnApi.ProcessBundleResponse processBundleResponse) {
                ArrayList arrayList = new ArrayList();
                for (BeamFnApi.MonitoringInfo monitoringInfo : processBundleResponse.getMonitoringInfosList()) {
                    BeamFnApi.MonitoringInfo.Builder newBuilder = BeamFnApi.MonitoringInfo.newBuilder();
                    Assert.assertTrue(monitoringInfo.getTimestamp().getSeconds() > 0);
                    newBuilder.mergeFrom(monitoringInfo);
                    newBuilder.clearTimestamp();
                    arrayList.add(newBuilder.build());
                }
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder.setUrnForUserMetric(RemoteExecutionTest.class.getName(), "counterMetric");
                simpleMonitoringInfoBuilder.setInt64Value(2L);
                Assert.assertThat(arrayList, CoreMatchers.hasItems(new BeamFnApi.MonitoringInfo[]{simpleMonitoringInfoBuilder.build()}));
            }
        });
        try {
            ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
            ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
            if (newBundle != null) {
                $closeResource(null, newBundle);
            }
        } catch (Throwable th) {
            if (newBundle != null) {
                $closeResource(null, newBundle);
            }
            throw th;
        }
    }

    @Test
    public void testExecutionWithUserState() throws Exception {
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.10
            @DoFn.ProcessElement
            public void process(DoFn<byte[], KV<String, String>>.ProcessContext processContext) {
            }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("userState", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.11

            @DoFn.StateId("foo")
            private final StateSpec<BagState<String>> bufferState = StateSpecs.bag(StringUtf8Coder.of());

            @DoFn.StateId("foo2")
            private final StateSpec<BagState<String>> bufferState2 = StateSpecs.bag(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element KV<String, String> kv, @DoFn.StateId("foo") BagState<String> bagState, @DoFn.StateId("foo2") BagState<String> bagState2, DoFn.OutputReceiver<KV<String, String>> outputReceiver) {
                bagState.isEmpty();
                Iterator it = bagState.read().iterator();
                while (it.hasNext()) {
                    outputReceiver.output(KV.of((String) kv.getKey(), (String) it.next()));
                }
                bagState.add((String) kv.getValue());
                bagState2.clear();
            }
        })).apply("gbk", GroupByKey.create());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return !executableStage.getUserStates().isEmpty();
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with user state.");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("test_stage", (ExecutableStage) tryFind.get(), this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
        Map outputTargetCoders = fromExecutableStage.getOutputTargetCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : outputTargetCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((BeamFnApi.Target) entry.getKey(), synchronizedList);
            BeamFnApi.Target target = (BeamFnApi.Target) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(target, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        final ImmutableMap of = ImmutableMap.of("foo", new ArrayList(Arrays.asList(ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C", Coder.Context.NESTED)))), "foo2", new ArrayList(Arrays.asList(ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "D", Coder.Context.NESTED)))));
        SdkHarnessClient.ActiveBundle newBundle = processor.newBundle(hashMap2, StateRequestHandlers.forBagUserStateHandlerFactory(fromExecutableStage, new StateRequestHandlers.BagUserStateHandlerFactory() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.12
            public <K, V, W extends BoundedWindow> StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(String str, final String str2, Coder<K> coder2, Coder<V> coder3, Coder<W> coder4) {
                return (StateRequestHandlers.BagUserStateHandler<K, V, W>) new StateRequestHandlers.BagUserStateHandler<K, V, W>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.12.1
                    /* JADX WARN: Incorrect types in method signature: (TK;TW;)Ljava/lang/Iterable<TV;>; */
                    public Iterable get(Object obj, BoundedWindow boundedWindow) {
                        return (Iterable) of.get(str2);
                    }

                    /* JADX WARN: Incorrect types in method signature: (TK;TW;Ljava/util/Iterator<TV;>;)V */
                    public void append(Object obj, BoundedWindow boundedWindow, Iterator it) {
                        Iterators.addAll((Collection) of.get(str2), it);
                    }

                    /* JADX WARN: Incorrect types in method signature: (TK;TW;)V */
                    public void clear(Object obj, BoundedWindow boundedWindow) {
                        ((List) of.get(str2)).clear();
                    }
                };
            }
        }), BundleProgressHandler.ignored());
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(kvBytes("X", "Y")));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                Iterator it = hashMap.values().iterator();
                while (it.hasNext()) {
                    Assert.assertThat((Collection) it.next(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(kvBytes("X", "A")), WindowedValue.valueInGlobalWindow(kvBytes("X", "B")), WindowedValue.valueInGlobalWindow(kvBytes("X", "C"))}));
                }
                Assert.assertThat((List) of.get("foo"), IsIterableContainingInOrder.contains(new ByteString[]{ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y", Coder.Context.NESTED))}));
                Assert.assertThat((List) of.get("foo2"), IsEmptyIterable.emptyIterable());
            } finally {
            }
        } catch (Throwable th2) {
            if (newBundle != null) {
                $closeResource(th, newBundle);
            }
            throw th2;
        }
    }

    @Test
    public void testExecutionWithTimer() throws Exception {
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.13
            @DoFn.ProcessElement
            public void process(DoFn<byte[], KV<String, String>>.ProcessContext processContext) {
            }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("timer", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.14

            @DoFn.TimerId("event")
            private final TimerSpec eventTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @DoFn.TimerId("processing")
            private final TimerSpec processingTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, String>, KV<String, String>>.ProcessContext processContext, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2) {
                processContext.output(KV.of("main" + ((String) ((KV) processContext.element()).getKey()), ""));
                timer.set(processContext.timestamp().plus(1L));
                timer2.offset(org.joda.time.Duration.millis(2L));
                timer2.setRelative();
            }

            @DoFn.OnTimer("event")
            public void eventTimer(DoFn<KV<String, String>, KV<String, String>>.OnTimerContext onTimerContext, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2) {
                onTimerContext.output(KV.of("event", ""));
                timer.set(onTimerContext.timestamp().plus(11L));
                timer2.offset(org.joda.time.Duration.millis(12L));
                timer2.setRelative();
            }

            @DoFn.OnTimer("processing")
            public void processingTimer(DoFn<KV<String, String>, KV<String, String>>.OnTimerContext onTimerContext, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2) {
                onTimerContext.output(KV.of("processing", ""));
                timer.set(onTimerContext.timestamp().plus(21L));
                timer2.offset(org.joda.time.Duration.millis(22L));
                timer2.setRelative();
            }
        })).apply("gbk", GroupByKey.create());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return !executableStage.getTimers().isEmpty();
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with timers.");
        ExecutableStage executableStage2 = (ExecutableStage) tryFind.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("test_stage", executableStage2, this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
        Map outputTargetCoders = fromExecutableStage.getOutputTargetCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : outputTargetCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((BeamFnApi.Target) entry.getKey(), synchronizedList);
            BeamFnApi.Target target = (BeamFnApi.Target) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(target, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        String str = null;
        BeamFnApi.Target target2 = null;
        String str2 = null;
        BeamFnApi.Target target3 = null;
        Iterator it = fromExecutableStage.getTimerSpecs().values().iterator();
        while (it.hasNext()) {
            for (ProcessBundleDescriptors.TimerSpec timerSpec : ((Map) it.next()).values()) {
                if (TimeDomain.EVENT_TIME.equals(timerSpec.getTimerSpec().getTimeDomain())) {
                    str = timerSpec.inputCollectionId();
                    target2 = timerSpec.outputTarget();
                } else if (TimeDomain.PROCESSING_TIME.equals(timerSpec.getTimerSpec().getTimeDomain())) {
                    str2 = timerSpec.inputCollectionId();
                    target3 = timerSpec.outputTarget();
                } else {
                    Assert.fail(String.format("Unknown timer specification %s", timerSpec));
                }
            }
        }
        DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
        SdkHarnessClient.ActiveBundle newBundle = processor.newBundle(hashMap2, StateRequestHandler.unsupported(), BundleProgressHandler.ignored());
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) newBundle.getInputReceivers().get(executableStage2.getInputPCollection().getId())).accept(WindowedValue.valueInGlobalWindow(kvBytes("X", "X")));
                ((FnDataReceiver) newBundle.getInputReceivers().get(str)).accept(WindowedValue.valueInGlobalWindow(timerBytes("Y", 100L)));
                ((FnDataReceiver) newBundle.getInputReceivers().get(str2)).accept(WindowedValue.valueInGlobalWindow(timerBytes("Z", 200L)));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                Assert.assertThat((Collection) hashMap.get((BeamFnApi.Target) Iterables.getOnlyElement(Sets.difference(fromExecutableStage.getOutputTargetCoders().keySet(), ImmutableSet.of(target2, target3)))), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(kvBytes("mainX", "")), WindowedValue.valueInGlobalWindow(kvBytes("event", "")), WindowedValue.valueInGlobalWindow(kvBytes("processing", ""))}));
                Assert.assertThat(timerStructuralValues((Collection) hashMap.get(target2)), Matchers.containsInAnyOrder(new Object[]{timerStructuralValue(WindowedValue.valueInGlobalWindow(timerBytes("X", 1L))), timerStructuralValue(WindowedValue.valueInGlobalWindow(timerBytes("Y", 11L))), timerStructuralValue(WindowedValue.valueInGlobalWindow(timerBytes("Z", 21L)))}));
                Assert.assertThat(timerStructuralValues((Collection) hashMap.get(target3)), Matchers.containsInAnyOrder(new Object[]{timerStructuralValue(WindowedValue.valueInGlobalWindow(timerBytes("X", 2L))), timerStructuralValue(WindowedValue.valueInGlobalWindow(timerBytes("Y", 12L))), timerStructuralValue(WindowedValue.valueInGlobalWindow(timerBytes("Z", 22L)))}));
            } finally {
            }
        } catch (Throwable th2) {
            if (newBundle != null) {
                $closeResource(th, newBundle);
            }
            throw th2;
        }
    }

    @Test
    public void testExecutionWithMultipleStages() throws Exception {
        Pipeline create = Pipeline.create();
        Function function = str -> {
            return create.apply("impulse" + str, Impulse.create()).apply("create" + str, ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.15
                @DoFn.ProcessElement
                public void process(DoFn<byte[], String>.ProcessContext processContext) {
                    try {
                        processContext.output((String) CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), (byte[]) processContext.element()));
                    } catch (CoderException e) {
                        throw new RuntimeException((Throwable) e);
                    }
                }
            })).setCoder(StringUtf8Coder.of()).apply(ParDo.of(new DoFn<String, String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.16
                @DoFn.ProcessElement
                public void processElement(DoFn<String, String>.ProcessContext processContext) {
                    processContext.output("stream" + str + ((String) processContext.element()));
                }
            }));
        };
        PCollectionList.of((PCollection) function.apply("1")).and((PCollection) function.apply("2")).apply(Flatten.pCollections()).apply("createKV", ParDo.of(new DoFn<String, KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.17
            @DoFn.ProcessElement
            public void process(DoFn<String, KV<String, String>>.ProcessContext processContext) {
                processContext.output(KV.of((String) processContext.element(), ""));
            }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("gbk", GroupByKey.create());
        Set<ExecutableStage> fusedStages = GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages();
        Assert.assertThat(Integer.valueOf(fusedStages.size()), Matchers.equalTo(2));
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        for (ExecutableStage executableStage : fusedStages) {
            ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage(executableStage.toString(), executableStage, this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
            SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
            Map outputTargetCoders = fromExecutableStage.getOutputTargetCoders();
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : outputTargetCoders.entrySet()) {
                BeamFnApi.Target target = (BeamFnApi.Target) entry.getKey();
                Coder coder = (Coder) entry.getValue();
                Objects.requireNonNull(synchronizedList);
                hashMap.putIfAbsent(target, RemoteOutputReceiver.of(coder, (v1) -> {
                    r3.add(v1);
                }));
            }
            SdkHarnessClient.ActiveBundle newBundle = processor.newBundle(hashMap, StateRequestHandler.unsupported(), BundleProgressHandler.ignored());
            Throwable th = null;
            try {
                try {
                    ((FnDataReceiver) newBundle.getInputReceivers().get(executableStage.getInputPCollection().getId())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
                    if (newBundle != null) {
                        $closeResource(null, newBundle);
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (newBundle != null) {
                    $closeResource(th, newBundle);
                }
                throw th2;
            }
        }
        Assert.assertThat(synchronizedList, Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(kvBytes("stream1X", "")), WindowedValue.valueInGlobalWindow(kvBytes("stream2X", ""))}));
    }

    private KV<byte[], byte[]> kvBytes(String str, long j) throws CoderException {
        return KV.of(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), str), CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), Long.valueOf(j)));
    }

    private KV<byte[], byte[]> kvBytes(String str, String str2) throws CoderException {
        return KV.of(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), str), CoderUtils.encodeToByteArray(StringUtf8Coder.of(), str2));
    }

    private KV<byte[], org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes(String str, long j) throws CoderException {
        return KV.of(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), str), org.apache.beam.runners.core.construction.Timer.of(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(j), CoderUtils.encodeToByteArray(VoidCoder.of(), (Object) null, Coder.Context.NESTED)));
    }

    private Object timerStructuralValue(Object obj) {
        return WindowedValue.FullWindowedValueCoder.of(KvCoder.of(ByteArrayCoder.of(), Timer.Coder.of(ByteArrayCoder.of())), GlobalWindow.Coder.INSTANCE).structuralValue(obj);
    }

    private Collection<Object> timerStructuralValues(Collection<?> collection) {
        return Collections2.transform(collection, this::timerStructuralValue);
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
