package org.apache.beam.fn.harness.control;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.BeamFnDataReadRunner;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.debug.DataSampler;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.TimerEndpoint;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.DoFnWithExecutionInformation;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.logging.log4j.util.ProcessIdUtil;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyCollection;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.class */
public class ProcessBundleHandlerTest {
    private static final String DATA_INPUT_URN = "beam:runner:source:v1";
    private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";

    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from((Supplier<ExecutorService>) Executors::newCachedThreadPool);

    @Mock
    private BeamFnDataClient beamFnDataClient;
    private ExecutionStateSampler executionStateSampler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest$SimpleDoFn.class */
    public static final class SimpleDoFn extends DoFn<KV<String, String>, String> {
        private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<>("mainOutput");
        private static final String TIMER_FAMILY_ID = "timer_family";

        @DoFn.TimerFamily(TIMER_FAMILY_ID)
        private final TimerSpec timer;

        private SimpleDoFn() {
            this.timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, String>, String>.ProcessContext processContext, BoundedWindow boundedWindow) {
        }

        @DoFn.OnTimerFamily(TIMER_FAMILY_ID)
        public void onTimer(@DoFn.TimerFamily("timer_family") TimerMap timerMap) {
            timerMap.get("output_timer").withOutputTimestamp(Instant.ofEpochMilli(100L)).set(Instant.ofEpochMilli(100L));
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest$TestBundleProcessor.class */
    private static class TestBundleProcessor extends ProcessBundleHandler.BundleProcessor {
        static int resetCnt = 0;
        private ProcessBundleHandler.BundleProcessor wrappedBundleProcessor;

        TestBundleProcessor(ProcessBundleHandler.BundleProcessor bundleProcessor) {
            this.wrappedBundleProcessor = bundleProcessor;
        }

        Cache<?, ?> getProcessWideCache() {
            return this.wrappedBundleProcessor.getProcessWideCache();
        }

        BundleProgressReporter.InMemory getBundleProgressReporterAndRegistrar() {
            return this.wrappedBundleProcessor.getBundleProgressReporterAndRegistrar();
        }

        BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor() {
            return this.wrappedBundleProcessor.getProcessBundleDescriptor();
        }

        PTransformFunctionRegistry getStartFunctionRegistry() {
            return this.wrappedBundleProcessor.getStartFunctionRegistry();
        }

        PTransformFunctionRegistry getFinishFunctionRegistry() {
            return this.wrappedBundleProcessor.getFinishFunctionRegistry();
        }

        List<ThrowingRunnable> getResetFunctions() {
            return this.wrappedBundleProcessor.getResetFunctions();
        }

        List<ThrowingRunnable> getTearDownFunctions() {
            return this.wrappedBundleProcessor.getTearDownFunctions();
        }

        BundleSplitListener.InMemory getSplitListener() {
            return this.wrappedBundleProcessor.getSplitListener();
        }

        PCollectionConsumerRegistry getpCollectionConsumerRegistry() {
            return this.wrappedBundleProcessor.getpCollectionConsumerRegistry();
        }

        ProcessBundleHandler.MetricsEnvironmentStateForBundle getMetricsEnvironmentStateForBundle() {
            return this.wrappedBundleProcessor.getMetricsEnvironmentStateForBundle();
        }

        public ExecutionStateSampler.ExecutionStateTracker getStateTracker() {
            return this.wrappedBundleProcessor.getStateTracker();
        }

        ProcessBundleHandler.HandleStateCallsForBundle getBeamFnStateClient() {
            return this.wrappedBundleProcessor.getBeamFnStateClient();
        }

        List<Endpoints.ApiServiceDescriptor> getInboundEndpointApiServiceDescriptors() {
            return this.wrappedBundleProcessor.getInboundEndpointApiServiceDescriptors();
        }

        List<DataEndpoint<?>> getInboundDataEndpoints() {
            return this.wrappedBundleProcessor.getInboundDataEndpoints();
        }

        List<TimerEndpoint<?>> getTimerEndpoints() {
            return this.wrappedBundleProcessor.getTimerEndpoints();
        }

        Collection<FinalizeBundleHandler.CallbackRegistration> getBundleFinalizationCallbackRegistrations() {
            return this.wrappedBundleProcessor.getBundleFinalizationCallbackRegistrations();
        }

        Collection<BeamFnDataReadRunner> getChannelRoots() {
            return this.wrappedBundleProcessor.getChannelRoots();
        }

        Map<Endpoints.ApiServiceDescriptor, BeamFnDataOutboundAggregator> getOutboundAggregators() {
            return this.wrappedBundleProcessor.getOutboundAggregators();
        }

        Set<String> getRunnerCapabilities() {
            return this.wrappedBundleProcessor.getRunnerCapabilities();
        }

        Lock getProgressRequestLock() {
            return this.wrappedBundleProcessor.getProgressRequestLock();
        }

        void reset() throws Exception {
            resetCnt++;
            this.wrappedBundleProcessor.reset();
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest$TestBundleProcessorCache.class */
    private static class TestBundleProcessorCache extends ProcessBundleHandler.BundleProcessorCache {
        private TestBundleProcessorCache() {
        }

        ProcessBundleHandler.BundleProcessor get(BeamFnApi.InstructionRequest instructionRequest, Supplier<ProcessBundleHandler.BundleProcessor> supplier) {
            return new TestBundleProcessor(super.get(instructionRequest, supplier));
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest$TestDoFn.class */
    private static class TestDoFn extends DoFn<String, String> {
        private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
        static List<String> orderOfOperations = new ArrayList();
        private State state;

        /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandlerTest$TestDoFn$State.class */
        private enum State {
            NOT_SET_UP,
            SET_UP,
            START_BUNDLE,
            FINISH_BUNDLE,
            TEAR_DOWN
        }

        private TestDoFn() {
            this.state = State.NOT_SET_UP;
        }

        @DoFn.Setup
        public void setUp() {
            Preconditions.checkState(State.NOT_SET_UP.equals(this.state), "Unexpected state: %s", this.state);
            this.state = State.SET_UP;
            orderOfOperations.add("setUp");
        }

        @DoFn.Teardown
        public void tearDown() {
            Preconditions.checkState(!State.TEAR_DOWN.equals(this.state), "Unexpected state: %s", this.state);
            this.state = State.TEAR_DOWN;
            orderOfOperations.add("tearDown");
        }

        @DoFn.StartBundle
        public void startBundle() {
            this.state = State.START_BUNDLE;
            orderOfOperations.add("startBundle");
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>.ProcessContext processContext, BoundedWindow boundedWindow) {
            Preconditions.checkState(State.START_BUNDLE.equals(this.state), "Unexpected state: %s", this.state);
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn<String, String>.FinishBundleContext finishBundleContext) {
            Preconditions.checkState(State.START_BUNDLE.equals(this.state), "Unexpected state: %s", this.state);
            this.state = State.FINISH_BUNDLE;
            orderOfOperations.add("finishBundle");
        }
    }

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        TestBundleProcessor.resetCnt = 0;
        this.executionStateSampler = new ExecutionStateSampler(PipelineOptionsFactory.create(), System::currentTimeMillis);
    }

    @After
    public void tearDown() {
        this.executionStateSampler.stop();
    }

    @Test
    public void testTrySplitBeforeBundleDoesNotFail() {
        Assert.assertNotNull(new ProcessBundleHandler(PipelineOptionsFactory.create(), Collections.emptySet(), (Function) null, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of(), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null).trySplit(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundleSplit(BeamFnApi.ProcessBundleSplitRequest.newBuilder().setInstructionId("unknown-id")).m1074build()).m1122build().getProcessBundleSplit());
        Assert.assertEquals(0L, r0.getProcessBundleSplit().getChannelSplitsCount());
    }

    @Test
    public void testProgressBeforeBundleDoesNotFail() throws Exception {
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(PipelineOptionsFactory.create(), Collections.emptySet(), (Function) null, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of(), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null);
        processBundleHandler.progress(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundleProgress(BeamFnApi.ProcessBundleProgressRequest.newBuilder().setInstructionId("unknown-id")).m1074build());
        Assert.assertNotNull(processBundleHandler.trySplit(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundleSplit(BeamFnApi.ProcessBundleSplitRequest.newBuilder().setInstructionId("unknown-id")).m1074build()).m1122build().getProcessBundleProgress());
        Assert.assertEquals(0L, r0.getProcessBundleProgress().getMonitoringInfosCount());
    }

    @Test
    public void testOrderOfStartAndFinishCalls() throws Exception {
        BeamFnApi.ProcessBundleDescriptor m1456build = BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).putOutputs("2L-output", "2L-output-pc").build()).putTransforms("3L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:sink:v1").build()).putInputs("3L-input", "2L-output-pc").build()).putPcollections("2L-output-pc", RunnerApi.PCollection.getDefaultInstance()).m1456build();
        ImmutableMap of = ImmutableMap.of("1L", m1456build);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        PTransformRunnerFactory pTransformRunnerFactory = context -> {
            String pTransformId = context.getPTransformId();
            arrayList.add(context.getPTransform());
            Supplier processBundleInstructionIdSupplier = context.getProcessBundleInstructionIdSupplier();
            context.addStartBundleFunction(() -> {
                MatcherAssert.assertThat((String) processBundleInstructionIdSupplier.get(), (Matcher<? super String>) Matchers.equalTo("999L"));
                arrayList2.add("Start" + pTransformId);
            });
            context.addFinishBundleFunction(() -> {
                MatcherAssert.assertThat((String) processBundleInstructionIdSupplier.get(), (Matcher<? super String>) Matchers.equalTo("999L"));
                arrayList2.add("Finish" + pTransformId);
            });
            return null;
        };
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", pTransformRunnerFactory, "beam:runner:sink:v1", pTransformRunnerFactory), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null).processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(m1456build.getTransformsMap().get("3L"), m1456build.getTransformsMap().get("2L")));
        MatcherAssert.assertThat(arrayList2, (Matcher<? super ArrayList>) Matchers.contains("Start3L", "Start2L", "Finish2L", "Finish3L"));
    }

