package org.apache.beam.fn.harness.control;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.BeamFnDataReadRunner;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.BeamFnTimerClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.data.QueueingBeamFnDataClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.function.ThrowingConsumer;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.DoFnWithExecutionInformation;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyCollection;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.class */
public class ProcessBundleHandlerTest {
    private static final String DATA_INPUT_URN = "beam:runner:source:v1";
    private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";

    @Mock
    private BeamFnDataClient beamFnDataClient;

    @Captor
    private ArgumentCaptor<ThrowingConsumer<Exception, WindowedValue<String>>> consumerCaptor;

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest$TestBundleProcessor.class */
    private static class TestBundleProcessor extends ProcessBundleHandler.BundleProcessor {
        static int resetCnt = 0;
        private ProcessBundleHandler.BundleProcessor wrappedBundleProcessor;

        TestBundleProcessor(ProcessBundleHandler.BundleProcessor bundleProcessor) {
            this.wrappedBundleProcessor = bundleProcessor;
        }

        PTransformFunctionRegistry getStartFunctionRegistry() {
            return this.wrappedBundleProcessor.getStartFunctionRegistry();
        }

        PTransformFunctionRegistry getFinishFunctionRegistry() {
            return this.wrappedBundleProcessor.getFinishFunctionRegistry();
        }

        List<ThrowingRunnable> getResetFunctions() {
            return this.wrappedBundleProcessor.getResetFunctions();
        }

        List<ThrowingRunnable> getTearDownFunctions() {
            return this.wrappedBundleProcessor.getTearDownFunctions();
        }

        List<PTransformRunnerFactory.ProgressRequestCallback> getProgressRequestCallbacks() {
            return this.wrappedBundleProcessor.getProgressRequestCallbacks();
        }

        BundleSplitListener.InMemory getSplitListener() {
            return this.wrappedBundleProcessor.getSplitListener();
        }

        PCollectionConsumerRegistry getpCollectionConsumerRegistry() {
            return this.wrappedBundleProcessor.getpCollectionConsumerRegistry();
        }

        MetricsContainerStepMap getMetricsContainerRegistry() {
            return this.wrappedBundleProcessor.getMetricsContainerRegistry();
        }

        public ExecutionStateTracker getStateTracker() {
            return this.wrappedBundleProcessor.getStateTracker();
        }

        ProcessBundleHandler.HandleStateCallsForBundle getBeamFnStateClient() {
            return this.wrappedBundleProcessor.getBeamFnStateClient();
        }

        QueueingBeamFnDataClient getQueueingClient() {
            return this.wrappedBundleProcessor.getQueueingClient();
        }

        Collection<FinalizeBundleHandler.CallbackRegistration> getBundleFinalizationCallbackRegistrations() {
            return this.wrappedBundleProcessor.getBundleFinalizationCallbackRegistrations();
        }

        Collection<BeamFnDataReadRunner> getChannelRoots() {
            return this.wrappedBundleProcessor.getChannelRoots();
        }

