/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.FusedPipeline;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMatchers;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.HeaderAccessor;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.LogWriter;
import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Collections2;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyIterable;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class RemoteExecutionTest
implements Serializable {
    // JUnit rule applied around every test in this class.
    @Rule
    public transient ResetDateTimeProvider resetDateTimeProvider = new ResetDateTimeProvider();
    private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutionTest.class);
    // The four servers below are created in setup() from one InProcessServerFactory and closed
    // in tearDown().
    // Control-plane server; the SDK harness is pointed at its descriptor in setup().
    private transient GrpcFnServer<FnApiControlClientPoolService> controlServer;
    // Data-plane server; its descriptor is handed to ProcessBundleDescriptors by the tests.
    private transient GrpcFnServer<GrpcDataService> dataServer;
    // Server wrapping stateDelegator; used by the tests that build descriptors with state access.
    private transient GrpcFnServer<GrpcStateService> stateServer;
    // Logging server; the SDK harness is pointed at its descriptor in setup().
    private transient GrpcFnServer<GrpcLoggingService> loggingServer;
    // The state service instance served by stateServer; also passed to getProcessor(...) by tests.
    private transient GrpcStateService stateDelegator;
    // Client built in setup() from the handler taken out of the control client pool.
    private transient SdkHarnessClient controlClient;
    // Cached daemon-thread pool handed to the data service.
    private transient ExecutorService serverExecutor;
    // Single daemon thread on which FnHarness.main runs.
    private transient ExecutorService sdkHarnessExecutor;
    // Future of the FnHarness.main task; tearDown() waits on it after shutting the executor down.
    private transient Future<?> sdkHarnessExecutorFuture;

    /**
     * Starts the in-process data, logging, state and control servers, launches the SDK harness on
     * a background thread, and builds {@link #controlClient} from the handler the harness
     * registers in the control client pool.
     *
     * @throws Exception if a server cannot be created or no handler can be taken from the pool
     */
    @Before
    public void setup() throws Exception {
        // Daemon threads, so a lingering server or harness thread cannot keep the JVM alive.
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
        this.serverExecutor = Executors.newCachedThreadPool(threadFactory);
        ServerFactory serverFactory = InProcessServerFactory.create();

        // The services are passed with their concrete types (no upcast to FnService) so that the
        // inferred GrpcFnServer<...> type argument matches the declared field types.
        this.dataServer = GrpcFnServer.allocatePortAndCreateFor(
            GrpcDataService.create(this.serverExecutor, OutboundObserverFactory.serverDirect()),
            serverFactory);
        this.loggingServer = GrpcFnServer.allocatePortAndCreateFor(
            GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
        this.stateDelegator = GrpcStateService.create();
        this.stateServer = GrpcFnServer.allocatePortAndCreateFor(this.stateDelegator, serverFactory);

        MapControlClientPool clientPool = MapControlClientPool.create();
        this.controlServer = GrpcFnServer.allocatePortAndCreateFor(
            FnApiControlClientPoolService.offeringClientsToPool(
                clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
            serverFactory);

        // Run the SDK harness in-process on its own thread; checked exceptions are wrapped so the
        // task fits Runnable, and tearDown() unwraps them again.
        this.sdkHarnessExecutor = Executors.newSingleThreadExecutor(threadFactory);
        this.sdkHarnessExecutorFuture = this.sdkHarnessExecutor.submit(() -> {
            try {
                FnHarness.main(
                    "id",
                    PipelineOptionsFactory.create(),
                    this.loggingServer.getApiServiceDescriptor(),
                    this.controlServer.getApiServiceDescriptor(),
                    InProcessManagedChannelFactory.create(),
                    OutboundObserverFactory.clientDirect());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        // Take the harness's handler from the pool, passing a two-second duration to take().
        InstructionRequestHandler harnessHandler =
            clientPool.getSource().take("", java.time.Duration.ofSeconds(2L));
        this.controlClient =
            SdkHarnessClient.usingFnApiClient(harnessHandler, this.dataServer.getService());
    }

    /**
     * Closes the servers and the control client, shuts both executors down, and then waits for
     * the SDK harness task to finish.
     *
     * @throws Exception if closing a resource fails, or if the harness task failed with anything
     *     other than a {@code RuntimeException} wrapping an {@code InterruptedException}
     */
    @After
    public void tearDown() throws Exception {
        this.controlServer.close();
        this.stateServer.close();
        this.dataServer.close();
        this.loggingServer.close();
        this.controlClient.close();
        this.sdkHarnessExecutor.shutdownNow();
        this.serverExecutor.shutdownNow();
        try {
            this.sdkHarnessExecutorFuture.get();
        } catch (ExecutionException e) {
            // setup() wraps harness exceptions in a RuntimeException; an InterruptedException
            // underneath it is tolerated here, everything else is rethrown.
            Throwable wrapper = e.getCause();
            boolean interrupted =
                wrapper instanceof RuntimeException
                    && wrapper.getCause() instanceof InterruptedException;
            if (!interrupted) {
                throw e;
            }
        }
    }

    /**
     * Fuses an impulse -> create -> len -> addKeys pipeline into a single executable stage, runs
     * one bundle through the SDK harness, and checks the elements delivered to every remote
     * output.
     *
     * @throws Exception if the bundle fails or any output cannot be received
     */
    @Test
    public void testExecution() throws Exception {
        Pipeline p = Pipeline.create();
        p.apply("impulse", Impulse.create())
            .apply("create", ParDo.of(new DoFn<byte[], String>() {
                @DoFn.ProcessElement
                public void process(ProcessContext ctxt) {
                    ctxt.output("zero");
                    ctxt.output("one");
                    ctxt.output("two");
                }
            }))
            .apply("len", ParDo.of(new DoFn<String, Long>() {
                @DoFn.ProcessElement
                public void process(ProcessContext ctxt) {
                    // Widen to long before boxing: the declared output type is Long and the
                    // value is later encoded with BigEndianLongCoder.
                    ctxt.output((long) ctxt.element().length());
                }
            }))
            .apply("addKeys", WithKeys.of("foo"))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()))
            .apply("gbk", GroupByKey.create());

        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
        Preconditions.checkState(
            fused.getFusedStages().size() == 1, "Expected exactly one fused stage");
        ExecutableStage stage = fused.getFusedStages().iterator().next();

        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor =
            ProcessBundleDescriptors.fromExecutableStage(
                "my_stage", stage, this.dataServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor =
            this.controlClient.getProcessor(
                descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations());

        // One thread-safe sink per remote output, keyed by the same id as its coder.
        Map<String, ?> remoteOutputCoders = descriptor.getRemoteOutputCoders();
        Map<String, Collection<? super WindowedValue<?>>> outputValues = new HashMap<>();
        Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
        for (Map.Entry<String, ?> remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List<? super WindowedValue<?>> outputContents =
                Collections.synchronizedList(new ArrayList<>());
            outputValues.put(remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put(
                remoteOutputCoder.getKey(),
                RemoteOutputReceiver.of(
                    (Coder) remoteOutputCoder.getValue(),
                    (FnDataReceiver<? super WindowedValue<?>>) outputContents::add));
        }

        // Feed the single impulse element; leaving the try block closes the bundle.
        try (SdkHarnessClient.ActiveBundle bundle =
            processor.newBundle(outputReceivers, BundleProgressHandler.ignored())) {
            Iterables.getOnlyElement(bundle.getInputReceivers().values())
                .accept(WindowedValue.valueInGlobalWindow(new byte[0]));
        }

        // "zero", "one" and "two" have lengths 4, 3 and 3.
        for (Collection<? super WindowedValue<?>> windowedValues : outputValues.values()) {
            Assert.assertThat(
                windowedValues,
                Matchers.containsInAnyOrder(
                    WindowedValue.valueInGlobalWindow(this.byteValueOf("foo", 4L)),
                    WindowedValue.valueInGlobalWindow(this.byteValueOf("foo", 3L)),
                    WindowedValue.valueInGlobalWindow(this.byteValueOf("foo", 3L))));
        }
    }

    /**
     * Runs three bundles through one processor: "Y" succeeds, "X" makes the user code throw, and
     * "Z" succeeds again. The failing bundle must surface as an {@link ExecutionException} whose
     * message contains the user exception text, and only "Y" and "Z" must reach the outputs.
     *
     * @throws Exception if one of the bundles expected to succeed fails
     */
    @Test
    public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() throws Exception {
        Pipeline p = Pipeline.create();
        p.apply("impulse", Impulse.create())
            .apply("create", ParDo.of(new DoFn<byte[], KV<String, String>>() {
                @DoFn.ProcessElement
                public void process(ProcessContext ctxt) throws Exception {
                    String element =
                        CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), ctxt.element());
                    if (element.equals("X")) {
                        throw new Exception("testBundleExecutionFailure");
                    }
                    ctxt.output(KV.of(element, element));
                }
            }))
            .apply("gbk", GroupByKey.create());

        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
        Preconditions.checkState(
            fused.getFusedStages().size() == 1, "Expected exactly one fused stage");
        ExecutableStage stage = fused.getFusedStages().iterator().next();

        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor =
            ProcessBundleDescriptors.fromExecutableStage(
                "my_stage", stage, this.dataServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor =
            this.controlClient.getProcessor(
                descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations());

        // One thread-safe sink per remote output, shared by all three bundles.
        Map<String, ?> remoteOutputCoders = descriptor.getRemoteOutputCoders();
        Map<String, Collection<? super WindowedValue<?>>> outputValues = new HashMap<>();
        Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
        for (Map.Entry<String, ?> remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List<? super WindowedValue<?>> outputContents =
                Collections.synchronizedList(new ArrayList<>());
            outputValues.put(remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put(
                remoteOutputCoder.getKey(),
                RemoteOutputReceiver.of(
                    (Coder) remoteOutputCoder.getValue(),
                    (FnDataReceiver<? super WindowedValue<?>>) outputContents::add));
        }

        // First bundle: a well-behaved element.
        try (SdkHarnessClient.ActiveBundle bundle =
            processor.newBundle(outputReceivers, BundleProgressHandler.ignored())) {
            Iterables.getOnlyElement(bundle.getInputReceivers().values())
                .accept(
                    WindowedValue.valueInGlobalWindow(
                        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
        }

        // Second bundle: "X" makes the DoFn throw; the ExecutionException must escape the
        // try-with-resources statement.
        try {
            try (SdkHarnessClient.ActiveBundle bundle =
                processor.newBundle(outputReceivers, BundleProgressHandler.ignored())) {
                Iterables.getOnlyElement(bundle.getInputReceivers().values())
                    .accept(
                        WindowedValue.valueInGlobalWindow(
                            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
            }
            // Reaching this line means the expected exception was never thrown.
            Assert.fail();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getMessage().contains("testBundleExecutionFailure"));
        }

        // Third bundle: the same processor is used again after the failure.
        try (SdkHarnessClient.ActiveBundle bundle =
            processor.newBundle(outputReceivers, BundleProgressHandler.ignored())) {
            Iterables.getOnlyElement(bundle.getInputReceivers().values())
                .accept(
                    WindowedValue.valueInGlobalWindow(
                        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Z")));
        }

        for (Collection<? super WindowedValue<?>> windowedValues : outputValues.values()) {
            Assert.assertThat(
                windowedValues,
                Matchers.containsInAnyOrder(
                    WindowedValue.valueInGlobalWindow(KV.of("Y", "Y")),
                    WindowedValue.valueInGlobalWindow(KV.of("Z", "Z"))));
        }
    }

    /**
     * Executes the fused stage that reads an iterable side input. The side input is answered by
     * a test handler that always returns "A", "B", "C", so each of the two main-input elements
     * must be paired with all three side-input values.
     *
     * @throws Exception if the bundle fails or any output cannot be received
     */
    @Test
    public void testExecutionWithSideInput() throws Exception {
        Pipeline p = Pipeline.create();
        PCollection<String> input =
            p.apply("impulse", Impulse.create())
                .apply("create", ParDo.of(new DoFn<byte[], String>() {
                    @DoFn.ProcessElement
                    public void process(ProcessContext ctxt) {
                        ctxt.output("zero");
                        ctxt.output("one");
                        ctxt.output("two");
                    }
                }))
                .setCoder(StringUtf8Coder.of());
        final PCollectionView<Iterable<String>> view =
            input.apply("createSideInput", View.asIterable());
        input
            .apply("readSideInput", ParDo.of(new DoFn<String, KV<String, String>>() {
                @DoFn.ProcessElement
                public void processElement(ProcessContext context) {
                    for (String value : context.sideInput(view)) {
                        context.output(KV.of(context.element(), value));
                    }
                }
            }).withSideInputs(view))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
            .apply("gbk", GroupByKey.create());

        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
        // Pick the first fused stage that actually consumes a side input.
        Optional<ExecutableStage> optionalStage =
            Iterables.tryFind(
                fused.getFusedStages(),
                (ExecutableStage candidate) -> !candidate.getSideInputs().isEmpty());
        Preconditions.checkState(optionalStage.isPresent(), "Expected a stage with side inputs.");
        ExecutableStage stage = optionalStage.get();

        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor =
            ProcessBundleDescriptors.fromExecutableStage(
                "test_stage",
                stage,
                this.dataServer.getApiServiceDescriptor(),
                this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor =
            this.controlClient.getProcessor(
                descriptor.getProcessBundleDescriptor(),
                descriptor.getRemoteInputDestinations(),
                this.stateDelegator);

        // One thread-safe sink per remote output, keyed by the same id as its coder.
        Map<String, ?> remoteOutputCoders = descriptor.getRemoteOutputCoders();
        Map<String, Collection<WindowedValue<?>>> outputValues = new HashMap<>();
        Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
        for (Map.Entry<String, ?> remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List<WindowedValue<?>> outputContents = Collections.synchronizedList(new ArrayList<>());
            outputValues.put(remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put(
                remoteOutputCoder.getKey(),
                RemoteOutputReceiver.of(
                    (Coder<WindowedValue<?>>) remoteOutputCoder.getValue(), outputContents::add));
        }

        final List<String> sideInputData = Arrays.asList("A", "B", "C");
        StateRequestHandler stateRequestHandler =
            StateRequestHandlers.forSideInputHandlerFactory(
                descriptor.getSideInputSpecs(),
                new StateRequestHandlers.SideInputHandlerFactory() {
                    @Override
                    public <T, V, W extends BoundedWindow>
                        StateRequestHandlers.SideInputHandler<V, W> forSideInput(
                            String pTransformId,
                            String sideInputId,
                            RunnerApi.FunctionSpec accessPattern,
                            final Coder<T> elementCoder,
                            Coder<W> windowCoder) {
                        return new StateRequestHandlers.SideInputHandler<V, W>() {
                            @Override
                            public Iterable<V> get(byte[] key, W window) {
                                // Same fixed data for every key and window; the raw cast
                                // bridges List<String> to the handler's generic element type.
                                return (Iterable) sideInputData;
                            }

                            @Override
                            public Coder<V> resultCoder() {
                                return ((KvCoder) elementCoder).getValueCoder();
                            }
                        };
                    }
                });
        BundleProgressHandler progressHandler = BundleProgressHandler.ignored();

        // Feed two main-input elements; leaving the try block closes the bundle.
        try (SdkHarnessClient.ActiveBundle bundle =
            processor.newBundle(outputReceivers, stateRequestHandler, progressHandler)) {
            Iterables.getOnlyElement(bundle.getInputReceivers().values())
                .accept(WindowedValue.valueInGlobalWindow("X"));
            Iterables.getOnlyElement(bundle.getInputReceivers().values())
                .accept(WindowedValue.valueInGlobalWindow("Y"));
        }

        for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) {
            Assert.assertThat(
                windowedValues,
                Matchers.containsInAnyOrder(
                    WindowedValue.valueInGlobalWindow(KV.of("X", "A")),
                    WindowedValue.valueInGlobalWindow(KV.of("X", "B")),
                    WindowedValue.valueInGlobalWindow(KV.of("X", "C")),
                    WindowedValue.valueInGlobalWindow(KV.of("Y", "A")),
                    WindowedValue.valueInGlobalWindow(KV.of("Y", "B")),
                    WindowedValue.valueInGlobalWindow(KV.of("Y", "C"))));
        }
    }

    @Test
    public void testMetrics() throws Exception {
        String processUserCounterName = "processUserCounter";
        String startUserCounterName = "startUserCounter";
        String finishUserCounterName = "finishUserCounter";
        String processUserDistributionName = "processUserDistribution";
        String startUserDistributionName = "startUserDistribution";
        String finishUserDistributionName = "finishUserDistribution";
        Pipeline p = Pipeline.create();
        PCollection input = ((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], String>(){
            private boolean emitted = false;
            private Counter startCounter = Metrics.counter(RemoteExecutionTest.class, (String)"startUserCounter");

            @DoFn.StartBundle
            public void startBundle() throws InterruptedException {
                Thread.sleep(1000L);
                this.startCounter.inc(10L);
                Metrics.distribution(RemoteExecutionTest.class, (String)"startUserDistribution").update(10L);
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext ctxt) throws InterruptedException {
                if (!this.emitted) {
                    ctxt.output((Object)"zero");
                    ctxt.output((Object)"one");
                    ctxt.output((Object)"two");
                    Thread.sleep(1000L);
                    Metrics.counter(RemoteExecutionTest.class, (String)"processUserCounter").inc();
                    Metrics.distribution(RemoteExecutionTest.class, (String)"processUserDistribution").update(1L);
                }
                this.emitted = true;
            }

            @DoFn.FinishBundle
            public void finishBundle() throws InterruptedException {
                Thread.sleep(1000L);
                Metrics.counter(RemoteExecutionTest.class, (String)"finishUserCounter").inc(100L);
                Metrics.distribution(RemoteExecutionTest.class, (String)"finishUserDistribution").update(100L);
            }
        }))).setCoder((Coder)StringUtf8Coder.of());
        ParDo.SingleOutput pardo = ParDo.of((DoFn)new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
                ctxt.output((Object)((String)ctxt.element()));
                ctxt.output((Object)((String)ctxt.element()));
            }
        });
        ((PCollection)input.apply("processA", (PTransform)pardo)).setCoder((Coder)StringUtf8Coder.of());
        ((PCollection)input.apply("processB", (PTransform)pardo)).setCoder((Coder)StringUtf8Coder.of());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> true);
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with side inputs.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator);
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        HashMap outputValues = new HashMap();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        final List<String> sideInputData = Arrays.asList("A", "B", "C");
        StateRequestHandler stateRequestHandler = StateRequestHandlers.forSideInputHandlerFactory((Map)descriptor.getSideInputSpecs(), (StateRequestHandlers.SideInputHandlerFactory)new StateRequestHandlers.SideInputHandlerFactory(){

            public <T, V, W extends BoundedWindow> StateRequestHandlers.SideInputHandler<V, W> forSideInput(String pTransformId, String sideInputId, RunnerApi.FunctionSpec accessPattern, final Coder<T> elementCoder, Coder<W> windowCoder) {
                return new StateRequestHandlers.SideInputHandler<V, W>(){

                    public Iterable<V> get(byte[] key, W window) {
                        return sideInputData;
                    }

                    public Coder<V> resultCoder() {
                        return ((KvCoder)elementCoder).getValueCoder();
                    }
                };
            }
        });
        final String testPTransformId = "create/ParMultiDo(Anonymous)";
        BundleProgressHandler progressHandler = new BundleProgressHandler(){

            public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {
            }

            public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
                ArrayList<Object> matchers = new ArrayList<Object>();
                SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_COUNTER).setLabel(MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName()).setLabel(MonitoringInfoConstants.Labels.NAME, "processUserCounter");
                builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
                builder.setInt64Value(1L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_COUNTER).setLabel(MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName()).setLabel(MonitoringInfoConstants.Labels.NAME, "startUserCounter");
                builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
                builder.setInt64Value(10L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_COUNTER).setLabel(MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName()).setLabel(MonitoringInfoConstants.Labels.NAME, "finishUserCounter");
                builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
                builder.setInt64Value(100L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER).setLabel(MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName()).setLabel(MonitoringInfoConstants.Labels.NAME, "processUserDistribution");
                builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
                builder.setInt64DistributionValue(DistributionData.create((long)1L, (long)1L, (long)1L, (long)1L));
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER).setLabel(MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName()).setLabel(MonitoringInfoConstants.Labels.NAME, "startUserDistribution");
                builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
                builder.setInt64DistributionValue(DistributionData.create((long)10L, (long)1L, (long)10L, (long)10L));
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_COUNTER).setLabel(MonitoringInfoConstants.Labels.NAMESPACE, RemoteExecutionTest.class.getName()).setLabel(MonitoringInfoConstants.Labels.NAME, "finishUserDistribution");
                builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "create/ParMultiDo(Anonymous)");
                builder.setInt64DistributionValue(DistributionData.create((long)100L, (long)1L, (long)100L, (long)100L));
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "impulse.out");
                builder.setInt64Value(2L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "create/ParMultiDo(Anonymous).output");
                builder.setInt64Value(3L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "processA/ParMultiDo(Anonymous).output");
                builder.setInt64Value(6L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "processB/ParMultiDo(Anonymous).output");
                builder.setInt64Value(6L);
                matchers.add(MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.START_BUNDLE_MSECS);
                builder.setInt64TypeUrn();
                builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
                matchers.add(Matchers.allOf((Matcher)MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()), (Matcher)MonitoringInfoMatchers.valueGreaterThan((long)0L)));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS);
                builder.setInt64TypeUrn();
                builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
                matchers.add(Matchers.allOf((Matcher)MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()), (Matcher)MonitoringInfoMatchers.valueGreaterThan((long)0L)));
                builder = new SimpleMonitoringInfoBuilder();
                builder.setUrn(MonitoringInfoConstants.Urns.FINISH_BUNDLE_MSECS);
                builder.setInt64TypeUrn();
                builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, testPTransformId);
                matchers.add(Matchers.allOf((Matcher)MonitoringInfoMatchers.matchSetFields((MetricsApi.MonitoringInfo)builder.build()), (Matcher)MonitoringInfoMatchers.valueGreaterThan((long)0L)));
                for (Matcher matcher : matchers) {
                    Assert.assertThat((Object)response.getMonitoringInfosList(), (Matcher)Matchers.hasItem((Matcher)matcher));
                }
            }
        };
        try (SdkHarnessClient.ActiveBundle bundle = processor.newBundle(outputReceivers, stateRequestHandler, progressHandler);){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"X")));
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"Y")));
        }
    }

    /**
     * Executes a fused stage that uses bag user state and checks both the emitted elements and
     * the state left behind.
     *
     * <p>The pipeline is impulse -> create -> userState -> gbk. The stage that declares user
     * state is located in the fused pipeline and run with a state request handler backed by an
     * in-memory map seeded with "A", "B", "C" under state id "foo" and "D" under "foo2". A single
     * element {@code KV("X", "Y")} is sent through the bundle. The test then asserts that:
     * <ul>
     *   <li>every output receiver saw {@code ("X","A")}, {@code ("X","B")}, {@code ("X","C")};</li>
     *   <li>"foo" now holds A, B, C, Y in that order;</li>
     *   <li>"foo2" is empty.</li>
     * </ul>
     *
     * @throws Exception if pipeline translation, bundle processing or coding fails
     */
    @Test
    public void testExecutionWithUserState() throws Exception {
        Pipeline p = Pipeline.create();
        ((PCollection)((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], KV<String, String>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
            }
        }))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).apply("userState", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, String>, KV<String, String>>(){
            @DoFn.StateId(value="foo")
            private final StateSpec<BagState<String>> bufferState = StateSpecs.bag((Coder)StringUtf8Coder.of());
            @DoFn.StateId(value="foo2")
            private final StateSpec<BagState<String>> bufferState2 = StateSpecs.bag((Coder)StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element KV<String, String> element, @DoFn.StateId(value="foo") BagState<String> state, @DoFn.StateId(value="foo2") BagState<String> state2, DoFn.OutputReceiver<KV<String, String>> r) {
                // The result is unused, but the call is kept so isEmpty() is still exercised.
                ReadableState isEmpty = state.isEmpty();
                // Echo everything currently in "foo" under the element's key...
                for (String value : state.read()) {
                    r.output((Object)KV.of((Object)((String)element.getKey()), (Object)value));
                }
                // ...then append the element's value to "foo" and wipe "foo2".
                state.add((Object)((String)element.getValue()));
                state2.clear();
            }
        }))).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> !stage.getUserStates().isEmpty());
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with user state.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator);
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        // One synchronized list per remote output, filled by the matching receiver.
        HashMap outputValues = new HashMap();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        // In-memory backing store for the two bag states; the lists are mutable so the handler
        // below can append to and clear them.
        ImmutableMap userStateData = ImmutableMap.of((Object)"foo", new ArrayList<ByteString>(Arrays.asList(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"A", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"B", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"C", (Coder.Context)Coder.Context.NESTED)))), (Object)"foo2", new ArrayList<ByteString>(Arrays.asList(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"D", (Coder.Context)Coder.Context.NESTED)))));
        // The handler captures userStateData directly; key and window are ignored, so each
        // state id maps to exactly one list.
        StateRequestHandler stateRequestHandler = StateRequestHandlers.forBagUserStateHandlerFactory((ProcessBundleDescriptors.ExecutableProcessBundleDescriptor)descriptor, (StateRequestHandlers.BagUserStateHandlerFactory)new StateRequestHandlers.BagUserStateHandlerFactory<ByteString, Object, BoundedWindow>(){

            @Override
            public StateRequestHandlers.BagUserStateHandler<ByteString, Object, BoundedWindow> forUserState(String pTransformId, final String userStateId, Coder<ByteString> keyCoder, Coder<Object> valueCoder, Coder<BoundedWindow> windowCoder) {
                return new StateRequestHandlers.BagUserStateHandler<ByteString, Object, BoundedWindow>(){

                    @Override
                    public Iterable<Object> get(ByteString key, BoundedWindow window) {
                        return (Iterable)userStateData.get(userStateId);
                    }

                    @Override
                    public void append(ByteString key, BoundedWindow window, Iterator<Object> values) {
                        Iterators.addAll((Collection)((Collection)userStateData.get(userStateId)), values);
                    }

                    @Override
                    public void clear(ByteString key, BoundedWindow window) {
                        ((List)userStateData.get(userStateId)).clear();
                    }
                };
            }
        });
        // The assertions below run only after the bundle has been closed.
        try (SdkHarnessClient.ActiveBundle bundle = processor.newBundle(outputReceivers, stateRequestHandler, BundleProgressHandler.ignored());){
            ((FnDataReceiver)Iterables.getOnlyElement(bundle.getInputReceivers().values())).accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"Y")));
        }
        for (Collection windowedValues : outputValues.values()) {
            Assert.assertThat((Object)windowedValues, (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"A")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"B")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"C"))}));
        }
        Assert.assertThat((Object)((List)userStateData.get("foo")), (Matcher)IsIterableContainingInOrder.contains((Object[])new ByteString[]{ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"A", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"B", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"C", (Coder.Context)Coder.Context.NESTED)), ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"Y", (Coder.Context)Coder.Context.NESTED))}));
        Assert.assertThat((Object)((List)userStateData.get("foo2")), (Matcher)IsEmptyIterable.emptyIterable());
    }

    /**
     * Executes a fused stage that declares an event-time and a processing-time timer and checks
     * the main output as well as both timer outputs.
     *
     * <p>The pipeline is impulse -> create -> timer -> gbk. The DoFn sets both timers from its
     * process-element callback and from each timer callback, with distinct offsets (1/2 from the
     * element, 11/12 from the event timer, 21/22 from the processing timer). The test feeds one
     * main element keyed "X", one event timer keyed "Y" and one processing timer keyed "Z", then
     * asserts that:
     * <ul>
     *   <li>the main output contains "mainX", "event" and "processing";</li>
     *   <li>the event-time timer output contains X@1, Y@11, Z@21;</li>
     *   <li>the processing-time timer output contains X@2, Y@12, Z@22;</li>
     * </ul>
     * where {@code @n} is the offset from {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
     *
     * @throws Exception if pipeline translation, bundle processing or coding fails
     */
    @Test
    public void testExecutionWithTimer() throws Exception {
        Pipeline p = Pipeline.create();
        ((PCollection)((PCollection)((PCollection)p.apply("impulse", (PTransform)Impulse.create())).apply("create", (PTransform)ParDo.of((DoFn)new DoFn<byte[], KV<String, String>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext ctxt) {
            }
        }))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).apply("timer", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, String>, KV<String, String>>(){
            @DoFn.TimerId(value="event")
            private final TimerSpec eventTimerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
            @DoFn.TimerId(value="processing")
            private final TimerSpec processingTimerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer) {
                context.output((Object)KV.of((Object)("main" + (String)((KV)context.element()).getKey()), (Object)""));
                eventTimeTimer.set(context.timestamp().plus(1L));
                processingTimeTimer.offset(Duration.millis((long)2L));
                processingTimeTimer.setRelative();
            }

            @DoFn.OnTimer(value="event")
            public void eventTimer(DoFn.OnTimerContext context, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer) {
                context.output((Object)KV.of((Object)"event", (Object)""));
                eventTimeTimer.set(context.timestamp().plus(11L));
                processingTimeTimer.offset(Duration.millis((long)12L));
                processingTimeTimer.setRelative();
            }

            @DoFn.OnTimer(value="processing")
            public void processingTimer(DoFn.OnTimerContext context, @DoFn.TimerId(value="event") org.apache.beam.sdk.state.Timer eventTimeTimer, @DoFn.TimerId(value="processing") org.apache.beam.sdk.state.Timer processingTimeTimer) {
                context.output((Object)KV.of((Object)"processing", (Object)""));
                eventTimeTimer.set(context.timestamp().plus(21L));
                processingTimeTimer.offset(Duration.millis((long)22L));
                processingTimeTimer.setRelative();
            }
        }))).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Optional optionalStage = Iterables.tryFind((Iterable)fused.getFusedStages(), stage -> !stage.getTimers().isEmpty());
        Preconditions.checkState((boolean)optionalStage.isPresent(), (Object)"Expected a stage with timers.");
        ExecutableStage stage2 = (ExecutableStage)optionalStage.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)"test_stage", (ExecutableStage)stage2, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator);
        Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
        // One synchronized list per remote output (main output plus one per timer).
        HashMap outputValues = new HashMap();
        HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
        for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
            List outputContents = Collections.synchronizedList(new ArrayList());
            outputValues.put((String)remoteOutputCoder.getKey(), outputContents);
            outputReceivers.put((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputContents::add));
        }
        // Resolve, per time domain, which input PCollection feeds the timer and which output
        // transform receives the timers the DoFn sets.
        String eventTimeInputPCollectionId = null;
        String eventTimeOutputTransformId = null;
        String processingTimeInputPCollectionId = null;
        String processingTimeOutputTransformId = null;
        for (Map<String, ProcessBundleDescriptors.TimerSpec> timerSpecs : descriptor.getTimerSpecs().values()) {
            for (ProcessBundleDescriptors.TimerSpec timerSpec : timerSpecs.values()) {
                if (TimeDomain.EVENT_TIME.equals((Object)timerSpec.getTimerSpec().getTimeDomain())) {
                    eventTimeInputPCollectionId = timerSpec.inputCollectionId();
                    eventTimeOutputTransformId = timerSpec.outputTransformId();
                    continue;
                }
                if (TimeDomain.PROCESSING_TIME.equals((Object)timerSpec.getTimerSpec().getTimeDomain())) {
                    processingTimeInputPCollectionId = timerSpec.inputCollectionId();
                    processingTimeOutputTransformId = timerSpec.outputTransformId();
                    continue;
                }
                Assert.fail((String)String.format("Unknown timer specification %s", timerSpec));
            }
        }
        // Pin the Joda clock to the minimum timestamp before the bundle runs.
        // NOTE(review): the fixed clock is not restored in this method - confirm that a
        // teardown elsewhere in the class resets it.
        DateTimeUtils.setCurrentMillisFixed((long)BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
        // The assertions below run only after the bundle has been closed.
        try (SdkHarnessClient.ActiveBundle bundle = processor.newBundle(outputReceivers, StateRequestHandler.unsupported(), BundleProgressHandler.ignored());){
            ((FnDataReceiver)bundle.getInputReceivers().get(stage2.getInputPCollection().getId())).accept((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"X", (Object)"X")));
            ((FnDataReceiver)bundle.getInputReceivers().get(eventTimeInputPCollectionId)).accept((Object)WindowedValue.valueInGlobalWindow(this.timerBytes("Y", 100L)));
            ((FnDataReceiver)bundle.getInputReceivers().get(processingTimeInputPCollectionId)).accept((Object)WindowedValue.valueInGlobalWindow(this.timerBytes("Z", 200L)));
        }
        // Whatever remote output is not a timer output must be the single main output.
        ImmutableSet timerOutputCoders = ImmutableSet.of((Object)eventTimeOutputTransformId, (Object)processingTimeOutputTransformId);
        String mainOutputTransform = (String)Iterables.getOnlyElement((Iterable)Sets.difference(descriptor.getRemoteOutputCoders().keySet(), (Set)timerOutputCoders));
        Assert.assertThat((Object)((Collection)outputValues.get(mainOutputTransform)), (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"mainX", (Object)"")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"event", (Object)"")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"processing", (Object)""))}));
        // Timer values are compared through the coder's structural value.
        Assert.assertThat(this.timerStructuralValues((Collection)outputValues.get(eventTimeOutputTransformId)), (Matcher)Matchers.containsInAnyOrder((Object[])new Object[]{this.timerStructuralValue(WindowedValue.valueInGlobalWindow(this.timerBytes("X", 1L))), this.timerStructuralValue(WindowedValue.valueInGlobalWindow(this.timerBytes("Y", 11L))), this.timerStructuralValue(WindowedValue.valueInGlobalWindow(this.timerBytes("Z", 21L)))}));
        Assert.assertThat(this.timerStructuralValues((Collection)outputValues.get(processingTimeOutputTransformId)), (Matcher)Matchers.containsInAnyOrder((Object[])new Object[]{this.timerStructuralValue(WindowedValue.valueInGlobalWindow(this.timerBytes("X", 2L))), this.timerStructuralValue(WindowedValue.valueInGlobalWindow(this.timerBytes("Y", 12L))), this.timerStructuralValue(WindowedValue.valueInGlobalWindow(this.timerBytes("Z", 22L)))}));
    }

    /**
     * Executes every stage of a pipeline that fuses into two stages and checks the combined
     * output.
     *
     * <p>Two identical branches (impulse -> create -> "stream" + suffix prefixing) are built
     * with suffixes "1" and "2", flattened, mapped to {@code KV(element, "")} and grouped. The
     * test asserts the fuser produced exactly two stages, sends the encoded string "X" into each
     * stage, and expects the shared output list to contain {@code ("stream1X","")} and
     * {@code ("stream2X","")}.
     *
     * @throws Exception if pipeline translation, bundle processing or coding fails
     */
    @Test
    public void testExecutionWithMultipleStages() throws Exception {
        Pipeline p = Pipeline.create();
        Function<String, PCollection> pCollectionGenerator = suffix -> (PCollection)((PCollection)((PCollection)p.apply("impulse" + suffix, (PTransform)Impulse.create())).apply("create" + suffix, (PTransform)ParDo.of((DoFn)new DoFn<byte[], String>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c) {
                try {
                    c.output((Object)((String)CoderUtils.decodeFromByteArray((Coder)StringUtf8Coder.of(), (byte[])((byte[])c.element()))));
                }
                catch (CoderException e) {
                    throw new RuntimeException(e);
                }
            }
        }))).setCoder((Coder)StringUtf8Coder.of()).apply((PTransform)ParDo.of((DoFn)new DoFn<String, String>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                c.output((Object)("stream" + suffix + (String)c.element()));
            }
        }));
        PCollection input1 = pCollectionGenerator.apply("1");
        PCollection input2 = pCollectionGenerator.apply("2");
        PCollection outputMerged = (PCollection)PCollectionList.of((PCollection)input1).and(input2).apply((PTransform)Flatten.pCollections());
        ((PCollection)outputMerged.apply("createKV", (PTransform)ParDo.of((DoFn)new DoFn<String, KV<String, String>>(){

            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c) {
                c.output((Object)KV.of((Object)((String)c.element()), (Object)""));
            }
        }))).setCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of())).apply("gbk", (PTransform)GroupByKey.create());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)p);
        FusedPipeline fused = GreedyPipelineFuser.fuse((RunnerApi.Pipeline)pipelineProto);
        Set<ExecutableStage> stages = fused.getFusedStages();
        Assert.assertThat((Object)stages.size(), (Matcher)Matchers.equalTo((Object)2));
        // All receivers of all stages append to this one synchronized list.
        List outputValues = Collections.synchronizedList(new ArrayList());
        for (ExecutableStage stage : stages) {
            ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor = ProcessBundleDescriptors.fromExecutableStage((String)stage.toString(), (ExecutableStage)stage, (Endpoints.ApiServiceDescriptor)this.dataServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor)this.stateServer.getApiServiceDescriptor());
            SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations(), (StateDelegator)this.stateDelegator);
            Map remoteOutputCoders = descriptor.getRemoteOutputCoders();
            HashMap<String, RemoteOutputReceiver> outputReceivers = new HashMap<String, RemoteOutputReceiver>();
            for (Map.Entry remoteOutputCoder : remoteOutputCoders.entrySet()) {
                outputReceivers.putIfAbsent((String)remoteOutputCoder.getKey(), RemoteOutputReceiver.of((Coder)((Coder)remoteOutputCoder.getValue()), outputValues::add));
            }
            // try-with-resources closes the bundle on both normal and exceptional exit;
            // the final assertion runs only after every stage's bundle has been closed.
            try (SdkHarnessClient.ActiveBundle bundle = processor.newBundle(outputReceivers, StateRequestHandler.unsupported(), BundleProgressHandler.ignored())) {
                ((FnDataReceiver)bundle.getInputReceivers().get(stage.getInputPCollection().getId())).accept((Object)WindowedValue.valueInGlobalWindow((Object)CoderUtils.encodeToByteArray((Coder)StringUtf8Coder.of(), (Object)"X")));
            }
        }
        Assert.assertThat(outputValues, (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"stream1X", (Object)"")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"stream2X", (Object)""))}));
    }

    /**
     * Pairs {@code key} with {@code value} encoded to bytes by {@code BigEndianLongCoder}.
     *
     * @param key the key of the resulting pair
     * @param value the long to encode as the pair's value
     * @return a {@code KV} of the key and the encoded bytes
     * @throws CoderException if encoding the value fails
     */
    private KV<String, byte[]> byteValueOf(String key, long value) throws CoderException {
        // No Object/raw-Coder casts here: they would make KV.of infer KV<Object, Object>,
        // which does not match the declared return type.
        byte[] encodedValue = CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value);
        return KV.of(key, encodedValue);
    }

    /**
     * Builds a keyed timer whose timestamp is {@code BoundedWindow.TIMESTAMP_MIN_VALUE} plus
     * {@code timestampOffset} and whose payload is {@code null} encoded with {@code VoidCoder}
     * in the nested context.
     *
     * @param key the key the timer is associated with
     * @param timestampOffset offset added to the minimum timestamp
     * @return a {@code KV} of the key and the timer
     * @throws CoderException if encoding the payload fails
     */
    private KV<String, Timer<byte[]>> timerBytes(String key, long timestampOffset) throws CoderException {
        // No Object casts here, so the generic types reach KV.of and match the return type.
        Instant timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset);
        byte[] encodedPayload = CoderUtils.encodeToByteArray(VoidCoder.of(), null, Coder.Context.NESTED);
        return KV.of(key, Timer.of(timestamp, encodedPayload));
    }

    /**
     * Converts a windowed, keyed timer into the structural value produced by its coder.
     *
     * <p>The coder is a full windowed-value coder over {@code KV<String, Timer<byte[]>>} in the
     * global window.
     *
     * @param timer the value to convert
     * @return the coder's structural value for {@code timer}
     */
    private Object timerStructuralValue(Object timer) {
        // Raw coder types are kept on purpose so the untyped argument is accepted.
        Coder timerCoder = (Coder)Timer.Coder.of((Coder)ByteArrayCoder.of());
        Coder keyedTimerCoder = (Coder)KvCoder.of((Coder)StringUtf8Coder.of(), timerCoder);
        Coder windowCoder = (Coder)GlobalWindow.Coder.INSTANCE;
        return WindowedValue.FullWindowedValueCoder.of(keyedTimerCoder, windowCoder).structuralValue(timer);
    }

    /**
     * Maps every element of {@code timers} through {@link #timerStructuralValue(Object)}.
     *
     * @param timers the timer values to convert
     * @return the collection returned by {@code Collections2.transform} for those values
     */
    private Collection<Object> timerStructuralValues(Collection<?> timers) {
        Collection<Object> structuralValues = Collections2.transform(timers, timer -> this.timerStructuralValue(timer));
        return structuralValues;
    }
}