    @Test
    public void testOrderOfSetupTeardownCalls() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).putOutputs("2L-output", "2L-output-pc").build()).putTransforms("3L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN).setPayload(RunnerApi.ParDoPayload.newBuilder().setDoFn(RunnerApi.FunctionSpec.newBuilder().setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN).setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(DoFnWithExecutionInformation.of(new TestDoFn(), TestDoFn.mainOutput, Collections.emptyMap(), DoFnSchemaInformation.create())))).build()).build().toByteString())).putInputs("3L-input", "2L-output-pc").build()).putPcollections("2L-output-pc", RunnerApi.PCollection.newBuilder().setWindowingStrategyId("window-strategy").setCoderId("2L-output-coder").setIsBounded(RunnerApi.IsBounded.Enum.BOUNDED).build()).putWindowingStrategies("window-strategy", RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId("window-strategy-coder").setWindowFn(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1")).setOutputTime(RunnerApi.OutputTime.Enum.END_OF_WINDOW).setAccumulationMode(RunnerApi.AccumulationMode.Enum.ACCUMULATING).setTrigger(RunnerApi.Trigger.newBuilder().setAlways(RunnerApi.Trigger.Always.getDefaultInstance())).setClosingBehavior(RunnerApi.ClosingBehavior.Enum.EMIT_ALWAYS).setOnTimeBehavior(RunnerApi.OnTimeBehavior.Enum.FIRE_ALWAYS).build()).putCoders("2L-output-coder", CoderTranslation.toProto(StringUtf8Coder.of()).getCoder()).putCoders("window-strategy-coder", RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN).build()).build()).m1456build());
        HashMap newHashMap = Maps.newHashMap(ProcessBundleHandler.REGISTERED_RUNNER_FACTORIES);
        newHashMap.put("beam:runner:source:v1", context -> {
            return null;
        });
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, newHashMap, Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null);
        processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        processBundleHandler.shutdown();
        MatcherAssert.assertThat(TestDoFn.orderOfOperations, (Matcher<? super List<String>>) Matchers.contains("setUp", "startBundle", "finishBundle", "startBundle", "finishBundle", "tearDown"));
    }

    @Test
    public void testBundleProcessorIsResetWhenAddedBackToCache() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).m1456build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", context -> {
            return null;
        }), Caches.noop(), new TestBundleProcessorCache(), (DataSampler) null);
        MatcherAssert.assertThat(Integer.valueOf(TestBundleProcessor.resetCnt), (Matcher<? super Integer>) Matchers.equalTo(0));
        processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        MatcherAssert.assertThat(Integer.valueOf(TestBundleProcessor.resetCnt), (Matcher<? super Integer>) Matchers.equalTo(1));
        MatcherAssert.assertThat(Integer.valueOf(processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().size()), (Matcher<? super Integer>) Matchers.equalTo(1));
        MatcherAssert.assertThat(Integer.valueOf(((ConcurrentLinkedQueue) processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().get("1L")).size()), (Matcher<? super Integer>) Matchers.equalTo(1));
        ((ProcessBundleHandler.BundleProcessor) Iterables.getOnlyElement((Iterable) processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().get("1L"))).getResetFunctions().add(() -> {
            throw new IllegalStateException("ResetFailed");
        });
        processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        MatcherAssert.assertThat(Integer.valueOf(((ConcurrentLinkedQueue) processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().get("1L")).size()), (Matcher<? super Integer>) Matchers.equalTo(0));
    }

    private static BeamFnApi.InstructionRequest processBundleRequestFor(String str, String str2, BeamFnApi.ProcessBundleRequest.CacheToken... cacheTokenArr) {
        return BeamFnApi.InstructionRequest.newBuilder().setInstructionId(str).setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId(str2).addAllCacheTokens(Arrays.asList(cacheTokenArr))).m1074build();
    }

    @Test
    public void testBundleProcessorIsFoundWhenActive() {
        ProcessBundleHandler.BundleProcessor bundleProcessor = (ProcessBundleHandler.BundleProcessor) Mockito.mock(ProcessBundleHandler.BundleProcessor.class);
        Mockito.when(bundleProcessor.getInstructionId()).thenReturn("known");
        ProcessBundleHandler.BundleProcessorCache bundleProcessorCache = new ProcessBundleHandler.BundleProcessorCache();
        Assert.assertNull(bundleProcessorCache.find(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN));
        bundleProcessorCache.get(processBundleRequestFor("known", "descriptorId", new BeamFnApi.ProcessBundleRequest.CacheToken[0]), () -> {
            return bundleProcessor;
        });
        Assert.assertSame(bundleProcessor, bundleProcessorCache.find("known"));
        bundleProcessorCache.release("descriptorId", bundleProcessor);
        Assert.assertNull(bundleProcessorCache.find("known"));
        bundleProcessorCache.get(processBundleRequestFor("known", "descriptorId", new BeamFnApi.ProcessBundleRequest.CacheToken[0]), () -> {
            return bundleProcessor;
        });
        Assert.assertSame(bundleProcessor, bundleProcessorCache.find("known"));
        bundleProcessorCache.discard(bundleProcessor);
        ((ProcessBundleHandler.BundleProcessor) Mockito.verify(bundleProcessor)).discard();
        Assert.assertNull(bundleProcessorCache.find("known"));
    }

    @Test
    public void testBundleProcessorReset() throws Exception {
        PTransformFunctionRegistry pTransformFunctionRegistry = (PTransformFunctionRegistry) Mockito.mock(PTransformFunctionRegistry.class);
        PTransformFunctionRegistry pTransformFunctionRegistry2 = (PTransformFunctionRegistry) Mockito.mock(PTransformFunctionRegistry.class);
        BundleSplitListener.InMemory inMemory = (BundleSplitListener.InMemory) Mockito.mock(BundleSplitListener.InMemory.class);
        Collection collection = (Collection) Mockito.mock(Collection.class);
        PCollectionConsumerRegistry pCollectionConsumerRegistry = (PCollectionConsumerRegistry) Mockito.mock(PCollectionConsumerRegistry.class);
        ExecutionStateSampler.ExecutionStateTracker executionStateTracker = (ExecutionStateSampler.ExecutionStateTracker) Mockito.mock(ExecutionStateSampler.ExecutionStateTracker.class);
        ProcessBundleHandler.HandleStateCallsForBundle handleStateCallsForBundle = (ProcessBundleHandler.HandleStateCallsForBundle) Mockito.mock(ProcessBundleHandler.HandleStateCallsForBundle.class);
        ThrowingRunnable throwingRunnable = (ThrowingRunnable) Mockito.mock(ThrowingRunnable.class);
        ProcessBundleHandler.BundleProcessor create = ProcessBundleHandler.BundleProcessor.create(Caches.eternal(), new BundleProgressReporter.InMemory(), BeamFnApi.ProcessBundleDescriptor.getDefaultInstance(), pTransformFunctionRegistry, pTransformFunctionRegistry2, Collections.singletonList(throwingRunnable), new ArrayList(), inMemory, pCollectionConsumerRegistry, new ProcessBundleHandler.MetricsEnvironmentStateForBundle(), executionStateTracker, handleStateCallsForBundle, collection, new HashSet());
        create.finish();
        BeamFnApi.ProcessBundleRequest.CacheToken m1650build = BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().setSideInput(BeamFnApi.ProcessBundleRequest.CacheToken.SideInput.newBuilder().setTransformId("transformId")).m1650build();
        create.setupForProcessBundleRequest(processBundleRequestFor("instructionId", "descriptorId", m1650build));
        Assert.assertEquals("instructionId", create.getInstructionId());
        MatcherAssert.assertThat(create.getCacheTokens(), (Matcher<? super List>) Matchers.containsInAnyOrder(m1650build));
        Cache bundleCache = create.getBundleCache();
        bundleCache.put("A", "B");
        Assert.assertEquals("B", bundleCache.peek("A"));
        Assert.assertTrue(create.getProgressRequestLock().tryLock());
        create.reset();
        Assert.assertNull(create.getInstructionId());
        Assert.assertNull(create.getCacheTokens());
        Assert.assertNull(bundleCache.peek("A"));
        ((BundleSplitListener.InMemory) Mockito.verify(inMemory, Mockito.times(1))).clear();
        ((ExecutionStateSampler.ExecutionStateTracker) Mockito.verify(executionStateTracker, Mockito.times(1))).reset();
        ((Collection) Mockito.verify(collection, Mockito.times(1))).clear();
        ((ThrowingRunnable) Mockito.verify(throwingRunnable, Mockito.times(1))).run();
        Assert.assertNull(MetricsEnvironment.getCurrentContainer());
        create.setupForProcessBundleRequest(processBundleRequestFor("instructionId2", "descriptorId2", new BeamFnApi.ProcessBundleRequest.CacheToken[0]));
        Assert.assertNotSame(bundleCache, create.getBundleCache());
        Assert.assertEquals("instructionId2", create.getInstructionId());
        MatcherAssert.assertThat(create.getCacheTokens(), (Matcher<? super List>) Matchers.is(Matchers.emptyIterable()));
    }

    @Test
    public void testCreatingPTransformExceptionsArePropagated() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).m1456build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", context -> {
            throw new IllegalStateException("TestException");
        }), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null);
        Assert.assertThrows("TestException", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        });
    }

    @Test
    public void testBundleFinalizationIsPropagated() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).m1456build());
        FinalizeBundleHandler finalizeBundleHandler = (FinalizeBundleHandler) Mockito.mock(FinalizeBundleHandler.class);
        DoFn.BundleFinalizer.Callback callback = (DoFn.BundleFinalizer.Callback) Mockito.mock(DoFn.BundleFinalizer.Callback.class);
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        Assert.assertTrue(new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, finalizeBundleHandler, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", context -> {
            DoFn.BundleFinalizer bundleFinalizer = context.getBundleFinalizer();
            context.addStartBundleFunction(() -> {
                bundleFinalizer.afterBundleCommit(Instant.ofEpochMilli(42L), callback);
            });
            return null;
        }), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null).processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("2L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build()).getProcessBundle().getRequiresFinalization());
        ((FinalizeBundleHandler) Mockito.verify(finalizeBundleHandler)).registerCallbacks((String) Mockito.eq("2L"), (Collection) Mockito.argThat(collection -> {
            FinalizeBundleHandler.CallbackRegistration callbackRegistration = (FinalizeBundleHandler.CallbackRegistration) Iterables.getOnlyElement(collection);
            Assert.assertEquals(Instant.ofEpochMilli(42L), callbackRegistration.getExpiryTime());
            Assert.assertSame(callback, callbackRegistration.getCallback());
            return true;
        }));
    }

    @Test
    public void testPTransformStartExceptionsArePropagated() {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).m1456build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", context -> {
            context.addStartBundleFunction(ProcessBundleHandlerTest::throwException);
            return null;
        }), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null);
        Assert.assertThrows("TestException", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        });
        MatcherAssert.assertThat((ConcurrentLinkedQueue) processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), (Matcher<? super ConcurrentLinkedQueue>) IsEmptyCollection.empty());
    }

    private ProcessBundleHandler setupProcessBundleHandlerForSimpleRecordingDoFn(List<String> list, List<BeamFnApi.Elements.Timers> list2, boolean z) throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).putOutputs("2L-output", "2L-output-pc").build()).putTransforms("3L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN).setPayload(RunnerApi.ParDoPayload.newBuilder().setDoFn(RunnerApi.FunctionSpec.newBuilder().setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN).setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(DoFnWithExecutionInformation.of(new SimpleDoFn(), SimpleDoFn.MAIN_OUTPUT_TAG, Collections.emptyMap(), DoFnSchemaInformation.create())))).build()).putTimerFamilySpecs("tfs-timer_family", RunnerApi.TimerFamilySpec.newBuilder().setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME).setTimerFamilyCoderId("timer-coder").build()).build().toByteString())).putInputs("3L-input", "2L-output-pc").build()).putPcollections("2L-output-pc", RunnerApi.PCollection.newBuilder().setWindowingStrategyId("window-strategy").setCoderId("2L-output-coder").setIsBounded(RunnerApi.IsBounded.Enum.BOUNDED).build()).putWindowingStrategies("window-strategy", RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId("window-strategy-coder").setWindowFn(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:window_fn:global_windows:v1")).setOutputTime(RunnerApi.OutputTime.Enum.END_OF_WINDOW).setAccumulationMode(RunnerApi.AccumulationMode.Enum.ACCUMULATING).setTrigger(RunnerApi.Trigger.newBuilder().setAlways(RunnerApi.Trigger.Always.getDefaultInstance())).setClosingBehavior(RunnerApi.ClosingBehavior.Enum.EMIT_ALWAYS).setOnTimeBehavior(RunnerApi.OnTimeBehavior.Enum.FIRE_ALWAYS).build()).setTimerApiServiceDescriptor(Endpoints.ApiServiceDescriptor.newBuilder().setUrl("url").build()).putCoders("string_coder", CoderTranslation.toProto(StringUtf8Coder.of()).getCoder()).putCoders("2L-output-coder", RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN).build()).addComponentCoderIds("string_coder").addComponentCoderIds("string_coder").build()).putCoders("window-strategy-coder", RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN).build()).build()).putCoders("timer-coder", RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.TIMER_CODER_URN)).addComponentCoderIds("string_coder").addComponentCoderIds("window-strategy-coder").build()).m1456build());
        HashMap newHashMap = Maps.newHashMap(ProcessBundleHandler.REGISTERED_RUNNER_FACTORIES);
        newHashMap.put("beam:runner:source:v1", context -> {
            context.addIncomingDataEndpoint(Endpoints.ApiServiceDescriptor.getDefaultInstance(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()), kv -> {
                list.add((String) kv.getValue());
            });
            return null;
        });
        ((BeamFnDataClient) Mockito.doAnswer(invocationOnMock -> {
            return new BeamFnDataOutboundAggregator(PipelineOptionsFactory.create(), (Supplier) invocationOnMock.getArgument(1), new StreamObserver<BeamFnApi.Elements>() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandlerTest.1
                public void onNext(BeamFnApi.Elements elements) {
                    for (BeamFnApi.Elements.Timers timers : elements.getTimersList()) {
                        list2.addAll(elements.getTimersList());
                    }
                }

                public void onError(Throwable th) {
                }

                public void onCompleted() {
                }
            }, ((Boolean) invocationOnMock.getArgument(2)).booleanValue());
        }).when(this.beamFnDataClient)).createOutboundAggregator((Endpoints.ApiServiceDescriptor) ArgumentMatchers.any(), (Supplier) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set singleton = z ? Collections.singleton(BeamUrns.getUrn(RunnerApi.StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)) : Collections.emptySet();
        Objects.requireNonNull(of);
        return new ProcessBundleHandler(create, singleton, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, newHashMap, Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null);
    }

    @Test
    public void testInstructionEmbeddedElementsAreProcessed() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ProcessBundleHandler processBundleHandler = setupProcessBundleHandlerForSimpleRecordingDoFn(arrayList, arrayList2, false);
        ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
        KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), byteStringOutputStream);
        OutputStream byteStringOutputStream2 = new ByteStringOutputStream();
        Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE).encode(Timer.of("", "timer_id", Collections.singletonList(GlobalWindow.INSTANCE), Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L), PaneInfo.ON_TIME_AND_ONLY_FIRING), byteStringOutputStream2);
        processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L").setElements(BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId("998L").setTransformId("2L").setData(byteStringOutputStream.toByteString()).m744build()).addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId("998L").setTransformId("2L").setIsLast(true).m744build()).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId("998L").setTransformId("3L").setTimerFamilyId("tfs-timer_family").setTimers(byteStringOutputStream2.toByteString()).m791build()).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId("998L").setTransformId("3L").setTimerFamilyId("tfs-timer_family").setIsLast(true).m791build()).m697build())).m1074build());
        processBundleHandler.shutdown();
        MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains("data"));
        Assert.assertEquals("output_timer", Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE).m3635decode(arrayList2.get(0).getTimers().newInput()).getDynamicTimerTag());
    }

    @Test
    public void testInstructionEmbeddedElementsWithMalformedData() throws Exception {
        ProcessBundleHandler processBundleHandler = setupProcessBundleHandlerForSimpleRecordingDoFn(new ArrayList(), new ArrayList(), false);
        ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
        KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), byteStringOutputStream);
        Assert.assertThrows("Expect java.lang.IllegalStateException: Unable to find inbound data receiver for instruction 998L and transform 3L.", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L").setElements(BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId("998L").setTransformId("3L").setData(byteStringOutputStream.toByteString()).m744build()).m697build())).m1074build());
        });
        Assert.assertThrows("Elements embedded in ProcessBundleRequest do not contain stream terminators for all data and timer inputs. Unterminated endpoints: [2L:data, 3L:timers:tfs-timer_family]", RuntimeException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L").setElements(BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId("998L").setTransformId("2L").setData(byteStringOutputStream.toByteString()).m744build()).m697build())).m1074build());
        });
        processBundleHandler.shutdown();
    }

    @Test
    public void testInstructionEmbeddedElementsWithMalformedTimers() throws Exception {
        ProcessBundleHandler processBundleHandler = setupProcessBundleHandlerForSimpleRecordingDoFn(new ArrayList(), new ArrayList(), false);
        OutputStream byteStringOutputStream = new ByteStringOutputStream();
        Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE).encode(Timer.of("", "timer_id", Collections.singletonList(GlobalWindow.INSTANCE), Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L), PaneInfo.ON_TIME_AND_ONLY_FIRING), byteStringOutputStream);
        Assert.assertThrows("Expect java.lang.IllegalStateException: Unable to find inbound timer receiver for instruction 998L, transform 4L, and timer family tfs-timer_family.", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L").setElements(BeamFnApi.Elements.newBuilder().addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId("998L").setTransformId("4L").setTimerFamilyId("tfs-timer_family").setTimers(byteStringOutputStream.toByteString()).m791build()).m697build())).m1074build());
        });
        Assert.assertThrows("Expect java.lang.IllegalStateException: Unable to find inbound timer receiver for instruction 998L, transform 3L, and timer family tfs-not_declared_id.", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L").setElements(BeamFnApi.Elements.newBuilder().addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId("998L").setTransformId("3L").setTimerFamilyId("tfs-not_declared_id").setTimers(byteStringOutputStream.toByteString()).m791build()).m697build())).m1074build());
        });
        Assert.assertThrows("Elements embedded in ProcessBundleRequest do not contain stream terminators for all data and timer inputs. Unterminated endpoints: [2L:data, 3L:timers:tfs-timer_family]", RuntimeException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L").setElements(BeamFnApi.Elements.newBuilder().addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId("998L").setTransformId("3L").setTimerFamilyId("tfs-timer_family").setTimers(byteStringOutputStream.toByteString()).m791build()).m697build())).m1074build());
        });
        processBundleHandler.shutdown();
    }

    @Test
    public void testOutputEmbeddedElementsAreProcessed() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ProcessBundleHandler processBundleHandler = setupProcessBundleHandlerForSimpleRecordingDoFn(arrayList, arrayList2, true);
        OutputStream byteStringOutputStream = new ByteStringOutputStream();
        Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE).encode(Timer.of("", "timer_id", Collections.singletonList(GlobalWindow.INSTANCE), Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L), PaneInfo.ON_TIME_AND_ONLY_FIRING), byteStringOutputStream);
        BeamFnApi.InstructionResponse.Builder processBundle = processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("998L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L").setElements(BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId("998L").setTransformId("2L").setIsLast(true).m744build()).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId("998L").setTransformId("3L").setTimerFamilyId("tfs-timer_family").setTimers(byteStringOutputStream.toByteString()).m791build()).addTimers(BeamFnApi.Elements.Timers.newBuilder().setInstructionId("998L").setTransformId("3L").setTimerFamilyId("tfs-timer_family").setIsLast(true).m791build()).m697build())).m1074build());
        processBundleHandler.shutdown();
        MatcherAssert.assertThat(arrayList2, (Matcher<? super ArrayList>) IsEmptyCollection.empty());
        Assert.assertEquals(2L, processBundle.m1122build().getProcessBundle().getElements().getTimersCount());
    }

    @Test
    public void testInstructionIsUnregisteredFromBeamFnDataClientOnSuccess() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).m1456build());
        ((BeamFnDataClient) Mockito.doAnswer(invocationOnMock -> {
            ((CloseableFnDataReceiver) invocationOnMock.getArgument(2, CloseableFnDataReceiver.class)).accept(BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId((String) invocationOnMock.getArgument(0, String.class)).setTransformId("2L").setIsLast(true)).m697build());
            return null;
        }).when(this.beamFnDataClient)).registerReceiver((String) ArgumentMatchers.any(), (List) ArgumentMatchers.any(), (CloseableFnDataReceiver) ArgumentMatchers.any());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", context -> {
            context.addIncomingDataEndpoint(Endpoints.ApiServiceDescriptor.getDefaultInstance(), StringUtf8Coder.of(), str -> {
            });
            return null;
        }), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null).processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("instructionId").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        ((BeamFnDataClient) Mockito.verify(this.beamFnDataClient)).registerReceiver((String) Mockito.eq("instructionId"), (List) ArgumentMatchers.any(), (CloseableFnDataReceiver) ArgumentMatchers.any());
        ((BeamFnDataClient) Mockito.verify(this.beamFnDataClient)).unregisterReceiver((String) Mockito.eq("instructionId"), (List) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(this.beamFnDataClient);
    }

    @Test
    public void testDataProcessingExceptionsArePropagated() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).m1456build());
        ((BeamFnDataClient) Mockito.doAnswer(invocationOnMock -> {
            ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
            StringUtf8Coder.of().encode("A", byteStringOutputStream);
            ((CloseableFnDataReceiver) invocationOnMock.getArgument(2, CloseableFnDataReceiver.class)).accept(BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId((String) invocationOnMock.getArgument(0, String.class)).setTransformId("2L").setData(byteStringOutputStream.toByteString()).setIsLast(true)).m697build());
            return null;
        }).when(this.beamFnDataClient)).registerReceiver((String) ArgumentMatchers.any(), (List) ArgumentMatchers.any(), (CloseableFnDataReceiver) ArgumentMatchers.any());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", context -> {
            context.addIncomingDataEndpoint(Endpoints.ApiServiceDescriptor.getDefaultInstance(), StringUtf8Coder.of(), str -> {
                throw new IllegalStateException("TestException");
            });
            return null;
        }), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null);
        Assert.assertThrows("TestException", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("instructionId").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        });
        ((BeamFnDataClient) Mockito.verify(this.beamFnDataClient)).registerReceiver((String) Mockito.eq("instructionId"), (List) ArgumentMatchers.any(), (CloseableFnDataReceiver) ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions(this.beamFnDataClient);
    }

    @Test
    public void testPTransformFinishExceptionsArePropagated() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).m1456build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", context -> {
            context.addFinishBundleFunction(ProcessBundleHandlerTest::throwException);
            return null;
        }), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null);
        Assert.assertThrows("TestException", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        });
        MatcherAssert.assertThat((ConcurrentLinkedQueue) processBundleHandler.bundleProcessorCache.getCachedBundleProcessors().get("1L"), (Matcher<? super ConcurrentLinkedQueue>) IsEmptyCollection.empty());
    }

    @Test
    public void testPendingStateCallsBlockTillCompletion() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).setStateApiServiceDescriptor(Endpoints.ApiServiceDescriptor.getDefaultInstance()).m1456build());
        final CompletableFuture[] completableFutureArr = new CompletableFuture[1];
        final CompletableFuture[] completableFutureArr2 = new CompletableFuture[1];
        BeamFnStateGrpcClientCache beamFnStateGrpcClientCache = (BeamFnStateGrpcClientCache) Mockito.mock(BeamFnStateGrpcClientCache.class);
        BeamFnStateClient beamFnStateClient = (BeamFnStateClient) Mockito.mock(BeamFnStateClient.class);
        Mockito.when(beamFnStateGrpcClientCache.forApiServiceDescriptor((Endpoints.ApiServiceDescriptor) ArgumentMatchers.any())).thenReturn(beamFnStateClient);
        ((BeamFnStateClient) Mockito.doAnswer(invocationOnMock -> {
            BeamFnApi.StateRequest.Builder builder = (BeamFnApi.StateRequest.Builder) invocationOnMock.getArguments()[0];
            CompletableFuture completableFuture = new CompletableFuture();
            new Thread(() -> {
                Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);
                String instructionId = builder.getInstructionId();
                boolean z = -1;
                switch (instructionId.hashCode()) {
                    case -1149187101:
                        if (instructionId.equals("SUCCESS")) {
                            z = false;
                            break;
                        }
                        break;
                    case 2150174:
                        if (instructionId.equals("FAIL")) {
                            z = true;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        completableFuture.complete(BeamFnApi.StateResponse.getDefaultInstance());
                        return;
                    case true:
                        completableFuture.completeExceptionally(new RuntimeException("TEST ERROR"));
                        return;
                    default:
                        return;
                }
            }).start();
            return completableFuture;
        }).when(beamFnStateClient)).handle((BeamFnApi.StateRequest.Builder) ArgumentMatchers.any());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, beamFnStateGrpcClientCache, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", new PTransformRunnerFactory<Object>() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandlerTest.2
            public Object createRunnerForPTransform(PTransformRunnerFactory.Context context) throws IOException {
                BeamFnStateClient beamFnStateClient2 = context.getBeamFnStateClient();
                context.addStartBundleFunction(() -> {
                    doStateCalls(beamFnStateClient2);
                });
                return null;
            }

            private void doStateCalls(BeamFnStateClient beamFnStateClient2) {
                completableFutureArr[0] = beamFnStateClient2.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId("SUCCESS"));
                completableFutureArr2[0] = beamFnStateClient2.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId("FAIL"));
            }
        }), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null).processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        Assert.assertTrue(completableFutureArr[0].isDone());
        Assert.assertTrue(completableFutureArr2[0].isDone());
    }

    @Test
    public void testStateCallsFailIfNoStateApiServiceDescriptorSpecified() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).m1456build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", new PTransformRunnerFactory<Object>() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandlerTest.3
            public Object createRunnerForPTransform(PTransformRunnerFactory.Context context) throws IOException {
                BeamFnStateClient beamFnStateClient = context.getBeamFnStateClient();
                context.addStartBundleFunction(() -> {
                    doStateCalls(beamFnStateClient);
                });
                return null;
            }

            private void doStateCalls(BeamFnStateClient beamFnStateClient) {
                beamFnStateClient.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId("SUCCESS"));
            }
        }), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null);
        Assert.assertThrows("State API calls are unsupported", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        });
    }

    @Test
    public void testProgressReportingIsExecutedSerially() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).putOutputs("2L-output", "2L-output-pc").build()).putPcollections("2L-output-pc", RunnerApi.PCollection.getDefaultInstance()).m1456build());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final AtomicReference atomicReference = new AtomicReference();
        final AtomicReference atomicReference2 = new AtomicReference();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        BundleProgressReporter bundleProgressReporter = new BundleProgressReporter() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandlerTest.4
            public void updateIntermediateMonitoringData(Map<String, ByteString> map) {
                Assert.assertTrue(((ReentrantLock) ((ProcessBundleHandler.BundleProcessor) atomicReference.get()).getProgressRequestLock()).isHeldByCurrentThread());
                Assert.assertNotEquals(Thread.currentThread(), atomicReference2.get());
                Assert.assertFalse(atomicBoolean.get());
                Assert.assertFalse(atomicBoolean2.get());
                map.put("testId", ByteString.copyFromUtf8(Long.toString(atomicInteger.getAndIncrement())));
            }

            public void updateFinalMonitoringData(Map<String, ByteString> map) {
                Assert.assertTrue(((ReentrantLock) ((ProcessBundleHandler.BundleProcessor) atomicReference.get()).getProgressRequestLock()).isHeldByCurrentThread());
                Assert.assertEquals(Thread.currentThread(), atomicReference2.get());
                Assert.assertFalse(atomicBoolean.getAndSet(true));
                Assert.assertFalse(atomicBoolean2.get());
                map.put("testId", ByteString.copyFromUtf8(Long.toString(atomicInteger.get())));
            }

            public void reset() {
                Assert.assertTrue(((ReentrantLock) ((ProcessBundleHandler.BundleProcessor) atomicReference.get()).getProgressRequestLock()).isHeldByCurrentThread());
                Assert.assertEquals(Thread.currentThread(), atomicReference2.get());
                Assert.assertTrue(atomicBoolean.get());
                Assert.assertFalse(atomicBoolean2.getAndSet(true));
            }
        };
        PTransformRunnerFactory pTransformRunnerFactory = context -> {
            context.getPTransformId();
            context.getProcessBundleInstructionIdSupplier();
            context.addBundleProgressReporter(bundleProgressReporter);
            context.addStartBundleFunction(() -> {
                countDownLatch.countDown();
            });
            context.addFinishBundleFunction(() -> {
                countDownLatch2.await();
            });
            return null;
        };
        ProcessBundleHandler.BundleProcessorCache bundleProcessorCache = new ProcessBundleHandler.BundleProcessorCache();
        PipelineOptions create = PipelineOptionsFactory.create();
        Set singleton = Collections.singleton(BeamUrns.getUrn(RunnerApi.StandardRunnerProtocols.Enum.MONITORING_INFO_SHORT_IDS));
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, singleton, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", pTransformRunnerFactory), Caches.noop(), bundleProcessorCache, (DataSampler) null);
        AtomicBoolean atomicBoolean3 = new AtomicBoolean();
        Future submit = this.executor.submit(() -> {
            atomicReference2.set(Thread.currentThread());
            BeamFnApi.InstructionResponse m1122build = processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build()).m1122build();
            atomicBoolean3.set(true);
            return m1122build;
        });
        countDownLatch.await();
        atomicReference.set(bundleProcessorCache.find("999L"));
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        CountDownLatch countDownLatch4 = new CountDownLatch(5);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 20; i++) {
            int i2 = i;
            arrayList.add(this.executor.submit(() -> {
                BeamFnApi.InstructionResponse.Builder progress;
                countDownLatch3.await();
                int i3 = 0;
                do {
                    try {
                        i3++;
                        progress = processBundleHandler.progress(BeamFnApi.InstructionRequest.newBuilder().setInstructionId("thread-" + i2 + ProcessIdUtil.DEFAULT_PROCESSID + i3).setProcessBundleProgress(BeamFnApi.ProcessBundleProgressRequest.newBuilder().setInstructionId("999L").m1508build()).m1074build());
                        if (progress.getProcessBundleProgress().getMonitoringDataMap().containsKey("testId")) {
                            break;
                        }
                    } finally {
                        countDownLatch4.countDown();
                    }
                } while (!atomicBoolean3.get());
                return progress.m1122build();
            }));
        }
        countDownLatch3.countDown();
        countDownLatch4.await();
        countDownLatch2.countDown();
        ArrayList arrayList2 = new ArrayList();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ByteString monitoringDataOrDefault = ((BeamFnApi.InstructionResponse) ((Future) it.next()).get()).getProcessBundleProgress().getMonitoringDataOrDefault("testId", null);
            if (monitoringDataOrDefault != null) {
                arrayList2.add(monitoringDataOrDefault);
            }
        }
        Assert.assertTrue(arrayList2.size() >= 5);
        ArrayList arrayList3 = new ArrayList();
        for (int i3 = 0; i3 < atomicInteger.get(); i3++) {
            arrayList3.add(ByteString.copyFromUtf8(Long.toString(i3)));
        }
        MatcherAssert.assertThat(arrayList2, (Matcher<? super ArrayList>) Matchers.containsInAnyOrder(arrayList3.toArray()));
        Assert.assertEquals(ByteString.copyFromUtf8(Long.toString(atomicInteger.get())), ((BeamFnApi.InstructionResponse) submit.get()).getProcessBundle().getMonitoringDataOrThrow("testId"));
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertTrue(atomicBoolean2.get());
    }

    @Test
    public void testTimerRegistrationsFailIfNoTimerApiServiceDescriptorSpecified() throws Exception {
        ImmutableMap of = ImmutableMap.of("1L", BeamFnApi.ProcessBundleDescriptor.newBuilder().putTransforms("2L", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:runner:source:v1").build()).build()).m1456build());
        PipelineOptions create = PipelineOptionsFactory.create();
        Set emptySet = Collections.emptySet();
        Objects.requireNonNull(of);
        ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(create, emptySet, (v1) -> {
            return r4.get(v1);
        }, this.beamFnDataClient, (BeamFnStateGrpcClientCache) null, (FinalizeBundleHandler) null, new ShortIdMap(), this.executionStateSampler, ImmutableMap.of("beam:runner:source:v1", new PTransformRunnerFactory<Object>() { // from class: org.apache.beam.fn.harness.control.ProcessBundleHandlerTest.5
            public Object createRunnerForPTransform(PTransformRunnerFactory.Context context) throws IOException {
                context.addOutgoingTimersEndpoint("timer", Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
                return null;
            }
        }), Caches.noop(), new ProcessBundleHandler.BundleProcessorCache(), (DataSampler) null);
        Assert.assertThrows("Timers are unsupported", IllegalStateException.class, () -> {
            processBundleHandler.processBundle(BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L")).m1074build());
        });
    }

    private static void throwException() {
        throw new IllegalStateException("TestException");
    }
}