        void reset() throws Exception {
            resetCnt++;
            this.wrappedBundleProcessor.reset();
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest$TestBundleProcessorCache.class */
    private static class TestBundleProcessorCache extends ProcessBundleHandler.BundleProcessorCache {
        private TestBundleProcessorCache() {
        }

        ProcessBundleHandler.BundleProcessor get(String str, String str2, Supplier<ProcessBundleHandler.BundleProcessor> supplier) {
            return new TestBundleProcessor(super.get(str, str2, supplier));
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest$TestDoFn.class */
    private static class TestDoFn extends DoFn<String, String> {
        private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
        static List<String> orderOfOperations = new ArrayList();
        private State state;

        /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest$TestDoFn$State.class */
        private enum State {
            NOT_SET_UP,
            SET_UP,
            START_BUNDLE,
            FINISH_BUNDLE,
            TEAR_DOWN
        }

        private TestDoFn() {
            this.state = State.NOT_SET_UP;
        }

        @DoFn.Setup
        public void setUp() {
            Preconditions.checkState(State.NOT_SET_UP.equals(this.state), "Unexpected state: %s", this.state);
            this.state = State.SET_UP;
            orderOfOperations.add("setUp");
        }

        @DoFn.Teardown
        public void tearDown() {
            Preconditions.checkState(!State.TEAR_DOWN.equals(this.state), "Unexpected state: %s", this.state);
            this.state = State.TEAR_DOWN;
            orderOfOperations.add("tearDown");
        }

        @DoFn.StartBundle
        public void startBundle() {
            this.state = State.START_BUNDLE;
            orderOfOperations.add("startBundle");
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>.ProcessContext processContext, BoundedWindow boundedWindow) {
            Preconditions.checkState(State.START_BUNDLE.equals(this.state), "Unexpected state: %s", this.state);
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn<String, String>.FinishBundleContext finishBundleContext) {
            Preconditions.checkState(State.START_BUNDLE.equals(this.state), "Unexpected state: %s", this.state);
            this.state = State.FINISH_BUNDLE;
            orderOfOperations.add("finishBundle");
        }
    }

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        TestBundleProcessor.resetCnt = 0;
    }

    @Test
    public void testTrySplitBeforeBundleDoesNotFail() {
        Assert.assertNotNull(new ProcessBundleHandler(PipelineOptionsFactory.create(), Collections.emptySet(), (Function) null, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), ImmutableMap.of(), new ProcessBundleHandler.BundleProcessorCache()).trySplit(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundleSplit(BeamFnApi.ProcessBundleSplitRequest.newBuilder().setInstructionId("unknown-id")).build()).build().getProcessBundleSplit());
        Assert.assertEquals(0L, r0.getProcessBundleSplit().getChannelSplitsCount());
    }

    @Test
    public void testProgressBeforeBundleDoesNotFail() throws Exception {
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(PipelineOptionsFactory.create(), Collections.emptySet(), (Function) null, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), ImmutableMap.of(), new ProcessBundleHandler.BundleProcessorCache());
        processBundleHandler.progress(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundleProgress(BeamFnApi.ProcessBundleProgressRequest.newBuilder().setInstructionId("unknown-id")).build());
        Assert.assertNotNull(processBundleHandler.trySplit(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundleSplit(BeamFnApi.ProcessBundleSplitRequest.newBuilder().setInstructionId("unknown-id")).build()).build().getProcessBundleProgress());
        Assert.assertEquals(0L, r0.getProcessBundleProgress().getMonitoringInfosCount());
    }

