package org.apache.beam.runners.fnexecution.control;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.FusedPipeline;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMatchers;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.LogWriter;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.server.GrpcContextHeaderAccessorProvider;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyIterable;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.DateTimeUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.class */
public class RemoteExecutionTest implements Serializable {

    @Rule
    public transient ResetDateTimeProvider resetDateTimeProvider = new ResetDateTimeProvider();
    private static final String WORKER_ID = "remote_test";
    private transient GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private transient GrpcFnServer<GrpcDataService> dataServer;
    private transient GrpcFnServer<GrpcStateService> stateServer;
    private transient LogCapturer logCapturer;
    private transient GrpcFnServer<GrpcLoggingService> loggingServer;
    private transient GrpcStateService stateDelegator;
    private transient SdkHarnessClient controlClient;
    private transient ExecutorService serverExecutor;
    private transient ExecutorService sdkHarnessExecutor;
    private transient Future<?> sdkHarnessExecutorFuture;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/RemoteExecutionTest$LogCapturer.class */
    public static class LogCapturer implements LogWriter {
        List<BeamFnApi.LogEntry> capturedLogs;

        private LogCapturer() {
            this.capturedLogs = Collections.synchronizedList(new ArrayList());
        }

        public void log(BeamFnApi.LogEntry logEntry) {
            this.capturedLogs.add(logEntry);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/RemoteExecutionTest$MetricsDoFn.class */
    public static class MetricsDoFn extends DoFn<byte[], String> {
        private static final String PROCESS_USER_COUNTER_NAME = "processUserCounter";
        private static final String START_USER_COUNTER_NAME = "startUserCounter";
        private static final String FINISH_USER_COUNTER_NAME = "finishUserCounter";
        private static final String PROCESS_USER_DISTRIBUTION_NAME = "processUserDistribution";
        private static final String START_USER_DISTRIBUTION_NAME = "startUserDistribution";
        private static final String FINISH_USER_DISTRIBUTION_NAME = "finishUserDistribution";
        private static final ConcurrentMap<String, CountDownLatch> AFTER_PROCESS = new ConcurrentHashMap();
        private static final ConcurrentMap<String, CountDownLatch> ALLOW_COMPLETION = new ConcurrentHashMap();
        private final String uuid = UUID.randomUUID().toString();

        public MetricsDoFn() {
            AFTER_PROCESS.put(this.uuid, new CountDownLatch(1));
            ALLOW_COMPLETION.put(this.uuid, new CountDownLatch(1));
        }

        @DoFn.StartBundle
        public void startBundle() throws InterruptedException {
            Metrics.counter(RemoteExecutionTest.class, START_USER_COUNTER_NAME).inc(10L);
            Metrics.distribution(RemoteExecutionTest.class, START_USER_DISTRIBUTION_NAME).update(10L);
            Thread.sleep(500L);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<byte[], String>.ProcessContext processContext) throws InterruptedException {
            processContext.output("zero");
            processContext.output("one");
            processContext.output("two");
            Metrics.counter(RemoteExecutionTest.class, PROCESS_USER_COUNTER_NAME).inc();
            Metrics.distribution(RemoteExecutionTest.class, PROCESS_USER_DISTRIBUTION_NAME).update(1L);
            Thread.sleep(500L);
            AFTER_PROCESS.get(this.uuid).countDown();
            Preconditions.checkState(ALLOW_COMPLETION.get(this.uuid).await(60L, TimeUnit.SECONDS), "Failed to wait for DoFn to be allowed to complete.");
        }

        @DoFn.FinishBundle
        public void finishBundle() throws InterruptedException {
            Metrics.counter(RemoteExecutionTest.class, FINISH_USER_COUNTER_NAME).inc(100L);
            Metrics.distribution(RemoteExecutionTest.class, FINISH_USER_DISTRIBUTION_NAME).update(100L);
            Thread.sleep(500L);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/RemoteExecutionTest$StoringStateRequestHandler.class */
    private static class StoringStateRequestHandler implements StateRequestHandler {
        private StateRequestHandler stateRequestHandler;
        private ArrayList<BeamFnApi.StateRequest> receivedRequests = new ArrayList<>();
        private ArrayList<BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokens = new ArrayList<>();

        StoringStateRequestHandler(StateRequestHandler stateRequestHandler) {
            this.stateRequestHandler = stateRequestHandler;
        }

        public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest stateRequest) throws Exception {
            this.receivedRequests.add(stateRequest);
            return this.stateRequestHandler.handle(stateRequest);
        }

        public Iterable<BeamFnApi.ProcessBundleRequest.CacheToken> getCacheTokens() {
            return Iterables.concat(this.stateRequestHandler.getCacheTokens(), this.cacheTokens);
        }

        public int getRequestCount() {
            return this.receivedRequests.size();
        }

        public void addCacheToken(BeamFnApi.ProcessBundleRequest.CacheToken cacheToken) {
            this.cacheTokens.add(cacheToken);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/RemoteExecutionTest$WaitingTillSplitRestrictionTracker.class */
    private static class WaitingTillSplitRestrictionTracker extends RestrictionTracker<String, Void> {
        private static final String WAIT_TILL_SPLIT = "WaitTillSplit";
        private static final String PRIMARY = "Primary";
        private static final String RESIDUAL = "Residual";
        private String currentRestriction;

        private WaitingTillSplitRestrictionTracker(String str) {
            this.currentRestriction = str;
        }

        public boolean tryClaim(Void r3) {
            return needsSplitting();
        }

        /* renamed from: currentRestriction, reason: merged with bridge method [inline-methods] */
        public String m2currentRestriction() {
            return this.currentRestriction;
        }

        public SplitResult<String> trySplit(double d) {
            if (!needsSplitting()) {
                return null;
            }
            this.currentRestriction = PRIMARY;
            return SplitResult.of(this.currentRestriction, RESIDUAL);
        }

        private boolean needsSplitting() {
            return WAIT_TILL_SPLIT.equals(this.currentRestriction);
        }

        public void checkDone() throws IllegalStateException {
            Preconditions.checkState(!needsSplitting(), "Expected for this restriction to have been split.");
        }

        public RestrictionTracker.IsBounded isBounded() {
            return RestrictionTracker.IsBounded.BOUNDED;
        }
    }

    public void launchSdkHarness(PipelineOptions pipelineOptions) throws Exception {
        ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true).build();
        this.serverExecutor = Executors.newCachedThreadPool(build);
        InProcessServerFactory create = InProcessServerFactory.create();
        this.dataServer = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(PipelineOptionsFactory.create(), this.serverExecutor, OutboundObserverFactory.serverDirect()), create);
        this.logCapturer = new LogCapturer();
        this.loggingServer = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(this.logCapturer), create);
        this.stateDelegator = GrpcStateService.create();
        this.stateServer = GrpcFnServer.allocatePortAndCreateFor(this.stateDelegator, create);
        MapControlClientPool create2 = MapControlClientPool.create();
        this.controlServer = GrpcFnServer.allocatePortAndCreateFor(FnApiControlClientPoolService.offeringClientsToPool(create2.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()), create);
        this.sdkHarnessExecutor = Executors.newSingleThreadExecutor(build);
        this.sdkHarnessExecutorFuture = this.sdkHarnessExecutor.submit(() -> {
            try {
                FnHarness.main(WORKER_ID, pipelineOptions, Collections.emptySet(), this.loggingServer.getApiServiceDescriptor(), this.controlServer.getApiServiceDescriptor(), (Endpoints.ApiServiceDescriptor) null, ManagedChannelFactory.createInProcess(), OutboundObserverFactory.clientDirect(), Caches.eternal());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        this.controlClient = SdkHarnessClient.usingFnApiClient(create2.getSource().take(WORKER_ID, Duration.ofSeconds(2L)), this.dataServer.getService());
    }

    @After
    public void tearDown() throws Exception {
        this.controlServer.close();
        this.stateServer.close();
        this.dataServer.close();
        this.loggingServer.close();
        this.controlClient.close();
        this.sdkHarnessExecutor.shutdownNow();
        this.serverExecutor.shutdownNow();
        try {
            this.sdkHarnessExecutorFuture.get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof RuntimeException) || !(e.getCause().getCause() instanceof InterruptedException)) {
                throw e;
            }
        }
        this.logCapturer = null;
    }

    @Test
    public void testExecution() throws Exception {
        launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.1
            @DoFn.ProcessElement
            public void process(DoFn<byte[], String>.ProcessContext processContext) {
                processContext.output("zero");
                processContext.output("one");
                processContext.output("two");
            }
        })).apply("len", ParDo.of(new DoFn<String, Long>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.2
            @DoFn.ProcessElement
            public void process(DoFn<String, Long>.ProcessContext processContext) {
                processContext.output(Long.valueOf(((String) processContext.element()).length()));
            }
        })).apply("addKeys", WithKeys.of("foo")).setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())).apply("gbk", GroupByKey.create());
        FusedPipeline fuse = GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create));
        Preconditions.checkState(fuse.getFusedStages().size() == 1, "Expected exactly one fused stage");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("my_stage", (ExecutableStage) fuse.getFusedStages().iterator().next(), this.dataServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations());
        Map remoteOutputCoders = fromExecutableStage.getRemoteOutputCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : remoteOutputCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((String) entry.getKey(), synchronizedList);
            String str = (String) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(str, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap2, BundleProgressHandler.ignored());
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(new byte[0]));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                Iterator it = hashMap.values().iterator();
                while (it.hasNext()) {
                    MatcherAssert.assertThat((Collection) it.next(), Matchers.containsInAnyOrder(new Object[]{WindowedValue.valueInGlobalWindow(byteValueOf("foo", 4L)), WindowedValue.valueInGlobalWindow(byteValueOf("foo", 3L)), WindowedValue.valueInGlobalWindow(byteValueOf("foo", 3L))}));
                }
            } finally {
            }
        } catch (Throwable th2) {
            if (newBundle != null) {
                $closeResource(th, newBundle);
            }
            throw th2;
        }
    }

    @Test
    public void testLogging() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.3
            @DoFn.ProcessElement
            public void process(DoFn<byte[], String>.ProcessContext processContext) {
                processContext.output("zero");
            }
        })).apply("len", ParDo.of(new DoFn<String, Long>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.4
            @DoFn.ProcessElement
            public void process(DoFn<String, Long>.ProcessContext processContext) {
                Logger logger = LoggerFactory.getLogger(RemoteExecutionTest.class);
                logger.warn("TEST" + ((String) processContext.element()));
                logger.error("TEST_EXCEPTION" + ((String) processContext.element()), new Exception());
            }
        })).apply("addKeys", WithKeys.of("foo")).setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())).apply("gbk", GroupByKey.create());
        FusedPipeline fuse = GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create));
        Preconditions.checkState(fuse.getFusedStages().size() == 1, "Expected exactly one fused stage");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("my_stage", (ExecutableStage) fuse.getFusedStages().iterator().next(), this.dataServer.getApiServiceDescriptor());
        String str = null;
        for (Map.Entry entry : fromExecutableStage.getProcessBundleDescriptor().getTransformsMap().entrySet()) {
            if (((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains("len")) {
                str = (String) entry.getKey();
            }
        }
        Assert.assertNotNull(str);
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations());
        Map remoteOutputCoders = fromExecutableStage.getRemoteOutputCoders();
        HashMap hashMap = new HashMap();
        for (Map.Entry entry2 : remoteOutputCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            String str2 = (String) entry2.getKey();
            Coder coder = (Coder) entry2.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap.put(str2, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap, BundleProgressHandler.ignored());
        try {
            String id = newBundle.getId();
            ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(new byte[0]));
            if (newBundle != null) {
                $closeResource(null, newBundle);
            }
            while (System.currentTimeMillis() - currentTimeMillis < 30000) {
                boolean z = false;
                boolean z2 = false;
                for (BeamFnApi.LogEntry logEntry : (BeamFnApi.LogEntry[]) this.logCapturer.capturedLogs.toArray(new BeamFnApi.LogEntry[0])) {
                    MatcherAssert.assertThat(Long.valueOf((logEntry.getTimestamp().getSeconds() * 1000) + (logEntry.getTimestamp().getNanos() / 1000000)), Matchers.allOf(Matchers.greaterThanOrEqualTo(Long.valueOf(currentTimeMillis)), Matchers.lessThanOrEqualTo(Long.valueOf(System.currentTimeMillis()))));
                    MatcherAssert.assertThat(logEntry.getThread(), Matchers.not(""));
                    MatcherAssert.assertThat(logEntry.getLogLocation(), Matchers.not(""));
                    if ("TESTzero".equals(logEntry.getMessage())) {
                        MatcherAssert.assertThat(logEntry.getSeverity(), Matchers.equalTo(BeamFnApi.LogEntry.Severity.Enum.WARN));
                        MatcherAssert.assertThat(logEntry.getInstructionId(), Matchers.equalTo(id));
                        MatcherAssert.assertThat(logEntry.getLogLocation(), Matchers.equalTo(RemoteExecutionTest.class.getCanonicalName()));
                        MatcherAssert.assertThat(logEntry.getTransformId(), Matchers.equalTo(str));
                        MatcherAssert.assertThat(logEntry.getTrace(), Matchers.equalTo(""));
                        z = true;
                    } else if ("TEST_EXCEPTIONzero".equals(logEntry.getMessage())) {
                        MatcherAssert.assertThat(logEntry.getSeverity(), Matchers.equalTo(BeamFnApi.LogEntry.Severity.Enum.ERROR));
                        MatcherAssert.assertThat(logEntry.getInstructionId(), Matchers.equalTo(id));
                        MatcherAssert.assertThat(logEntry.getLogLocation(), Matchers.equalTo(RemoteExecutionTest.class.getCanonicalName()));
                        MatcherAssert.assertThat(logEntry.getTransformId(), Matchers.equalTo(str));
                        MatcherAssert.assertThat(logEntry.getTrace(), Matchers.containsString("RemoteExecutionTest"));
                        z2 = true;
                    }
                }
                if (z && z2) {
                    return;
                } else {
                    Thread.sleep(500L);
                }
            }
        } catch (Throwable th) {
            if (newBundle != null) {
                $closeResource(null, newBundle);
            }
            throw th;
        }
    }

    @Test
    public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() throws Exception {
        launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.5
            @DoFn.ProcessElement
            public void process(DoFn<byte[], KV<String, String>>.ProcessContext processContext) throws Exception {
                String str = (String) CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), (byte[]) processContext.element());
                if (str.equals("X")) {
                    throw new Exception("testBundleExecutionFailure");
                }
                processContext.output(KV.of(str, str));
            }
        })).apply("gbk", GroupByKey.create());
        FusedPipeline fuse = GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create));
        Preconditions.checkState(fuse.getFusedStages().size() == 1, "Expected exactly one fused stage");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("my_stage", (ExecutableStage) fuse.getFusedStages().iterator().next(), this.dataServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations());
        Map remoteOutputCoders = fromExecutableStage.getRemoteOutputCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : remoteOutputCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((String) entry.getKey(), synchronizedList);
            String str = (String) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(str, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap2, BundleProgressHandler.ignored());
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                try {
                    SdkHarnessClient.BundleProcessor.ActiveBundle newBundle2 = processor.newBundle(hashMap2, BundleProgressHandler.ignored());
                    Throwable th2 = null;
                    try {
                        try {
                            ((FnDataReceiver) Iterables.getOnlyElement(newBundle2.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
                            if (newBundle2 != null) {
                                $closeResource(null, newBundle2);
                            }
                            Assert.fail();
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (newBundle2 != null) {
                            $closeResource(th2, newBundle2);
                        }
                        throw th3;
                    }
                } catch (ExecutionException e) {
                    Assert.assertTrue(e.getMessage().contains("testBundleExecutionFailure"));
                }
                newBundle = processor.newBundle(hashMap2, BundleProgressHandler.ignored());
                Throwable th4 = null;
                try {
                    try {
                        ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Z")));
                        if (newBundle != null) {
                            $closeResource(null, newBundle);
                        }
                        Iterator it = hashMap.values().iterator();
                        while (it.hasNext()) {
                            MatcherAssert.assertThat((Collection) it.next(), Matchers.containsInAnyOrder(new Object[]{WindowedValue.valueInGlobalWindow(KV.of("Y", "Y")), WindowedValue.valueInGlobalWindow(KV.of("Z", "Z"))}));
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testExecutionWithSideInput() throws Exception {
        launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline create = Pipeline.create();
        ExperimentalOptions.addExperiment(create.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
        ExperimentalOptions.addExperiment(create.getOptions().as(ExperimentalOptions.class), "use_runner_v2");
        PCollection coder = create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.6
            @DoFn.ProcessElement
            public void process(DoFn<byte[], String>.ProcessContext processContext) {
                processContext.output("zero");
                processContext.output("one");
                processContext.output("two");
            }
        })).setCoder(StringUtf8Coder.of());
        final PCollectionView apply = coder.apply("createIterableSideInput", View.asIterable());
        final PCollectionView apply2 = coder.apply(WithKeys.of("key")).apply("createMultimapSideInput", View.asMultimap());
        coder.apply("readSideInput", ParDo.of(new DoFn<String, KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.7
            @DoFn.ProcessElement
            public void processElement(DoFn<String, KV<String, String>>.ProcessContext processContext) {
                Iterator it = ((Iterable) processContext.sideInput(apply)).iterator();
                while (it.hasNext()) {
                    processContext.output(KV.of((String) processContext.element(), (String) it.next()));
                }
                for (Map.Entry entry : ((Map) processContext.sideInput(apply2)).entrySet()) {
                    Iterator it2 = ((Iterable) entry.getValue()).iterator();
                    while (it2.hasNext()) {
                        processContext.output(KV.of((String) processContext.element(), ((String) entry.getKey()) + ":" + ((String) it2.next())));
                    }
                }
            }
        }).withSideInputs(new PCollectionView[]{apply, apply2})).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("gbk", GroupByKey.create());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return !executableStage.getSideInputs().isEmpty();
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with side inputs.");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("test_stage", (ExecutableStage) tryFind.get(), this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
        Map remoteOutputCoders = fromExecutableStage.getRemoteOutputCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : remoteOutputCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((String) entry.getKey(), synchronizedList);
            String str = (String) entry.getKey();
            Coder coder2 = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(str, RemoteOutputReceiver.of(coder2, (v1) -> {
                r3.add(v1);
            }));
        }
        SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap2, StateRequestHandlers.forSideInputHandlerFactory(fromExecutableStage.getSideInputSpecs(), new StateRequestHandlers.SideInputHandlerFactory() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.8
            public <V, W extends BoundedWindow> StateRequestHandlers.IterableSideInputHandler<V, W> forIterableSideInput(String str2, String str3, final Coder<V> coder3, Coder<W> coder4) {
                return (StateRequestHandlers.IterableSideInputHandler<V, W>) new StateRequestHandlers.IterableSideInputHandler<V, W>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.8.1
                    /* JADX WARN: Incorrect types in method signature: (TW;)Ljava/lang/Iterable<TV;>; */
                    public Iterable get(BoundedWindow boundedWindow) {
                        return Arrays.asList("A", "B", "C");
                    }

                    public Coder<V> elementCoder() {
                        return coder3;
                    }
                };
            }

            public <K, V, W extends BoundedWindow> StateRequestHandlers.MultimapSideInputHandler<K, V, W> forMultimapSideInput(String str2, String str3, final KvCoder<K, V> kvCoder, Coder<W> coder3) {
                return (StateRequestHandlers.MultimapSideInputHandler<K, V, W>) new StateRequestHandlers.MultimapSideInputHandler<K, V, W>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.8.2
                    /* JADX WARN: Incorrect types in method signature: (TW;)Ljava/lang/Iterable<TK;>; */
                    public Iterable get(BoundedWindow boundedWindow) {
                        return Arrays.asList("key1", "key2");
                    }

                    /* JADX WARN: Incorrect types in method signature: (TK;TW;)Ljava/lang/Iterable<TV;>; */
                    public Iterable get(Object obj, BoundedWindow boundedWindow) {
                        return "key1".equals(obj) ? Arrays.asList("H", "I", "J") : "key2".equals(obj) ? Arrays.asList("M", "N", "O") : Collections.emptyList();
                    }

                    public Coder<K> keyCoder() {
                        return kvCoder.getKeyCoder();
                    }

                    public Coder<V> valueCoder() {
                        return kvCoder.getValueCoder();
                    }
                };
            }
        }), BundleProgressHandler.ignored());
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow("X"));
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow("Y"));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                Iterator it = hashMap.values().iterator();
                while (it.hasNext()) {
                    MatcherAssert.assertThat((Collection) it.next(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(KV.of("X", "A")), WindowedValue.valueInGlobalWindow(KV.of("X", "B")), WindowedValue.valueInGlobalWindow(KV.of("X", "C")), WindowedValue.valueInGlobalWindow(KV.of("X", "key1:H")), WindowedValue.valueInGlobalWindow(KV.of("X", "key1:I")), WindowedValue.valueInGlobalWindow(KV.of("X", "key1:J")), WindowedValue.valueInGlobalWindow(KV.of("X", "key2:M")), WindowedValue.valueInGlobalWindow(KV.of("X", "key2:N")), WindowedValue.valueInGlobalWindow(KV.of("X", "key2:O")), WindowedValue.valueInGlobalWindow(KV.of("Y", "A")), WindowedValue.valueInGlobalWindow(KV.of("Y", "B")), WindowedValue.valueInGlobalWindow(KV.of("Y", "C")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key1:H")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key1:I")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key1:J")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key2:M")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key2:N")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key2:O"))}));
                }
            } finally {
            }
        } catch (Throwable th2) {
            if (newBundle != null) {
                $closeResource(th, newBundle);
            }
            throw th2;
        }
    }

    @Test
    public void testExecutionWithSideInputCaching() throws Exception {
        Pipeline create = Pipeline.create();
        ExperimentalOptions.addExperiment(create.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
        ExperimentalOptions.addExperiment(create.getOptions().as(ExperimentalOptions.class), "use_runner_v2");
        launchSdkHarness(create.getOptions());
        PCollection coder = create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.9
            @DoFn.ProcessElement
            public void process(DoFn<byte[], String>.ProcessContext processContext) {
                processContext.output("zero");
                processContext.output("one");
                processContext.output("two");
            }
        })).setCoder(StringUtf8Coder.of());
        final PCollectionView apply = coder.apply("createIterableSideInput", View.asIterable());
        final PCollectionView apply2 = coder.apply(WithKeys.of("key")).apply("createMultimapSideInput", View.asMultimap());
        coder.apply("readSideInput", ParDo.of(new DoFn<String, KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.10
            @DoFn.ProcessElement
            public void processElement(DoFn<String, KV<String, String>>.ProcessContext processContext) {
                Iterator it = ((Iterable) processContext.sideInput(apply)).iterator();
                while (it.hasNext()) {
                    processContext.output(KV.of((String) processContext.element(), (String) it.next()));
                }
                for (Map.Entry entry : ((Map) processContext.sideInput(apply2)).entrySet()) {
                    Iterator it2 = ((Iterable) entry.getValue()).iterator();
                    while (it2.hasNext()) {
                        processContext.output(KV.of((String) processContext.element(), ((String) entry.getKey()) + ":" + ((String) it2.next())));
                    }
                }
            }
        }).withSideInputs(new PCollectionView[]{apply, apply2})).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("gbk", GroupByKey.create());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return !executableStage.getSideInputs().isEmpty();
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with side inputs.");
        ExecutableStage executableStage2 = (ExecutableStage) tryFind.get();
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("test_stage", executableStage2, this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
        Map remoteOutputCoders = fromExecutableStage.getRemoteOutputCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : remoteOutputCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((String) entry.getKey(), synchronizedList);
            String str = (String) entry.getKey();
            Coder coder2 = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(str, RemoteOutputReceiver.of(coder2, (v1) -> {
                r3.add(v1);
            }));
        }
        StoringStateRequestHandler storingStateRequestHandler = new StoringStateRequestHandler(StateRequestHandlers.forSideInputHandlerFactory(fromExecutableStage.getSideInputSpecs(), new StateRequestHandlers.SideInputHandlerFactory() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.11
            public <V, W extends BoundedWindow> StateRequestHandlers.IterableSideInputHandler<V, W> forIterableSideInput(String str2, String str3, final Coder<V> coder3, Coder<W> coder4) {
                return (StateRequestHandlers.IterableSideInputHandler<V, W>) new StateRequestHandlers.IterableSideInputHandler<V, W>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.11.1
                    /* JADX WARN: Incorrect types in method signature: (TW;)Ljava/lang/Iterable<TV;>; */
                    public Iterable get(BoundedWindow boundedWindow) {
                        return Arrays.asList("A", "B", "C");
                    }

                    public Coder<V> elementCoder() {
                        return coder3;
                    }
                };
            }

            public <K, V, W extends BoundedWindow> StateRequestHandlers.MultimapSideInputHandler<K, V, W> forMultimapSideInput(String str2, String str3, final KvCoder<K, V> kvCoder, Coder<W> coder3) {
                return (StateRequestHandlers.MultimapSideInputHandler<K, V, W>) new StateRequestHandlers.MultimapSideInputHandler<K, V, W>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.11.2
                    /* JADX WARN: Incorrect types in method signature: (TW;)Ljava/lang/Iterable<TK;>; */
                    public Iterable get(BoundedWindow boundedWindow) {
                        return Arrays.asList("key1", "key2");
                    }

                    /* JADX WARN: Incorrect types in method signature: (TK;TW;)Ljava/lang/Iterable<TV;>; */
                    public Iterable get(Object obj, BoundedWindow boundedWindow) {
                        return "key1".equals(obj) ? Arrays.asList("H", "I", "J") : "key2".equals(obj) ? Arrays.asList("M", "N", "O") : Collections.emptyList();
                    }

                    public Coder<K> keyCoder() {
                        return kvCoder.getKeyCoder();
                    }

                    public Coder<V> valueCoder() {
                        return kvCoder.getValueCoder();
                    }
                };
            }
        }));
        String id = ((SideInputReference) Iterables.get(executableStage2.getSideInputs(), 0)).transform().getId();
        storingStateRequestHandler.addCacheToken(BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().setSideInput(BeamFnApi.ProcessBundleRequest.CacheToken.SideInput.newBuilder().setSideInputId(apply.getTagInternal().getId()).setTransformId(id).build()).setToken(ByteString.copyFromUtf8("IterableSideInputToken")).build());
        storingStateRequestHandler.addCacheToken(BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().setSideInput(BeamFnApi.ProcessBundleRequest.CacheToken.SideInput.newBuilder().setSideInputId(apply2.getTagInternal().getId()).setTransformId(id).build()).setToken(ByteString.copyFromUtf8("MulitmapSideInputToken")).build());
        BundleProgressHandler ignored = BundleProgressHandler.ignored();
        SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap2, storingStateRequestHandler, ignored);
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow("X"));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                newBundle = processor.newBundle(hashMap2, storingStateRequestHandler, ignored);
                Throwable th2 = null;
            } finally {
            }
            try {
                try {
                    ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow("Y"));
                    if (newBundle != null) {
                        $closeResource(null, newBundle);
                    }
                    Iterator it = hashMap.values().iterator();
                    while (it.hasNext()) {
                        MatcherAssert.assertThat((Collection) it.next(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(KV.of("X", "A")), WindowedValue.valueInGlobalWindow(KV.of("X", "B")), WindowedValue.valueInGlobalWindow(KV.of("X", "C")), WindowedValue.valueInGlobalWindow(KV.of("X", "key1:H")), WindowedValue.valueInGlobalWindow(KV.of("X", "key1:I")), WindowedValue.valueInGlobalWindow(KV.of("X", "key1:J")), WindowedValue.valueInGlobalWindow(KV.of("X", "key2:M")), WindowedValue.valueInGlobalWindow(KV.of("X", "key2:N")), WindowedValue.valueInGlobalWindow(KV.of("X", "key2:O")), WindowedValue.valueInGlobalWindow(KV.of("Y", "A")), WindowedValue.valueInGlobalWindow(KV.of("Y", "B")), WindowedValue.valueInGlobalWindow(KV.of("Y", "C")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key1:H")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key1:I")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key1:J")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key2:M")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key2:N")), WindowedValue.valueInGlobalWindow(KV.of("Y", "key2:O"))}));
                    }
                    Assert.assertEquals(4L, storingStateRequestHandler.receivedRequests.size());
                    Assert.assertEquals(((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(0)).getStateKey().getIterableSideInput(), BeamFnApi.StateKey.IterableSideInput.newBuilder().setSideInputId(apply.getTagInternal().getId()).setTransformId(id).build());
                    Assert.assertEquals(((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(1)).getStateKey().getMultimapKeysSideInput(), BeamFnApi.StateKey.MultimapKeysSideInput.newBuilder().setSideInputId(apply2.getTagInternal().getId()).setTransformId(id).build());
                    Assert.assertEquals(((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(2)).getStateKey().getMultimapSideInput(), BeamFnApi.StateKey.MultimapSideInput.newBuilder().setSideInputId(apply2.getTagInternal().getId()).setTransformId(id).setKey(encode("key1")).build());
                    Assert.assertEquals(((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(3)).getStateKey().getMultimapSideInput(), BeamFnApi.StateKey.MultimapSideInput.newBuilder().setSideInputId(apply2.getTagInternal().getId()).setTransformId(id).setKey(encode("key2")).build());
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    private static ByteString encode(String str) throws Exception {
        ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
        StringUtf8Coder.of().encode(str, byteStringOutputStream);
        return byteStringOutputStream.toByteString();
    }

    @Test
    public void testMetrics() throws Exception {
        launchSdkHarness(PipelineOptionsFactory.fromArgs(new String[]{"--experiments=state_sampling_period_millis=10"}).create());
        final MetricsDoFn metricsDoFn = new MetricsDoFn();
        Pipeline create = Pipeline.create();
        PCollection coder = create.apply("impulse", Impulse.create()).apply("create", ParDo.of(metricsDoFn)).setCoder(StringUtf8Coder.of());
        ParDo.SingleOutput of = ParDo.of(new DoFn<String, String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.12
            @DoFn.ProcessElement
            public void process(DoFn<String, String>.ProcessContext processContext) {
                processContext.output((String) processContext.element());
                processContext.output((String) processContext.element());
            }
        });
        coder.apply("processA", of).setCoder(StringUtf8Coder.of());
        coder.apply("processB", of).setCoder(StringUtf8Coder.of());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return true;
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with side inputs.");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("test_stage", (ExecutableStage) tryFind.get(), this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
        Map remoteOutputCoders = fromExecutableStage.getRemoteOutputCoders();
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : remoteOutputCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            String str = (String) entry.getKey();
            Coder coder2 = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap.put(str, RemoteOutputReceiver.of(coder2, (v1) -> {
                r3.add(v1);
            }));
        }
        final AtomicReference atomicReference = new AtomicReference();
        BundleProgressHandler bundleProgressHandler = new BundleProgressHandler() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.13
            public void onProgress(BeamFnApi.ProcessBundleProgressResponse processBundleProgressResponse) {
                atomicReference.set(processBundleProgressResponse.getMonitoringInfosList());
                ((CountDownLatch) MetricsDoFn.ALLOW_COMPLETION.get(metricsDoFn.uuid)).countDown();
                ArrayList arrayList = new ArrayList();
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "processUserCounter");
                simpleMonitoringInfoBuilder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                simpleMonitoringInfoBuilder.setInt64SumValue(1L);
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder2 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder2.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "startUserCounter");
                simpleMonitoringInfoBuilder2.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                simpleMonitoringInfoBuilder2.setInt64SumValue(10L);
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder2.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder3 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder3.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "finishUserCounter");
                simpleMonitoringInfoBuilder3.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                arrayList.add(Matchers.not(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder3.build())));
                simpleMonitoringInfoBuilder3.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "processUserDistribution");
                simpleMonitoringInfoBuilder3.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                simpleMonitoringInfoBuilder3.setInt64DistributionValue(DistributionData.create(1L, 1L, 1L, 1L));
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder3.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder4 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder4.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "startUserDistribution");
                simpleMonitoringInfoBuilder4.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                simpleMonitoringInfoBuilder4.setInt64DistributionValue(DistributionData.create(10L, 1L, 10L, 10L));
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder4.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder5 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder5.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "finishUserDistribution");
                simpleMonitoringInfoBuilder5.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                arrayList.add(Matchers.not(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder5.build())));
                MatcherAssert.assertThat(processBundleProgressResponse.getMonitoringInfosList(), Matchers.hasItems((Matcher[]) arrayList.toArray(new Matcher[0])));
            }

            public void onCompleted(BeamFnApi.ProcessBundleResponse processBundleResponse) {
                ArrayList arrayList = new ArrayList();
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "processUserCounter");
                simpleMonitoringInfoBuilder.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                simpleMonitoringInfoBuilder.setInt64SumValue(1L);
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder2 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder2.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "startUserCounter");
                simpleMonitoringInfoBuilder2.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                simpleMonitoringInfoBuilder2.setInt64SumValue(10L);
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder2.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder3 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder3.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "finishUserCounter");
                simpleMonitoringInfoBuilder3.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                simpleMonitoringInfoBuilder3.setInt64SumValue(100L);
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder3.build()));
                simpleMonitoringInfoBuilder3.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "processUserDistribution");
                simpleMonitoringInfoBuilder3.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                simpleMonitoringInfoBuilder3.setInt64DistributionValue(DistributionData.create(1L, 1L, 1L, 1L));
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder3.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder4 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder4.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "startUserDistribution");
                simpleMonitoringInfoBuilder4.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                simpleMonitoringInfoBuilder4.setInt64DistributionValue(DistributionData.create(10L, 1L, 10L, 10L));
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder4.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder5 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder5.setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64).setLabel("NAMESPACE", RemoteExecutionTest.class.getName()).setLabel("NAME", "finishUserDistribution");
                simpleMonitoringInfoBuilder5.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                simpleMonitoringInfoBuilder5.setInt64DistributionValue(DistributionData.create(100L, 1L, 100L, 100L));
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder5.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder6 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder6.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                simpleMonitoringInfoBuilder6.setLabel("PCOLLECTION", "impulse.out");
                simpleMonitoringInfoBuilder6.setInt64SumValue(1L);
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder6.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder7 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder7.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                simpleMonitoringInfoBuilder7.setLabel("PCOLLECTION", "create/ParMultiDo(Metrics).output");
                simpleMonitoringInfoBuilder7.setInt64SumValue(3L);
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder7.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder8 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder8.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                simpleMonitoringInfoBuilder8.setLabel("PCOLLECTION", "processA/ParMultiDo(Anonymous).output");
                simpleMonitoringInfoBuilder8.setInt64SumValue(6L);
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder8.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder9 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder9.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
                simpleMonitoringInfoBuilder9.setLabel("PCOLLECTION", "processB/ParMultiDo(Anonymous).output");
                simpleMonitoringInfoBuilder9.setInt64SumValue(6L);
                arrayList.add(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder9.build()));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder10 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder10.setUrn("beam:metric:pardo_execution_time:start_bundle_msecs:v1");
                simpleMonitoringInfoBuilder10.setType("beam:metrics:sum_int64:v1");
                simpleMonitoringInfoBuilder10.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                arrayList.add(Matchers.allOf(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder10.build()), MonitoringInfoMatchers.counterValueGreaterThanOrEqualTo(1L)));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder11 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder11.setUrn("beam:metric:pardo_execution_time:process_bundle_msecs:v1");
                simpleMonitoringInfoBuilder11.setType("beam:metrics:sum_int64:v1");
                simpleMonitoringInfoBuilder11.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                arrayList.add(Matchers.allOf(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder11.build()), MonitoringInfoMatchers.counterValueGreaterThanOrEqualTo(1L)));
                SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder12 = new SimpleMonitoringInfoBuilder();
                simpleMonitoringInfoBuilder12.setUrn("beam:metric:pardo_execution_time:finish_bundle_msecs:v1");
                simpleMonitoringInfoBuilder12.setType("beam:metrics:sum_int64:v1");
                simpleMonitoringInfoBuilder12.setLabel("PTRANSFORM", "create-ParMultiDo-Metrics-");
                arrayList.add(Matchers.allOf(MonitoringInfoMatchers.matchSetFields(simpleMonitoringInfoBuilder12.build()), MonitoringInfoMatchers.counterValueGreaterThanOrEqualTo(1L)));
                List list = (List) atomicReference.get();
                if (list == null) {
                    throw new IllegalStateException("Progress request did not complete before timeout allowing for bundle to complete.");
                }
                MatcherAssert.assertThat(RemoteExecutionTest.mergeMonitoringInfos(list, processBundleResponse.getMonitoringInfosList()), Matchers.hasItems((Matcher[]) arrayList.toArray(new Matcher[0])));
            }
        };
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap, StateRequestHandler.unsupported(), bundleProgressHandler);
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
                newSingleThreadExecutor.submit(() -> {
                    Preconditions.checkState(((CountDownLatch) MetricsDoFn.AFTER_PROCESS.get(metricsDoFn.uuid)).await(60L, TimeUnit.SECONDS), "Runner waited too long for DoFn to get to AFTER_PROCESS.");
                    newBundle.requestProgress();
                    return (Void) null;
                });
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                newSingleThreadExecutor.shutdown();
            } finally {
            }
        } catch (Throwable th2) {
            if (newBundle != null) {
                $closeResource(th, newBundle);
            }
            throw th2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static List<MetricsApi.MonitoringInfo> mergeMonitoringInfos(List<MetricsApi.MonitoringInfo> list, List<MetricsApi.MonitoringInfo> list2) {
        HashMap hashMap = new HashMap();
        for (MetricsApi.MonitoringInfo monitoringInfo : list) {
            hashMap.put(monitoringInfo.toBuilder().clearPayload().build(), monitoringInfo);
        }
        for (MetricsApi.MonitoringInfo monitoringInfo2 : list2) {
            hashMap.put(monitoringInfo2.toBuilder().clearPayload().build(), monitoringInfo2);
        }
        return new ArrayList(hashMap.values());
    }

    @Test
    public void testExecutionWithUserState() throws Exception {
        launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.14
            @DoFn.ProcessElement
            public void process(DoFn<byte[], KV<String, String>>.ProcessContext processContext) {
            }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("userState", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.15

            @DoFn.StateId("foo")
            private final StateSpec<BagState<String>> bufferState = StateSpecs.bag(StringUtf8Coder.of());

            @DoFn.StateId("foo2")
            private final StateSpec<BagState<String>> bufferState2 = StateSpecs.bag(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element KV<String, String> kv, @DoFn.StateId("foo") BagState<String> bagState, @DoFn.StateId("foo2") BagState<String> bagState2, DoFn.OutputReceiver<KV<String, String>> outputReceiver) {
                Iterator it = bagState.read().iterator();
                while (it.hasNext()) {
                    outputReceiver.output(KV.of((String) kv.getKey(), (String) it.next()));
                }
                bagState.add((String) kv.getValue());
                bagState2.clear();
            }
        })).apply("gbk", GroupByKey.create());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return !executableStage.getUserStates().isEmpty();
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with user state.");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("test_stage", (ExecutableStage) tryFind.get(), this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
        Map remoteOutputCoders = fromExecutableStage.getRemoteOutputCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : remoteOutputCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((String) entry.getKey(), synchronizedList);
            String str = (String) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(str, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        final ImmutableMap of = ImmutableMap.of("foo", new ArrayList(Arrays.asList(ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C", Coder.Context.NESTED)))), "foo2", new ArrayList(Arrays.asList(ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "D", Coder.Context.NESTED)))));
        SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap2, StateRequestHandlers.forBagUserStateHandlerFactory(fromExecutableStage, new StateRequestHandlers.BagUserStateHandlerFactory<ByteString, Object, BoundedWindow>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.16
            public StateRequestHandlers.BagUserStateHandler<ByteString, Object, BoundedWindow> forUserState(String str2, final String str3, Coder<ByteString> coder2, Coder<Object> coder3, Coder<BoundedWindow> coder4) {
                return new StateRequestHandlers.BagUserStateHandler<ByteString, Object, BoundedWindow>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.16.1
                    public Iterable<Object> get(ByteString byteString, BoundedWindow boundedWindow) {
                        return (Iterable) of.get(str3);
                    }

                    public void append(ByteString byteString, BoundedWindow boundedWindow, Iterator<Object> it) {
                        Iterators.addAll((Collection) of.get(str3), it);
                    }

                    public void clear(ByteString byteString, BoundedWindow boundedWindow) {
                        ((List) of.get(str3)).clear();
                    }

                    public /* bridge */ /* synthetic */ void append(Object obj, BoundedWindow boundedWindow, Iterator it) {
                        append((ByteString) obj, boundedWindow, (Iterator<Object>) it);
                    }
                };
            }
        }), BundleProgressHandler.ignored());
        Throwable th = null;
        try {
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(KV.of("X", "Y")));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                Iterator it = hashMap.values().iterator();
                while (it.hasNext()) {
                    MatcherAssert.assertThat((Collection) it.next(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(KV.of("X", "A")), WindowedValue.valueInGlobalWindow(KV.of("X", "B")), WindowedValue.valueInGlobalWindow(KV.of("X", "C"))}));
                }
                MatcherAssert.assertThat((List) of.get("foo"), IsIterableContainingInOrder.contains(new ByteString[]{ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y", Coder.Context.NESTED))}));
                MatcherAssert.assertThat((List) of.get("foo2"), IsEmptyIterable.emptyIterable());
            } finally {
            }
        } catch (Throwable th2) {
            if (newBundle != null) {
                $closeResource(th, newBundle);
            }
            throw th2;
        }
    }

    @Test
    public void testExecutionWithUserStateCaching() throws Exception {
        Pipeline create = Pipeline.create();
        launchSdkHarness(create.getOptions());
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.17
            @DoFn.ProcessElement
            public void process(DoFn<byte[], KV<String, String>>.ProcessContext processContext) {
            }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("userState", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.18

            @DoFn.StateId("foo")
            private final StateSpec<BagState<String>> bufferState = StateSpecs.bag(StringUtf8Coder.of());

            @DoFn.StateId("bar")
            private final StateSpec<BagState<String>> bufferState2 = StateSpecs.bag(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element KV<String, String> kv, @DoFn.StateId("foo") BagState<String> bagState, @DoFn.StateId("bar") BagState<String> bagState2, DoFn.OutputReceiver<KV<String, String>> outputReceiver) {
                Iterator it = bagState.read().iterator();
                while (it.hasNext()) {
                    outputReceiver.output(KV.of((String) kv.getKey(), (String) it.next()));
                }
                if (((Boolean) bagState2.isEmpty().read()).booleanValue()) {
                    outputReceiver.output(KV.of((String) kv.getKey(), "Empty"));
                } else {
                    bagState2.clear();
                }
            }
        })).apply("gbk", GroupByKey.create());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return !executableStage.getUserStates().isEmpty();
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with user state.");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("test_stage", (ExecutableStage) tryFind.get(), this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
        Map remoteOutputCoders = fromExecutableStage.getRemoteOutputCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : remoteOutputCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((String) entry.getKey(), synchronizedList);
            String str = (String) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(str, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        final ImmutableMap of = ImmutableMap.of("foo", new ArrayList(Arrays.asList(ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C", Coder.Context.NESTED)))), "bar", new ArrayList(Arrays.asList(ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "D", Coder.Context.NESTED)))));
        StoringStateRequestHandler storingStateRequestHandler = new StoringStateRequestHandler(StateRequestHandlers.forBagUserStateHandlerFactory(fromExecutableStage, new StateRequestHandlers.BagUserStateHandlerFactory<ByteString, Object, BoundedWindow>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.19
            public StateRequestHandlers.BagUserStateHandler<ByteString, Object, BoundedWindow> forUserState(String str2, final String str3, Coder<ByteString> coder2, Coder<Object> coder3, Coder<BoundedWindow> coder4) {
                return new StateRequestHandlers.BagUserStateHandler<ByteString, Object, BoundedWindow>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.19.1
                    public Iterable<Object> get(ByteString byteString, BoundedWindow boundedWindow) {
                        return (Iterable) of.get(str3);
                    }

                    public void append(ByteString byteString, BoundedWindow boundedWindow, Iterator<Object> it) {
                        Iterators.addAll((Collection) of.get(str3), it);
                    }

                    public void clear(ByteString byteString, BoundedWindow boundedWindow) {
                        ((List) of.get(str3)).clear();
                    }

                    public /* bridge */ /* synthetic */ void append(Object obj, BoundedWindow boundedWindow, Iterator it) {
                        append((ByteString) obj, boundedWindow, (Iterator<Object>) it);
                    }
                };
            }
        }));
        SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap2, storingStateRequestHandler, BundleProgressHandler.ignored());
        try {
            ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(KV.of("X", "Y")));
            if (newBundle != null) {
                $closeResource(null, newBundle);
            }
            SdkHarnessClient.BundleProcessor.ActiveBundle newBundle2 = processor.newBundle(hashMap2, storingStateRequestHandler, BundleProgressHandler.ignored());
            Throwable th = null;
            try {
                try {
                    ((FnDataReceiver) Iterables.getOnlyElement(newBundle2.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(KV.of("X", "Z")));
                    if (newBundle2 != null) {
                        $closeResource(null, newBundle2);
                    }
                    Iterator it = hashMap.values().iterator();
                    while (it.hasNext()) {
                        MatcherAssert.assertThat((Collection) it.next(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(KV.of("X", "A")), WindowedValue.valueInGlobalWindow(KV.of("X", "B")), WindowedValue.valueInGlobalWindow(KV.of("X", "C")), WindowedValue.valueInGlobalWindow(KV.of("X", "A")), WindowedValue.valueInGlobalWindow(KV.of("X", "B")), WindowedValue.valueInGlobalWindow(KV.of("X", "C")), WindowedValue.valueInGlobalWindow(KV.of("X", "Empty"))}));
                    }
                    MatcherAssert.assertThat((List) of.get("foo"), IsIterableContainingInOrder.contains(new ByteString[]{ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B", Coder.Context.NESTED)), ByteString.copyFrom(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C", Coder.Context.NESTED))}));
                    MatcherAssert.assertThat((List) of.get("bar"), IsEmptyIterable.emptyIterable());
                    Assert.assertEquals(3L, storingStateRequestHandler.getRequestCount());
                    ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
                    StringUtf8Coder.of().encode("X", byteStringOutputStream);
                    Assert.assertEquals("foo", ((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(0)).getStateKey().getBagUserState().getUserStateId());
                    Assert.assertEquals(((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(0)).getStateKey().getBagUserState().getKey(), byteStringOutputStream.toByteString());
                    Assert.assertTrue(((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(0)).hasGet());
                    Assert.assertEquals("bar", ((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(1)).getStateKey().getBagUserState().getUserStateId());
                    Assert.assertEquals(((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(1)).getStateKey().getBagUserState().getKey(), byteStringOutputStream.toByteString());
                    Assert.assertTrue(((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(1)).hasGet());
                    Assert.assertEquals("bar", ((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(2)).getStateKey().getBagUserState().getUserStateId());
                    Assert.assertEquals(((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(2)).getStateKey().getBagUserState().getKey(), byteStringOutputStream.toByteString());
                    Assert.assertTrue(((BeamFnApi.StateRequest) storingStateRequestHandler.receivedRequests.get(2)).hasClear());
                } finally {
                }
            } catch (Throwable th2) {
                if (newBundle2 != null) {
                    $closeResource(th, newBundle2);
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (newBundle != null) {
                $closeResource(null, newBundle);
            }
            throw th3;
        }
    }

    @Test
    public void testExecutionWithTimer() throws Exception {
        launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.20
            @DoFn.ProcessElement
            public void process(DoFn<byte[], KV<String, String>>.ProcessContext processContext) {
            }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("timer", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.21

            @DoFn.TimerId("event")
            private final TimerSpec eventTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @DoFn.TimerId("processing")
            private final TimerSpec processingTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, String>, KV<String, String>>.ProcessContext processContext, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2) {
                processContext.output(KV.of("main" + ((String) ((KV) processContext.element()).getKey()), ""));
                timer.withOutputTimestamp(processContext.timestamp()).set(processContext.timestamp().plus(org.joda.time.Duration.millis(1L)));
                timer2.offset(org.joda.time.Duration.millis(2L));
                timer2.setRelative();
            }

            @DoFn.OnTimer("event")
            public void eventTimer(DoFn<KV<String, String>, KV<String, String>>.OnTimerContext onTimerContext, @DoFn.Key String str, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2) {
                onTimerContext.output(KV.of("event", str));
                timer.withOutputTimestamp(onTimerContext.timestamp()).set(onTimerContext.fireTimestamp().plus(org.joda.time.Duration.millis(11L)));
                timer2.offset(org.joda.time.Duration.millis(12L));
                timer2.setRelative();
            }

            @DoFn.OnTimer("processing")
            public void processingTimer(DoFn<KV<String, String>, KV<String, String>>.OnTimerContext onTimerContext, @DoFn.Key String str, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2) {
                onTimerContext.output(KV.of("processing", str));
                timer.withOutputTimestamp(onTimerContext.timestamp()).set(onTimerContext.fireTimestamp().plus(org.joda.time.Duration.millis(21L)));
                timer2.offset(org.joda.time.Duration.millis(22L));
                timer2.setRelative();
            }

            @DoFn.OnWindowExpiration
            public void onWindowExpiration(@DoFn.Key String str, DoFn.OutputReceiver<KV<String, String>> outputReceiver) {
                outputReceiver.output(KV.of("onWindowExpiration", str));
            }
        })).apply("gbk", GroupByKey.create());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages(), executableStage -> {
            return !executableStage.getTimers().isEmpty();
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with timers.");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("test_stage", (ExecutableStage) tryFind.get(), this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator, fromExecutableStage.getTimerSpecs());
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : fromExecutableStage.getRemoteOutputCoders().entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((String) entry.getKey(), synchronizedList);
            String str = (String) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(str, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        HashMap hashMap3 = new HashMap();
        HashMap hashMap4 = new HashMap();
        Iterator it = fromExecutableStage.getTimerSpecs().entrySet().iterator();
        while (it.hasNext()) {
            for (ProcessBundleDescriptors.TimerSpec timerSpec : ((Map) ((Map.Entry) it.next()).getValue()).values()) {
                KV of = KV.of(timerSpec.transformId(), timerSpec.timerId());
                List synchronizedList2 = Collections.synchronizedList(new ArrayList());
                hashMap3.put(of, synchronizedList2);
                Coder coder2 = timerSpec.coder();
                Objects.requireNonNull(synchronizedList2);
                hashMap4.put(of, RemoteOutputReceiver.of(coder2, (v1) -> {
                    r3.add(v1);
                }));
            }
        }
        ProcessBundleDescriptors.TimerSpec timerSpec2 = null;
        ProcessBundleDescriptors.TimerSpec timerSpec3 = null;
        ProcessBundleDescriptors.TimerSpec timerSpec4 = null;
        Iterator it2 = fromExecutableStage.getTimerSpecs().values().iterator();
        while (it2.hasNext()) {
            for (ProcessBundleDescriptors.TimerSpec timerSpec5 : ((Map) it2.next()).values()) {
                if ("onWindowExpiration0".equals(timerSpec5.timerId())) {
                    timerSpec4 = timerSpec5;
                } else if (TimeDomain.EVENT_TIME.equals(timerSpec5.getTimerSpec().getTimeDomain())) {
                    timerSpec2 = timerSpec5;
                } else if (TimeDomain.PROCESSING_TIME.equals(timerSpec5.getTimerSpec().getTimeDomain())) {
                    timerSpec3 = timerSpec5;
                } else {
                    Assert.fail(String.format("Unknown timer specification %s", timerSpec5));
                }
            }
        }
        DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis() + 10000);
        try {
            SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap2, hashMap4, StateRequestHandler.unsupported(), BundleProgressHandler.ignored(), (BundleFinalizationHandler) null, (BundleCheckpointHandler) null);
            try {
                ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(KV.of("X", "X")));
                ((FnDataReceiver) newBundle.getTimerReceivers().get(KV.of(timerSpec2.transformId(), timerSpec2.timerId()))).accept(timerForTest("Y", 1000L, 100L));
                ((FnDataReceiver) newBundle.getTimerReceivers().get(KV.of(timerSpec3.transformId(), timerSpec3.timerId()))).accept(timerForTest("Z", 2000L, 200L));
                ((FnDataReceiver) newBundle.getTimerReceivers().get(KV.of(timerSpec4.transformId(), timerSpec4.timerId()))).accept(timerForTest("key", 5001L, 5000L));
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                MatcherAssert.assertThat((Collection) hashMap.get((String) Iterables.getOnlyElement(fromExecutableStage.getRemoteOutputCoders().keySet())), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(KV.of("mainX", "")), WindowedValue.timestampedValueInGlobalWindow(KV.of("event", "Y"), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(org.joda.time.Duration.millis(100L))), WindowedValue.timestampedValueInGlobalWindow(KV.of("processing", "Z"), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(org.joda.time.Duration.millis(200L))), WindowedValue.timestampedValueInGlobalWindow(KV.of("onWindowExpiration", "key"), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(org.joda.time.Duration.millis(5000L)))}));
                MatcherAssert.assertThat((Collection) hashMap3.get(KV.of(timerSpec2.transformId(), timerSpec2.timerId())), Matchers.containsInAnyOrder(new org.apache.beam.runners.core.construction.Timer[]{timerForTest("X", 1L, 0L), timerForTest("Y", 1011L, 100L), timerForTest("Z", 2021L, 200L)}));
                MatcherAssert.assertThat((Collection) hashMap3.get(KV.of(timerSpec3.transformId(), timerSpec3.timerId())), Matchers.containsInAnyOrder(new org.apache.beam.runners.core.construction.Timer[]{timerForTest("X", 10002L, 0L), timerForTest("Y", 10012L, 100L), timerForTest("Z", 10022L, 200L)}));
            } catch (Throwable th) {
                if (newBundle != null) {
                    $closeResource(null, newBundle);
                }
                throw th;
            }
        } finally {
            DateTimeUtils.setCurrentMillisSystem();
        }
    }

    @Test
    public void testExecutionWithMultipleStages() throws Exception {
        launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline create = Pipeline.create();
        Function function = str -> {
            return create.apply("impulse" + str, Impulse.create()).apply("create" + str, ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.22
                @DoFn.ProcessElement
                public void process(DoFn<byte[], String>.ProcessContext processContext) {
                    try {
                        processContext.output((String) CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), (byte[]) processContext.element()));
                    } catch (CoderException e) {
                        throw new RuntimeException((Throwable) e);
                    }
                }
            })).setCoder(StringUtf8Coder.of()).apply(ParDo.of(new DoFn<String, String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.23
                @DoFn.ProcessElement
                public void processElement(DoFn<String, String>.ProcessContext processContext) {
                    processContext.output("stream" + str + ((String) processContext.element()));
                }
            }));
        };
        PCollectionList.of((PCollection) function.apply("1")).and((PCollection) function.apply("2")).apply(Flatten.pCollections()).apply("createKV", ParDo.of(new DoFn<String, KV<String, String>>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.24
            @DoFn.ProcessElement
            public void process(DoFn<String, KV<String, String>>.ProcessContext processContext) {
                processContext.output(KV.of((String) processContext.element(), ""));
            }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("gbk", GroupByKey.create());
        Set<ExecutableStage> fusedStages = GreedyPipelineFuser.fuse(PipelineTranslation.toProto(create)).getFusedStages();
        MatcherAssert.assertThat(Integer.valueOf(fusedStages.size()), Matchers.equalTo(2));
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        for (ExecutableStage executableStage : fusedStages) {
            ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage(executableStage.toString(), executableStage, this.dataServer.getApiServiceDescriptor(), this.stateServer.getApiServiceDescriptor());
            SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations(), this.stateDelegator);
            Map remoteOutputCoders = fromExecutableStage.getRemoteOutputCoders();
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : remoteOutputCoders.entrySet()) {
                String str2 = (String) entry.getKey();
                Coder coder = (Coder) entry.getValue();
                Objects.requireNonNull(synchronizedList);
                hashMap.putIfAbsent(str2, RemoteOutputReceiver.of(coder, (v1) -> {
                    r3.add(v1);
                }));
            }
            SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap, StateRequestHandler.unsupported(), BundleProgressHandler.ignored());
            Throwable th = null;
            try {
                try {
                    ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
                    if (newBundle != null) {
                        $closeResource(null, newBundle);
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (newBundle != null) {
                    $closeResource(th, newBundle);
                }
                throw th2;
            }
        }
        MatcherAssert.assertThat(synchronizedList, Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(KV.of("stream1X", "")), WindowedValue.valueInGlobalWindow(KV.of("stream2X", ""))}));
    }

    @Test(timeout = 60000)
    public void testSplit() throws Exception {
        launchSdkHarness(PipelineOptionsFactory.create());
        Pipeline create = Pipeline.create();
        create.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.25
            @DoFn.ProcessElement
            public void process(DoFn<byte[], String>.ProcessContext processContext) {
                processContext.output("zero");
                processContext.output("WaitTillSplit");
                processContext.output("two");
            }
        })).apply("forceSplit", ParDo.of(new DoFn<String, String>() { // from class: org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.26
            @DoFn.GetInitialRestriction
            public String getInitialRestriction(@DoFn.Element String str) {
                return str;
            }

            @DoFn.NewTracker
            public WaitingTillSplitRestrictionTracker newTracker(@DoFn.Restriction String str) {
                return new WaitingTillSplitRestrictionTracker(str);
            }

            @DoFn.ProcessElement
            public void process(RestrictionTracker<String, Void> restrictionTracker, DoFn<String, String>.ProcessContext processContext) {
                do {
                } while (restrictionTracker.tryClaim((Object) null));
                processContext.output((String) restrictionTracker.currentRestriction());
            }
        })).apply("addKeys", WithKeys.of("foo")).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply("gbk", GroupByKey.create());
        Optional tryFind = Iterables.tryFind(GreedyPipelineFuser.fuse(ProtoOverrides.updateTransform("beam:transform:pardo:v1", PipelineTranslation.toProto(create), SplittableParDoExpander.createSizedReplacement())).getFusedStages(), executableStage -> {
            return Iterables.filter(executableStage.getTransforms(), pTransformNode -> {
                return "beam:transform:sdf_process_sized_element_and_restrictions:v1".equals(pTransformNode.getTransform().getSpec().getUrn());
            }).iterator().hasNext();
        });
        Preconditions.checkState(tryFind.isPresent(), "Expected a stage with SDF ProcessSizedElementAndRestriction.");
        ProcessBundleDescriptors.ExecutableProcessBundleDescriptor fromExecutableStage = ProcessBundleDescriptors.fromExecutableStage("my_stage", (ExecutableStage) tryFind.get(), this.dataServer.getApiServiceDescriptor());
        SdkHarnessClient.BundleProcessor processor = this.controlClient.getProcessor(fromExecutableStage.getProcessBundleDescriptor(), fromExecutableStage.getRemoteInputDestinations());
        Map remoteOutputCoders = fromExecutableStage.getRemoteOutputCoders();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : remoteOutputCoders.entrySet()) {
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            hashMap.put((String) entry.getKey(), synchronizedList);
            String str = (String) entry.getKey();
            Coder coder = (Coder) entry.getValue();
            Objects.requireNonNull(synchronizedList);
            hashMap2.put(str, RemoteOutputReceiver.of(coder, (v1) -> {
                r3.add(v1);
            }));
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        Map emptyMap = Collections.emptyMap();
        StateRequestHandler unsupported = StateRequestHandler.unsupported();
        BundleProgressHandler ignored = BundleProgressHandler.ignored();
        Objects.requireNonNull(arrayList);
        BundleSplitHandler bundleSplitHandler = (v1) -> {
            r5.add(v1);
        };
        Objects.requireNonNull(arrayList2);
        BundleCheckpointHandler bundleCheckpointHandler = (v1) -> {
            r6.add(v1);
        };
        Objects.requireNonNull(arrayList3);
        SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = processor.newBundle(hashMap2, emptyMap, unsupported, ignored, bundleSplitHandler, bundleCheckpointHandler, (v1) -> {
            r7.add(v1);
        });
        try {
            ((FnDataReceiver) Iterables.getOnlyElement(newBundle.getInputReceivers().values())).accept(WindowedValue.valueInGlobalWindow(sdfSizedElementAndRestrictionForTest("WaitTillSplit")));
            ScheduledFuture<?> scheduleWithFixedDelay = newSingleThreadScheduledExecutor.scheduleWithFixedDelay(() -> {
                newBundle.split(0.5d);
            }, 0L, 100L, TimeUnit.MILLISECONDS);
            if (newBundle != null) {
                $closeResource(null, newBundle);
            }
            scheduleWithFixedDelay.cancel(false);
            newSingleThreadScheduledExecutor.shutdown();
            Assert.assertTrue(arrayList3.isEmpty());
            Assert.assertTrue(arrayList2.isEmpty());
            Assert.assertFalse(arrayList.isEmpty());
            BeamFnApi.ProcessBundleSplitResponse.ChannelSplit channelSplit = (BeamFnApi.ProcessBundleSplitResponse.ChannelSplit) Iterables.getOnlyElement(((BeamFnApi.ProcessBundleSplitResponse) arrayList.get(arrayList.size() - 1)).getChannelSplitsList());
            Assert.assertEquals(-1L, channelSplit.getLastPrimaryElement());
            Assert.assertEquals(1L, channelSplit.getFirstResidualElement());
            Assert.assertEquals(1L, r0.getPrimaryRootsCount());
            Assert.assertEquals(1L, r0.getResidualRootsCount());
            MatcherAssert.assertThat((Collection) Iterables.getOnlyElement(hashMap.values()), Matchers.containsInAnyOrder(new Object[]{WindowedValue.valueInGlobalWindow(KV.of("foo", "Primary"))}));
        } catch (Throwable th) {
            if (newBundle != null) {
                $closeResource(null, newBundle);
            }
            throw th;
        }
    }

    private KV<KV<String, KV<String, byte[]>>, Double> sdfSizedElementAndRestrictionForTest(String str) {
        return KV.of(KV.of(str, KV.of(str, new byte[0])), Double.valueOf(0.0d));
    }

    private KV<String, byte[]> byteValueOf(String str, long j) throws CoderException {
        return KV.of(str, CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), Long.valueOf(j)));
    }

    private org.apache.beam.runners.core.construction.Timer<String> timerForTest(String str, long j, long j2) {
        return org.apache.beam.runners.core.construction.Timer.of(str, "", Collections.singletonList(GlobalWindow.INSTANCE), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(org.joda.time.Duration.millis(j)), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(org.joda.time.Duration.millis(j2)), PaneInfo.NO_FIRING);
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