    @Test
    public void testOrderOfStartAndFinishCalls() throws Exception {
        BeamFnApi.ProcessBundleDescriptor build = BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).putOutputs("2L-output", "2L-output-pc").build()).putTransforms("3L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:sink:v1").build()).putInputs("3L-input", "2L-output-pc").build()).putPcollections("2L-output-pc", RunnerApi.PCollection.getDefaultInstance()).build();
        ImmutableMap of = ImmutableMap.of("1L", build);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        PTransformRunnerFactory pTransformRunnerFactory = (pipelineOptions, beamFnDataClient, beamFnStateClient, beamFnTimerClient, str, pTransform, supplier, map, map2, map3, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, consumer, consumer2, consumer3, bundleSplitListener, bundleFinalizer) -> {
            arrayList.add(pTransform);
            pTransformFunctionRegistry.register(str, () -> {
                MatcherAssert.assertThat((String) supplier.get(), (Matcher<? super String>) Matchers.equalTo("999L"));
                arrayList2.add("Start" + str);
            });
            pTransformFunctionRegistry2.register(str, () -> {
                MatcherAssert.assertThat((String) supplier.get(), (Matcher<? super String>) Matchers.equalTo("999L"));
                arrayList2.add("Finish" + str);
            });
            return null;
        };
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), ImmutableMap.of("beam:runner:source:v1", pTransformRunnerFactory, "beam:runner:sink:v1", pTransformRunnerFactory), new ProcessBundleHandler.BundleProcessorCache()).processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(build.getTransformsMap().get("3L"), build.getTransformsMap().get("2L")));
        MatcherAssert.assertThat(arrayList2, (Matcher<? super ArrayList>) Matchers.contains("Start3L", "Start2L", "Finish2L", "Finish3L"));
    }

    @Test
    public void testOrderOfSetupTeardownCalls() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).putOutputs("2L-output", "2L-output-pc").build()).putTransforms("3L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN).setPayload(RunnerApi.ParDoPayload.newBuilder().setDoFn(RunnerApi.FunctionSpec.newBuilder().setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN).setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(DoFnWithExecutionInformation.of(new TestDoFn(), TestDoFn.mainOutput, Collections.emptyMap(), DoFnSchemaInformation.create())))).build()).build().toByteString())).putInputs("3L-input", "2L-output-pc").build()).putPcollections("2L-output-pc", RunnerApi.PCollection.newBuilder().setWindowingStrategyId("window-strategy").setCoderId("2L-output-coder").build()).putWindowingStrategies("window-strategy", RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId("window-strategy-coder").setWindowFn(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1")).setOutputTime(RunnerApi.OutputTime.Enum.END_OF_WINDOW).setAccumulationMode(RunnerApi.AccumulationMode.Enum.ACCUMULATING).setTrigger(RunnerApi.Trigger.newBuilder().setAlways(RunnerApi.Trigger.Always.getDefaultInstance())).setClosingBehavior(RunnerApi.ClosingBehavior.Enum.EMIT_ALWAYS).setOnTimeBehavior(RunnerApi.OnTimeBehavior.Enum.FIRE_ALWAYS).build()).putCoders("2L-output-coder", CoderTranslation.toProto(StringUtf8Coder.of()).getCoder()).putCoders("window-strategy-coder", RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN).build()).build()).build());
        HashMap newHashMap = Maps.newHashMap(ProcessBundleHandler.REGISTERED_RUNNER_FACTORIES);
        newHashMap.put("beam:runner:source:v1", (pipelineOptions, beamFnDataClient, beamFnStateClient, beamFnTimerClient, str, pTransform, supplier, map, map2, map3, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, consumer, consumer2, consumer3, bundleSplitListener, bundleFinalizer) -> {
            return null;
        });
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), newHashMap, new ProcessBundleHandler.BundleProcessorCache());
        processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        processBundleHandler.shutdown();
        MatcherAssert.assertThat(TestDoFn.orderOfOperations, (Matcher<? super List<String>>) Matchers.contains("setUp", "startBundle", "finishBundle", "startBundle", "finishBundle", "tearDown"));
    }

    @Test
    public void testBundleProcessorIsResetWhenAddedBackToCache() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), ImmutableMap.of("beam:runner:source:v1", (pipelineOptions, beamFnDataClient, beamFnStateClient, beamFnTimerClient, str, pTransform, supplier, map, map2, map3, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, consumer, consumer2, consumer3, bundleSplitListener, bundleFinalizer) -> {
            return null;
        }), new TestBundleProcessorCache());
        MatcherAssert.assertThat(Integer.valueOf(TestBundleProcessor.resetCnt), (Matcher<? super Integer>) Matchers.equalTo(0));
        processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        MatcherAssert.assertThat(Integer.valueOf(TestBundleProcessor.resetCnt), (Matcher<? super Integer>) Matchers.equalTo(1));
        MatcherAssert.assertThat(Integer.valueOf(processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().size()), (Matcher<? super Integer>) Matchers.equalTo(1));
        MatcherAssert.assertThat(Integer.valueOf(((ConcurrentLinkedQueue) processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().get("1L")).size()), (Matcher<? super Integer>) Matchers.equalTo(1));
        ((ProcessBundleHandler.BundleProcessor) Iterables.getOnlyElement((Iterable) processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().get("1L"))).getResetFunctions().add(() -> {
            throw new IllegalStateException("ResetFailed");
        });
        processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        MatcherAssert.assertThat(Integer.valueOf(((ConcurrentLinkedQueue) processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().get("1L")).size()), (Matcher<? super Integer>) Matchers.equalTo(0));
    }

    @Test
    public void testBundleProcessorIsFoundWhenActive() {
        ProcessBundleHandler.BundleProcessor bundleProcessor = (ProcessBundleHandler.BundleProcessor) Mockito.mock(ProcessBundleHandler.BundleProcessor.class);
        Mockito.when(bundleProcessor.getInstructionId()).thenReturn("known");
        ProcessBundleHandler.BundleProcessorCache bundleProcessorCache = new ProcessBundleHandler.BundleProcessorCache();
        Assert.assertNull(bundleProcessorCache.find(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN));
        bundleProcessorCache.get("descriptorId", "known", () -> {
            return bundleProcessor;
        });
        Assert.assertSame(bundleProcessor, bundleProcessorCache.find("known"));
        bundleProcessorCache.release("descriptorId", bundleProcessor);
        Assert.assertNull(bundleProcessorCache.find("known"));
        bundleProcessorCache.get("descriptorId", "known", () -> {
            return bundleProcessor;
        });
        Assert.assertSame(bundleProcessor, bundleProcessorCache.find("known"));
        bundleProcessorCache.discard(bundleProcessor);
        Assert.assertNull(bundleProcessorCache.find("known"));
    }

    @Test
    public void testBundleProcessorReset() throws Exception {
        PTransformFunctionRegistry pTransformFunctionRegistry = (PTransformFunctionRegistry) Mockito.mock(PTransformFunctionRegistry.class);
        PTransformFunctionRegistry pTransformFunctionRegistry2 = (PTransformFunctionRegistry) Mockito.mock(PTransformFunctionRegistry.class);
        BundleSplitListener.InMemory inMemory = (BundleSplitListener.InMemory) Mockito.mock(BundleSplitListener.InMemory.class);
        Collection collection = (Collection) Mockito.mock(Collection.class);
        PCollectionConsumerRegistry pCollectionConsumerRegistry = (PCollectionConsumerRegistry) Mockito.mock(PCollectionConsumerRegistry.class);
        MetricsContainerStepMap metricsContainerStepMap = (MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class);
        ExecutionStateTracker executionStateTracker = (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class);
        ProcessBundleHandler.HandleStateCallsForBundle handleStateCallsForBundle = (ProcessBundleHandler.HandleStateCallsForBundle) Mockito.mock(ProcessBundleHandler.HandleStateCallsForBundle.class);
        QueueingBeamFnDataClient queueingBeamFnDataClient = (QueueingBeamFnDataClient) Mockito.mock(QueueingBeamFnDataClient.class);
        ThrowingRunnable throwingRunnable = (ThrowingRunnable) Mockito.mock(ThrowingRunnable.class);
        ProcessBundleHandler.BundleProcessor create = ProcessBundleHandler.BundleProcessor.create(pTransformFunctionRegistry, pTransformFunctionRegistry2, Collections.singletonList(throwingRunnable), new ArrayList(), new ArrayList(), inMemory, pCollectionConsumerRegistry, metricsContainerStepMap, executionStateTracker, handleStateCallsForBundle, queueingBeamFnDataClient, collection);
        create.reset();
        Assert.assertNull(create.getInstructionId());
        ((PTransformFunctionRegistry) Mockito.verify(pTransformFunctionRegistry, Mockito.times(1))).reset();
        ((PTransformFunctionRegistry) Mockito.verify(pTransformFunctionRegistry2, Mockito.times(1))).reset();
        ((BundleSplitListener.InMemory) Mockito.verify(inMemory, Mockito.times(1))).clear();
        ((PCollectionConsumerRegistry) Mockito.verify(pCollectionConsumerRegistry, Mockito.times(1))).reset();
        ((MetricsContainerStepMap) Mockito.verify(metricsContainerStepMap, Mockito.times(1))).reset();
        ((ExecutionStateTracker) Mockito.verify(executionStateTracker, Mockito.times(1))).reset();
        ((Collection) Mockito.verify(collection, Mockito.times(1))).clear();
        ((ThrowingRunnable) Mockito.verify(throwingRunnable, Mockito.times(1))).run();
    }

    @Test
    public void testCreatingPTransformExceptionsArePropagated() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), ImmutableMap.of("beam:runner:source:v1", (pipelineOptions, beamFnDataClient, beamFnStateClient, beamFnTimerClient, str, pTransform, supplier, map, map2, map3, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, consumer, consumer2, consumer3, bundleSplitListener, bundleFinalizer) -> {
            throw new IllegalStateException("TestException");
        }), new ProcessBundleHandler.BundleProcessorCache());
        Assert.assertThrows("TestException", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        });
    }

    @Test
    public void testBundleFinalizationIsPropagated() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).build());
        FinalizeBundleHandler finalizeBundleHandler = (FinalizeBundleHandler) Mockito.mock(FinalizeBundleHandler.class);
        DoFn.BundleFinalizer.Callback callback = (DoFn.BundleFinalizer.Callback) Mockito.mock(DoFn.BundleFinalizer.Callback.class);
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        Assert.assertTrue(new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, finalizeBundleHandler, new ShortIdMap(), ImmutableMap.of("beam:runner:source:v1", (pipelineOptions, beamFnDataClient, beamFnStateClient, beamFnTimerClient, str, pTransform, supplier, map, map2, map3, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, consumer, consumer2, consumer3, bundleSplitListener, bundleFinalizer) -> {
            pTransformFunctionRegistry.register(str, () -> {
                bundleFinalizer.afterBundleCommit(Instant.ofEpochMilli(42L), callback);
            });
            return null;
        }), new ProcessBundleHandler.BundleProcessorCache()).processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("2L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build()).getProcessBundle().getRequiresFinalization());
        ((FinalizeBundleHandler) Mockito.verify(finalizeBundleHandler)).registerCallbacks((String) Mockito.eq("2L"), (Collection) Mockito.argThat(collection -> {
            FinalizeBundleHandler.CallbackRegistration callbackRegistration = (FinalizeBundleHandler.CallbackRegistration) Iterables.getOnlyElement(collection);
            Assert.assertEquals(Instant.ofEpochMilli(42L), callbackRegistration.getExpiryTime());
            Assert.assertSame(callback, callbackRegistration.getCallback());
            return true;
        }));
    }

    @Test
    public void testPTransformStartExceptionsArePropagated() {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), ImmutableMap.of("beam:runner:source:v1", (pipelineOptions, beamFnDataClient, beamFnStateClient, beamFnTimerClient, str, pTransform, supplier, map, map2, map3, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, consumer, consumer2, consumer3, bundleSplitListener, bundleFinalizer) -> {
            pTransformFunctionRegistry.register(str, ProcessBundleHandlerTest::throwException);
            return null;
        }), new ProcessBundleHandler.BundleProcessorCache());
        Assert.assertThrows("TestException", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        });
        MatcherAssert.assertThat((ConcurrentLinkedQueue) processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), (Matcher<? super ConcurrentLinkedQueue>) IsEmptyCollection.empty());
    }

    @Test
    public void testPTransformFinishExceptionsArePropagated() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), ImmutableMap.of("beam:runner:source:v1", (pipelineOptions, beamFnDataClient, beamFnStateClient, beamFnTimerClient, str, pTransform, supplier, map, map2, map3, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, consumer, consumer2, consumer3, bundleSplitListener, bundleFinalizer) -> {
            pTransformFunctionRegistry2.register(str, ProcessBundleHandlerTest::throwException);
            return null;
        }), new ProcessBundleHandler.BundleProcessorCache());
        Assert.assertThrows("TestException", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        });
        MatcherAssert.assertThat((ConcurrentLinkedQueue) processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), (Matcher<? super ConcurrentLinkedQueue>) IsEmptyCollection.empty());
    }

    @Test
    public void testPendingStateCallsBlockTillCompletion() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).setStateApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).build());
        final CompletableFuture[] completableFutureArr = new CompletableFuture[1];
        final CompletableFuture[] completableFutureArr2 = new CompletableFuture[1];
        BeamFnStateGrpcClientCache beamFnStateGrpcClientCache = (BeamFnStateGrpcClientCache) Mockito.mock(BeamFnStateGrpcClientCache.class);
        BeamFnStateClient beamFnStateClient = (BeamFnStateClient) Mockito.mock(BeamFnStateClient.class);
        Mockito.when(beamFnStateGrpcClientCache.forApiServiceDescriptor((Endpoints.ApiServiceDescriptor) org.mockito.Matchers.any())).thenReturn(beamFnStateClient);
        ((BeamFnStateClient) Mockito.doAnswer(invocationOnMock -> {
            BeamFnApi.StateRequest.Builder builder = (BeamFnApi.StateRequest.Builder) invocationOnMock.getArguments()[0];
            CompletableFuture completableFuture = new CompletableFuture();
            new Thread(() -> {
                Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);
                String instructionId = builder.getInstructionId();
                boolean z = -1;
                switch (instructionId.hashCode()) {
                    case -1149187101:
                        if (instructionId.equals("SUCCESS")) {
                            z = false;
                            break;
                        }
                        break;
                    case 2150174:
                        if (instructionId.equals("FAIL")) {
                            z = true;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        completableFuture.complete(BeamFnApi.StateResponse.getDefaultInstance());
                        return;
                    case true:
                        completableFuture.completeExceptionally(new RuntimeException("TEST ERROR"));
                        return;
                    default:
                        return;
                }
            }).start();
            return completableFuture;
        }).when(beamFnStateClient)).handle((BeamFnApi.StateRequest.Builder) org.mockito.Matchers.any());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, beamFnStateGrpcClientCache, (FinalizeBundleHandler) null, new ShortIdMap(), ImmutableMap.of("beam:runner:source:v1", new PTransformRunnerFactory<Object>() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandlerTest.1
            public Object createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient2, BeamFnTimerClient beamFnTimerClient, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, Map<String, RunnerApi.PCollection> map, Map<String, RunnerApi.Coder> map2, Map<String, RunnerApi.WindowingStrategy> map3, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry pTransformFunctionRegistry, PTransformFunctionRegistry pTransformFunctionRegistry2, Consumer<ThrowingRunnable> consumer, Consumer<ThrowingRunnable> consumer2, Consumer<PTransformRunnerFactory.ProgressRequestCallback> consumer3, BundleSplitListener bundleSplitListener, DoFn.BundleFinalizer bundleFinalizer) throws IOException {
                pTransformFunctionRegistry.register(str, () -> {
                    doStateCalls(beamFnStateClient2);
                });
                return null;
            }

            private void doStateCalls(BeamFnStateClient beamFnStateClient2) {
                completableFutureArr[0] = beamFnStateClient2.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId("SUCCESS"));
                completableFutureArr2[0] = beamFnStateClient2.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId("FAIL"));
            }
        }), new ProcessBundleHandler.BundleProcessorCache()).processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        Assert.assertTrue(completableFutureArr[0].isDone());
        Assert.assertTrue(completableFutureArr2[0].isDone());
    }

    @Test
    public void testStateCallsFailIfNoStateApiServiceDescriptorSpecified() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), ImmutableMap.of("beam:runner:source:v1", new PTransformRunnerFactory<Object>() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandlerTest.2
            public Object createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, BeamFnTimerClient beamFnTimerClient, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, Map<String, RunnerApi.PCollection> map, Map<String, RunnerApi.Coder> map2, Map<String, RunnerApi.WindowingStrategy> map3, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry pTransformFunctionRegistry, PTransformFunctionRegistry pTransformFunctionRegistry2, Consumer<ThrowingRunnable> consumer, Consumer<ThrowingRunnable> consumer2, Consumer<PTransformRunnerFactory.ProgressRequestCallback> consumer3, BundleSplitListener bundleSplitListener, DoFn.BundleFinalizer bundleFinalizer) throws IOException {
                pTransformFunctionRegistry.register(str, () -> {
                    doStateCalls(beamFnStateClient);
                });
                return null;
            }

            private void doStateCalls(BeamFnStateClient beamFnStateClient) {
                beamFnStateClient.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId("SUCCESS"));
            }
        }), new ProcessBundleHandler.BundleProcessorCache());
        Assert.assertThrows("State API calls are unsupported", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        });
    }

    @Test
    public void testTimerRegistrationsFailIfNoTimerApiServiceDescriptorSpecified() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), ImmutableMap.of("beam:runner:source:v1", new PTransformRunnerFactory<Object>() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandlerTest.3
            public Object createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, BeamFnTimerClient beamFnTimerClient, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, Map<String, RunnerApi.PCollection> map, Map<String, RunnerApi.Coder> map2, Map<String, RunnerApi.WindowingStrategy> map3, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry pTransformFunctionRegistry, PTransformFunctionRegistry pTransformFunctionRegistry2, Consumer<ThrowingRunnable> consumer, Consumer<ThrowingRunnable> consumer2, Consumer<PTransformRunnerFactory.ProgressRequestCallback> consumer3, BundleSplitListener bundleSplitListener, DoFn.BundleFinalizer bundleFinalizer) throws IOException {
                pTransformFunctionRegistry.register(str, () -> {
                    doTimerRegistrations(beamFnTimerClient);
                });
                return null;
            }

            private void doTimerRegistrations(BeamFnTimerClient beamFnTimerClient) {
                beamFnTimerClient.register(LogicalEndpoint.timer("1L", "2L", "Timer"), Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE), timer -> {
                });
            }
        }), new ProcessBundleHandler.BundleProcessorCache());
        Assert.assertThrows("Timers are unsupported", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).build());
        });
    }

    private static void throwException() {
        throw new IllegalStateException("TestException");
    }
}
