package org.apache.beam.fn.harness;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.function.Consumer;
import org.apache.beam.fn.harness.FnApiDoFnRunner;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.state.FakeBeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.base.Supplier;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.base.Suppliers;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* Tests for FnApiDoFnRunner: user state, side inputs, timers and registrar discovery. */
public class FnApiDoFnRunnerTest implements Serializable {

    // JUnit rule used by testTimers to pin the clock via setDateTimeFixed(...).
    // Declared transient so it is not part of this Serializable test class's serialized form.
    // NOTE(review): presumably the rule restores the real clock after each test; confirm
    // against ResetDateTimeProvider.
    @Rule
    public transient ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider();
    // Name under which each test applies the ParDo under test; also used as the PTransform id
    // in the state keys built by bagUserStateKey/multimapSideInputKey and passed to the runner factory.
    public static final String TEST_PTRANSFORM_ID = "pTransformId";

    /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ConcatCombineFn.class */
    /**
     * Combine function whose accumulator is simply the concatenation of every input seen so far.
     * The accumulator starts out as the empty string and is returned unchanged as the output.
     */
    private static class ConcatCombineFn extends Combine.CombineFn<String, String, String> {

        /** Starts with an empty accumulator. */
        @Override
        public String createAccumulator() {
            return "";
        }

        /** Appends {@code str2} (the new input) to {@code str} (the current accumulator). */
        @Override
        public String addInput(String str, String str2) {
            return str.concat(str2);
        }

        /** Concatenates all partial accumulators in iteration order. */
        @Override
        public String mergeAccumulators(Iterable<String> iterable) {
            return String.join("", iterable);
        }

        /** The accumulator already is the final output. */
        @Override
        public String extractOutput(String str) {
            return str;
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$TestSideInputDoFn.class */
    /**
     * DoFn that, for every input element, emits {@code element:sideInputValue} for two singleton
     * side inputs and for each entry of an iterable side input, and then emits
     * {@code element:additional} to an additional output tag.
     */
    private static class TestSideInputDoFn extends DoFn<String, String> {
        private final PCollectionView<String> defaultSingletonSideInput;
        private final PCollectionView<String> singletonSideInput;
        private final PCollectionView<Iterable<String>> iterableSideInput;
        private final TupleTag<String> additionalOutput;

        private TestSideInputDoFn(PCollectionView<String> pCollectionView, PCollectionView<String> pCollectionView2, PCollectionView<Iterable<String>> pCollectionView3, TupleTag<String> tupleTag) {
            this.defaultSingletonSideInput = pCollectionView;
            this.singletonSideInput = pCollectionView2;
            this.iterableSideInput = pCollectionView3;
            this.additionalOutput = tupleTag;
        }

        @ProcessElement
        public void processElement(ProcessContext context) {
            // Main output: the two singleton views first, then every value of the iterable view.
            context.output(context.element() + ":" + context.sideInput(defaultSingletonSideInput));
            context.output(context.element() + ":" + context.sideInput(singletonSideInput));
            for (String sideInputValue : context.sideInput(iterableSideInput)) {
                context.output(context.element() + ":" + sideInputValue);
            }
            // Additional (tagged) output.
            context.output(additionalOutput, context.element() + ":additional");
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$TestSideInputIsAccessibleForDownstreamCallersDoFn.class */
    /**
     * DoFn that ignores the element's value and outputs the iterable side input itself, so the
     * consumer of the output is the one that iterates over the side input.
     */
    private static class TestSideInputIsAccessibleForDownstreamCallersDoFn extends DoFn<String, Iterable<String>> {
        private final PCollectionView<Iterable<String>> iterableSideInput;

        private TestSideInputIsAccessibleForDownstreamCallersDoFn(PCollectionView<Iterable<String>> pCollectionView) {
            this.iterableSideInput = pCollectionView;
        }

        @ProcessElement
        public void processElement(ProcessContext context) {
            Iterable<String> sideInputValues = context.sideInput(iterableSideInput);
            context.output(sideInputValues);
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$TestStatefulDoFn.class */
    /**
     * Stateful DoFn exercising value, bag and combining state. For each element it outputs the
     * current contents of each state cell and then records the element's value into that cell.
     */
    private static class TestStatefulDoFn extends DoFn<KV<String, String>, String> {
        private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
        private static final TupleTag<String> additionalOutput = new TupleTag<>("output");

        @StateId("value")
        private final StateSpec<ValueState<String>> valueStateSpec = StateSpecs.value(StringUtf8Coder.of());

        @StateId("bag")
        private final StateSpec<BagState<String>> bagStateSpec = StateSpecs.bag(StringUtf8Coder.of());

        @StateId("combine")
        private final StateSpec<CombiningState<String, String, String>> combiningStateSpec =
                StateSpecs.combining(StringUtf8Coder.of(), new ConcatCombineFn());

        @ProcessElement
        public void processElement(
                ProcessContext context,
                @StateId("value") ValueState<String> valueState,
                @StateId("bag") BagState<String> bagState,
                @StateId("combine") CombiningState<String, String, String> combiningState) {
            String newValue;

            // Each cell is read (and reported) before it is updated.
            context.output("value:" + valueState.read());
            newValue = context.element().getValue();
            valueState.write(newValue);

            context.output("bag:" + Iterables.toString(bagState.read()));
            newValue = context.element().getValue();
            bagState.add(newValue);

            context.output("combine:" + combiningState.read());
            newValue = context.element().getValue();
            combiningState.add(newValue);
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$TestTimerfulDoFn.class */
    /**
     * DoFn with one event-time and one processing-time timer. Element processing and both timer
     * callbacks emit a marker string and then set both timers again, each callback using its own
     * distinct offsets (1/2, 11/12 and 21/22 milliseconds).
     */
    private static class TestTimerfulDoFn extends DoFn<KV<String, String>, String> {

        @TimerId("event")
        private final TimerSpec eventTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @TimerId("processing")
        private final TimerSpec processingTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @ProcessElement
        public void processElement(
                ProcessContext context,
                @TimerId("event") Timer eventTimer,
                @TimerId("processing") Timer processingTimer) {
            context.output("main" + context.element().getKey());
            eventTimer.set(context.timestamp().plus(1L));
            processingTimer.offset(Duration.millis(2L));
            processingTimer.setRelative();
        }

        @OnTimer("event")
        public void eventTimer(
                OnTimerContext context,
                @TimerId("event") Timer eventTimer,
                @TimerId("processing") Timer processingTimer) {
            context.output("event");
            eventTimer.set(context.timestamp().plus(11L));
            processingTimer.offset(Duration.millis(12L));
            processingTimer.setRelative();
        }

        @OnTimer("processing")
        public void processingTimer(
                OnTimerContext context,
                @TimerId("event") Timer eventTimer,
                @TimerId("processing") Timer processingTimer) {
            context.output("processing");
            eventTimer.set(context.timestamp().plus(21L));
            processingTimer.offset(Duration.millis(22L));
            processingTimer.setRelative();
        }
    }

    /**
     * Runs {@link TestStatefulDoFn} through a runner built by {@code FnApiDoFnRunner.Factory} and
     * checks both the emitted values and the state left in the fake state client.
     *
     * <p>Key "X" starts with pre-populated state ("X0") while key "Y" starts empty, and the inputs
     * for the two keys are interleaved.
     */
    @Test
    public void testUsingUserState() throws Exception {
        Pipeline pipeline = Pipeline.create();
        PCollection<KV<String, String>> valuePCollection =
                pipeline.apply(Create.of(KV.of("unused", "unused")));
        PCollection<String> outputPCollection =
                valuePCollection.apply(TEST_PTRANSFORM_ID, ParDo.of(new TestStatefulDoFn()));

        SdkComponents sdkComponents = SdkComponents.create(pipeline.getOptions());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);
        String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
        String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
        // The named ParDo is a composite; the transform to execute is its first sub-transform.
        RunnerApi.PTransform pTransform =
                pipelineProto
                        .getComponents()
                        .getTransformsOrThrow(
                                pipelineProto
                                        .getComponents()
                                        .getTransformsOrThrow(TEST_PTRANSFORM_ID)
                                        .getSubtransforms(0));

        FakeBeamFnStateClient fakeClient =
                new FakeBeamFnStateClient(
                        ImmutableMap.of(
                                bagUserStateKey("value", "X"), encode("X0"),
                                bagUserStateKey("bag", "X"), encode("X0"),
                                bagUserStateKey("combine", "X"), encode("X0")));

        ArrayList<WindowedValue<String>> mainOutputValues = new ArrayList<>();
        ArrayListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers =
                ArrayListMultimap.create();
        // The consumer map is keyed on WindowedValue<?>; the receiver only ever sees String values.
        @SuppressWarnings({"unchecked", "rawtypes"})
        FnDataReceiver<WindowedValue<?>> mainOutputReceiver =
                (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add;
        consumers.put(outputPCollectionId, mainOutputReceiver);

        ArrayList<ThrowingRunnable> startFunctions = new ArrayList<>();
        ArrayList<ThrowingRunnable> finishFunctions = new ArrayList<>();

        new FnApiDoFnRunner.Factory<>()
                .createRunnerForPTransform(
                        PipelineOptionsFactory.create(),
                        null /* beamFnDataClient */,
                        fakeClient,
                        TEST_PTRANSFORM_ID,
                        pTransform,
                        Suppliers.ofInstance("57L")::get,
                        pipelineProto.getComponents().getPcollectionsMap(),
                        pipelineProto.getComponents().getCodersMap(),
                        pipelineProto.getComponents().getWindowingStrategiesMap(),
                        consumers,
                        startFunctions::add,
                        finishFunctions::add,
                        null /* splitListener */);

        Iterables.getOnlyElement(startFunctions).run();
        mainOutputValues.clear();

        // Creating the runner must have registered a consumer for the main input.
        Assert.assertThat(
                consumers.keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver<WindowedValue<?>> mainInput =
                Iterables.getOnlyElement(consumers.get(inputPCollectionId));
        mainInput.accept(WindowedValue.valueInGlobalWindow(KV.of("X", "X1")));
        mainInput.accept(WindowedValue.valueInGlobalWindow(KV.of("Y", "Y1")));
        mainInput.accept(WindowedValue.valueInGlobalWindow(KV.of("X", "X2")));
        mainInput.accept(WindowedValue.valueInGlobalWindow(KV.of("Y", "Y2")));
        Assert.assertThat(
                mainOutputValues,
                Matchers.contains(
                        WindowedValue.valueInGlobalWindow("value:X0"),
                        WindowedValue.valueInGlobalWindow("bag:[X0]"),
                        WindowedValue.valueInGlobalWindow("combine:X0"),
                        WindowedValue.valueInGlobalWindow("value:null"),
                        WindowedValue.valueInGlobalWindow("bag:[]"),
                        WindowedValue.valueInGlobalWindow("combine:"),
                        WindowedValue.valueInGlobalWindow("value:X1"),
                        WindowedValue.valueInGlobalWindow("bag:[X0, X1]"),
                        WindowedValue.valueInGlobalWindow("combine:X0X1"),
                        WindowedValue.valueInGlobalWindow("value:Y1"),
                        WindowedValue.valueInGlobalWindow("bag:[Y1]"),
                        WindowedValue.valueInGlobalWindow("combine:Y1")));
        mainOutputValues.clear();

        // Finishing the bundle must not emit anything; the state client should then hold the
        // final contents of every state cell.
        Iterables.getOnlyElement(finishFunctions).run();
        Assert.assertThat(mainOutputValues, Matchers.empty());

        Assert.assertEquals(
                ImmutableMap.builder()
                        .put(bagUserStateKey("value", "X"), encode("X2"))
                        .put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2"))
                        .put(bagUserStateKey("combine", "X"), encode("X0X1X2"))
                        .put(bagUserStateKey("value", "Y"), encode("Y2"))
                        .put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2"))
                        .put(bagUserStateKey("combine", "Y"), encode("Y1Y2"))
                        .build(),
                fakeClient.getData());
        mainOutputValues.clear();
    }

    /**
     * Builds the bag-user-state key for the given user state id ({@code str}) and user key
     * ({@code str2}) of the test transform, scoped to the global window.
     */
    private BeamFnApi.StateKey bagUserStateKey(String str, String str2) throws IOException {
        BeamFnApi.StateKey.BagUserState.Builder bagUserState =
                BeamFnApi.StateKey.BagUserState.newBuilder();
        bagUserState.setPtransformId(TEST_PTRANSFORM_ID);
        bagUserState.setUserStateId(str);
        bagUserState.setKey(encode(str2));
        byte[] encodedGlobalWindow =
                CoderUtils.encodeToByteArray(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
        bagUserState.setWindow(ByteString.copyFrom(encodedGlobalWindow));
        return BeamFnApi.StateKey.newBuilder().setBagUserState(bagUserState).build();
    }

    /**
     * Runs {@link TestSideInputDoFn} with three side inputs and two outputs, and checks that both
     * outputs receive the expected values and that the state client's data is left untouched.
     *
     * <p>Only the plain singleton and the iterable side input are present in the fake state
     * client; the first singleton view is expected to fall back to its default value.
     */
    @Test
    public void testBasicWithSideInputsAndOutputs() throws Exception {
        Pipeline pipeline = Pipeline.create();
        PCollection<String> valuePCollection = pipeline.apply(Create.of("unused"));
        PCollectionView<String> defaultSingletonSideInputView =
                valuePCollection.apply(
                        View.<String>asSingleton().withDefaultValue("defaultSingletonValue"));
        PCollectionView<String> singletonSideInputView =
                valuePCollection.apply(View.<String>asSingleton());
        PCollectionView<Iterable<String>> iterableSideInputView =
                valuePCollection.apply(View.<String>asIterable());
        TupleTag<String> mainOutput = new TupleTag<String>("main") {};
        TupleTag<String> additionalOutput = new TupleTag<String>("additional") {};
        PCollectionTuple outputPCollection =
                valuePCollection.apply(
                        TEST_PTRANSFORM_ID,
                        ParDo.of(
                                        new TestSideInputDoFn(
                                                defaultSingletonSideInputView,
                                                singletonSideInputView,
                                                iterableSideInputView,
                                                additionalOutput))
                                .withSideInputs(
                                        defaultSingletonSideInputView,
                                        singletonSideInputView,
                                        iterableSideInputView)
                                .withOutputTags(mainOutput, TupleTagList.of(additionalOutput)));

        SdkComponents sdkComponents = SdkComponents.create(pipeline.getOptions());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);
        String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
        String outputPCollectionId =
                sdkComponents.registerPCollection(outputPCollection.get(mainOutput));
        String additionalPCollectionId =
                sdkComponents.registerPCollection(outputPCollection.get(additionalOutput));
        RunnerApi.PTransform pTransform =
                pipelineProto.getComponents().getTransformsOrThrow(TEST_PTRANSFORM_ID);

        ImmutableMap<BeamFnApi.StateKey, ByteString> stateData =
                ImmutableMap.of(
                        multimapSideInputKey(
                                singletonSideInputView.getTagInternal().getId(), ByteString.EMPTY),
                        encode("singletonValue"),
                        multimapSideInputKey(
                                iterableSideInputView.getTagInternal().getId(), ByteString.EMPTY),
                        encode("iterableValue1", "iterableValue2", "iterableValue3"));
        FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(stateData);

        ArrayList<WindowedValue<String>> mainOutputValues = new ArrayList<>();
        ArrayList<WindowedValue<String>> additionalOutputValues = new ArrayList<>();
        ArrayListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers =
                ArrayListMultimap.create();
        // The consumer map is keyed on WindowedValue<?>; these receivers only ever see Strings.
        @SuppressWarnings({"unchecked", "rawtypes"})
        FnDataReceiver<WindowedValue<?>> mainOutputReceiver =
                (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add;
        @SuppressWarnings({"unchecked", "rawtypes"})
        FnDataReceiver<WindowedValue<?>> additionalOutputReceiver =
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<String>>) additionalOutputValues::add;
        consumers.put(outputPCollectionId, mainOutputReceiver);
        consumers.put(additionalPCollectionId, additionalOutputReceiver);

        ArrayList<ThrowingRunnable> startFunctions = new ArrayList<>();
        ArrayList<ThrowingRunnable> finishFunctions = new ArrayList<>();

        new FnApiDoFnRunner.Factory<>()
                .createRunnerForPTransform(
                        PipelineOptionsFactory.create(),
                        null /* beamFnDataClient */,
                        fakeClient,
                        TEST_PTRANSFORM_ID,
                        pTransform,
                        Suppliers.ofInstance("57L")::get,
                        pipelineProto.getComponents().getPcollectionsMap(),
                        pipelineProto.getComponents().getCodersMap(),
                        pipelineProto.getComponents().getWindowingStrategiesMap(),
                        consumers,
                        startFunctions::add,
                        finishFunctions::add,
                        null /* splitListener */);

        Iterables.getOnlyElement(startFunctions).run();
        mainOutputValues.clear();

        // Creating the runner must have registered a consumer for the main input.
        Assert.assertThat(
                consumers.keySet(),
                Matchers.containsInAnyOrder(
                        inputPCollectionId, outputPCollectionId, additionalPCollectionId));

        FnDataReceiver<WindowedValue<?>> mainInput =
                Iterables.getOnlyElement(consumers.get(inputPCollectionId));
        mainInput.accept(WindowedValue.valueInGlobalWindow("X"));
        mainInput.accept(WindowedValue.valueInGlobalWindow("Y"));
        Assert.assertThat(
                mainOutputValues,
                Matchers.contains(
                        WindowedValue.valueInGlobalWindow("X:defaultSingletonValue"),
                        WindowedValue.valueInGlobalWindow("X:singletonValue"),
                        WindowedValue.valueInGlobalWindow("X:iterableValue1"),
                        WindowedValue.valueInGlobalWindow("X:iterableValue2"),
                        WindowedValue.valueInGlobalWindow("X:iterableValue3"),
                        WindowedValue.valueInGlobalWindow("Y:defaultSingletonValue"),
                        WindowedValue.valueInGlobalWindow("Y:singletonValue"),
                        WindowedValue.valueInGlobalWindow("Y:iterableValue1"),
                        WindowedValue.valueInGlobalWindow("Y:iterableValue2"),
                        WindowedValue.valueInGlobalWindow("Y:iterableValue3")));
        Assert.assertThat(
                additionalOutputValues,
                Matchers.contains(
                        WindowedValue.valueInGlobalWindow("X:additional"),
                        WindowedValue.valueInGlobalWindow("Y:additional")));
        mainOutputValues.clear();

        // Finishing the bundle must not emit anything, and side inputs are read-only.
        Iterables.getOnlyElement(finishFunctions).run();
        Assert.assertThat(mainOutputValues, Matchers.empty());

        Assert.assertEquals(stateData, fakeClient.getData());
        mainOutputValues.clear();
    }

    /**
     * Checks that an iterable side input which the DoFn outputs as-is can still be iterated by
     * the downstream consumer, and that each element sees the side input of its own window.
     *
     * <p>Two one-millisecond fixed windows are used ("A" and "B" values); element "X" is placed in
     * the first window and "Y" in the second.
     */
    @Test
    public void testSideInputIsAccessibleForDownstreamCallers() throws Exception {
        FixedWindows windowFn = FixedWindows.of(Duration.millis(1L));
        IntervalWindow windowA = windowFn.assignWindow(new Instant(1L));
        IntervalWindow windowB = windowFn.assignWindow(new Instant(2L));
        ByteString encodedWindowA =
                ByteString.copyFrom(CoderUtils.encodeToByteArray(windowFn.windowCoder(), windowA));
        ByteString encodedWindowB =
                ByteString.copyFrom(CoderUtils.encodeToByteArray(windowFn.windowCoder(), windowB));

        Pipeline pipeline = Pipeline.create();
        PCollection<String> valuePCollection =
                pipeline.apply(Create.of("unused")).apply(Window.<String>into(windowFn));
        PCollectionView<Iterable<String>> iterableSideInputView =
                valuePCollection.apply(View.<String>asIterable());
        PCollection<Iterable<String>> outputPCollection =
                valuePCollection.apply(
                        TEST_PTRANSFORM_ID,
                        ParDo.of(
                                        new TestSideInputIsAccessibleForDownstreamCallersDoFn(
                                                iterableSideInputView))
                                .withSideInputs(iterableSideInputView));

        SdkComponents sdkComponents = SdkComponents.create(pipeline.getOptions());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);
        String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
        String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
        // The named ParDo is a composite; the transform to execute is its first sub-transform.
        RunnerApi.PTransform pTransform =
                pipelineProto
                        .getComponents()
                        .getTransformsOrThrow(
                                pipelineProto
                                        .getComponents()
                                        .getTransformsOrThrow(TEST_PTRANSFORM_ID)
                                        .getSubtransforms(0));

        String sideInputId = iterableSideInputView.getTagInternal().getId();
        ImmutableMap<BeamFnApi.StateKey, ByteString> stateData =
                ImmutableMap.of(
                        multimapSideInputKey(sideInputId, ByteString.EMPTY, encodedWindowA),
                        encode("iterableValue1A", "iterableValue2A", "iterableValue3A"),
                        multimapSideInputKey(sideInputId, ByteString.EMPTY, encodedWindowB),
                        encode("iterableValue1B", "iterableValue2B", "iterableValue3B"));
        FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(stateData);

        ArrayList<WindowedValue<Iterable<String>>> mainOutputValues = new ArrayList<>();
        ArrayListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers =
                ArrayListMultimap.create();
        // The consumer map is keyed on WindowedValue<?>; the receiver only sees Iterable<String>.
        @SuppressWarnings({"unchecked", "rawtypes"})
        FnDataReceiver<WindowedValue<?>> mainOutputReceiver =
                (FnDataReceiver)
                        (FnDataReceiver<WindowedValue<Iterable<String>>>) mainOutputValues::add;
        consumers.put(
                Iterables.getOnlyElement(pTransform.getOutputsMap().values()), mainOutputReceiver);

        ArrayList<ThrowingRunnable> startFunctions = new ArrayList<>();
        ArrayList<ThrowingRunnable> finishFunctions = new ArrayList<>();

        new FnApiDoFnRunner.Factory<>()
                .createRunnerForPTransform(
                        PipelineOptionsFactory.create(),
                        null /* beamFnDataClient */,
                        fakeClient,
                        TEST_PTRANSFORM_ID,
                        pTransform,
                        Suppliers.ofInstance("57L")::get,
                        pipelineProto.getComponents().getPcollectionsMap(),
                        pipelineProto.getComponents().getCodersMap(),
                        pipelineProto.getComponents().getWindowingStrategiesMap(),
                        consumers,
                        startFunctions::add,
                        finishFunctions::add,
                        null /* splitListener */);

        Iterables.getOnlyElement(startFunctions).run();
        mainOutputValues.clear();

        // Creating the runner must have registered a consumer for the main input.
        Assert.assertThat(
                consumers.keySet(),
                Matchers.containsInAnyOrder(inputPCollectionId, outputPCollectionId));

        FnDataReceiver<WindowedValue<?>> mainInput =
                Iterables.getOnlyElement(consumers.get(inputPCollectionId));
        mainInput.accept(valueInWindow("X", windowA));
        mainInput.accept(valueInWindow("Y", windowB));

        // The iterables are consumed only here, after processElement has returned.
        Assert.assertThat(mainOutputValues, Matchers.hasSize(2));
        Assert.assertThat(
                mainOutputValues.get(0).getValue(),
                Matchers.contains("iterableValue1A", "iterableValue2A", "iterableValue3A"));
        Assert.assertThat(
                mainOutputValues.get(1).getValue(),
                Matchers.contains("iterableValue1B", "iterableValue2B", "iterableValue3B"));

        Assert.assertEquals(stateData, fakeClient.getData());
    }

    /**
     * Runs {@link TestTimerfulDoFn}, feeding it main-input elements as well as fired event-time
     * and processing-time timers, and checks the main output and the timers that were set.
     *
     * <p>The clock is fixed at 10000, so every processing-time timer is expected at 10000 plus the
     * offset used by the callback that set it (2, 12 or 22); event-time timers are expected at the
     * triggering timestamp plus 1, 11 or 21.
     */
    @Test
    public void testTimers() throws Exception {
        this.dateTimeProvider.setDateTimeFixed(10000L);

        Pipeline pipeline = Pipeline.create();
        PCollection<KV<String, String>> valuePCollection =
                pipeline.apply(Create.of(KV.of("unused", "unused")));
        PCollection<String> outputPCollection =
                valuePCollection.apply(TEST_PTRANSFORM_ID, ParDo.of(new TestTimerfulDoFn()));

        SdkComponents sdkComponents = SdkComponents.create();
        sdkComponents.registerEnvironment(RunnerApi.Environment.getDefaultInstance());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);
        String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
        String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);

        String eventTimerInputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).event";
        String eventTimerOutputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).event.output";
        String processingTimerInputPCollectionId =
                "pTransformId/ParMultiDo(TestTimerful).processing";
        String processingTimerOutputPCollectionId =
                "pTransformId/ParMultiDo(TestTimerful).processing.output";

        // Take the ParDo's first sub-transform and add one output per timer to it.
        RunnerApi.PTransform pTransform =
                pipelineProto
                        .getComponents()
                        .getTransformsOrThrow(
                                pipelineProto
                                        .getComponents()
                                        .getTransformsOrThrow(TEST_PTRANSFORM_ID)
                                        .getSubtransforms(0))
                        .toBuilder()
                        .putOutputs("event", eventTimerOutputPCollectionId)
                        .putOutputs("processing", processingTimerOutputPCollectionId)
                        .build();

        FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(Collections.emptyMap());

        ArrayList<WindowedValue<String>> mainOutputValues = new ArrayList<>();
        ArrayList<WindowedValue<KV<String, org.apache.beam.runners.core.construction.Timer>>>
                eventTimerOutputValues = new ArrayList<>();
        ArrayList<WindowedValue<KV<String, org.apache.beam.runners.core.construction.Timer>>>
                processingTimerOutputValues = new ArrayList<>();

        ArrayListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers =
                ArrayListMultimap.create();
        // The consumer map is keyed on WindowedValue<?>; each receiver only sees its own type.
        @SuppressWarnings({"unchecked", "rawtypes"})
        FnDataReceiver<WindowedValue<?>> mainOutputReceiver =
                (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add;
        @SuppressWarnings({"unchecked", "rawtypes"})
        FnDataReceiver<WindowedValue<?>> eventTimerOutputReceiver =
                (FnDataReceiver)
                        (FnDataReceiver<
                                        WindowedValue<
                                                KV<
                                                        String,
                                                        org.apache.beam.runners.core.construction
                                                                .Timer>>>)
                                eventTimerOutputValues::add;
        @SuppressWarnings({"unchecked", "rawtypes"})
        FnDataReceiver<WindowedValue<?>> processingTimerOutputReceiver =
                (FnDataReceiver)
                        (FnDataReceiver<
                                        WindowedValue<
                                                KV<
                                                        String,
                                                        org.apache.beam.runners.core.construction
                                                                .Timer>>>)
                                processingTimerOutputValues::add;
        consumers.put(outputPCollectionId, mainOutputReceiver);
        consumers.put(eventTimerOutputPCollectionId, eventTimerOutputReceiver);
        consumers.put(processingTimerOutputPCollectionId, processingTimerOutputReceiver);

        ArrayList<ThrowingRunnable> startFunctions = new ArrayList<>();
        ArrayList<ThrowingRunnable> finishFunctions = new ArrayList<>();

        // The timer output PCollections are registered under new ids, reusing the definitions
        // of the corresponding timer input PCollections.
        ImmutableMap<String, RunnerApi.PCollection> pCollections =
                ImmutableMap.<String, RunnerApi.PCollection>builder()
                        .putAll(pipelineProto.getComponents().getPcollectionsMap())
                        .put(
                                eventTimerOutputPCollectionId,
                                pipelineProto
                                        .getComponents()
                                        .getPcollectionsOrThrow(eventTimerInputPCollectionId))
                        .put(
                                processingTimerOutputPCollectionId,
                                pipelineProto
                                        .getComponents()
                                        .getPcollectionsOrThrow(processingTimerInputPCollectionId))
                        .build();

        new FnApiDoFnRunner.Factory<>()
                .createRunnerForPTransform(
                        PipelineOptionsFactory.create(),
                        null /* beamFnDataClient */,
                        fakeClient,
                        TEST_PTRANSFORM_ID,
                        pTransform,
                        Suppliers.ofInstance("57L")::get,
                        pCollections,
                        pipelineProto.getComponents().getCodersMap(),
                        pipelineProto.getComponents().getWindowingStrategiesMap(),
                        consumers,
                        startFunctions::add,
                        finishFunctions::add,
                        null /* splitListener */);

        Iterables.getOnlyElement(startFunctions).run();
        mainOutputValues.clear();

        // Creating the runner must have registered consumers for the main and both timer inputs.
        Assert.assertThat(
                consumers.keySet(),
                Matchers.containsInAnyOrder(
                        inputPCollectionId,
                        outputPCollectionId,
                        eventTimerInputPCollectionId,
                        eventTimerOutputPCollectionId,
                        processingTimerInputPCollectionId,
                        processingTimerOutputPCollectionId));

        FnDataReceiver<WindowedValue<?>> mainInput =
                Iterables.getOnlyElement(consumers.get(inputPCollectionId));
        FnDataReceiver<WindowedValue<?>> eventTimerInput =
                Iterables.getOnlyElement(consumers.get(eventTimerInputPCollectionId));
        FnDataReceiver<WindowedValue<?>> processingTimerInput =
                Iterables.getOnlyElement(consumers.get(processingTimerInputPCollectionId));

        mainInput.accept(
                WindowedValue.timestampedValueInGlobalWindow(KV.of("X", "X1"), new Instant(1000L)));
        mainInput.accept(
                WindowedValue.timestampedValueInGlobalWindow(KV.of("Y", "Y1"), new Instant(1100L)));
        mainInput.accept(
                WindowedValue.timestampedValueInGlobalWindow(KV.of("X", "X2"), new Instant(1200L)));
        mainInput.accept(
                WindowedValue.timestampedValueInGlobalWindow(KV.of("Y", "Y2"), new Instant(1300L)));
        eventTimerInput.accept(timerInGlobalWindow("A", new Instant(1400L), new Instant(2400L)));
        eventTimerInput.accept(timerInGlobalWindow("B", new Instant(1500L), new Instant(2500L)));
        eventTimerInput.accept(timerInGlobalWindow("A", new Instant(1600L), new Instant(2600L)));
        processingTimerInput.accept(
                timerInGlobalWindow("C", new Instant(1700L), new Instant(2700L)));
        processingTimerInput.accept(
                timerInGlobalWindow("D", new Instant(1800L), new Instant(2800L)));
        processingTimerInput.accept(
                timerInGlobalWindow("C", new Instant(1900L), new Instant(2900L)));

        Assert.assertThat(
                mainOutputValues,
                Matchers.contains(
                        WindowedValue.timestampedValueInGlobalWindow("mainX", new Instant(1000L)),
                        WindowedValue.timestampedValueInGlobalWindow("mainY", new Instant(1100L)),
                        WindowedValue.timestampedValueInGlobalWindow("mainX", new Instant(1200L)),
                        WindowedValue.timestampedValueInGlobalWindow("mainY", new Instant(1300L)),
                        WindowedValue.timestampedValueInGlobalWindow("event", new Instant(1400L)),
                        WindowedValue.timestampedValueInGlobalWindow("event", new Instant(1500L)),
                        WindowedValue.timestampedValueInGlobalWindow("event", new Instant(1600L)),
                        WindowedValue.timestampedValueInGlobalWindow(
                                "processing", new Instant(1700L)),
                        WindowedValue.timestampedValueInGlobalWindow(
                                "processing", new Instant(1800L)),
                        WindowedValue.timestampedValueInGlobalWindow(
                                "processing", new Instant(1900L))));
        Assert.assertThat(
                eventTimerOutputValues,
                Matchers.contains(
                        timerInGlobalWindow("X", new Instant(1000L), new Instant(1001L)),
                        timerInGlobalWindow("Y", new Instant(1100L), new Instant(1101L)),
                        timerInGlobalWindow("X", new Instant(1200L), new Instant(1201L)),
                        timerInGlobalWindow("Y", new Instant(1300L), new Instant(1301L)),
                        timerInGlobalWindow("A", new Instant(1400L), new Instant(1411L)),
                        timerInGlobalWindow("B", new Instant(1500L), new Instant(1511L)),
                        timerInGlobalWindow("A", new Instant(1600L), new Instant(1611L)),
                        timerInGlobalWindow("C", new Instant(1700L), new Instant(1721L)),
                        timerInGlobalWindow("D", new Instant(1800L), new Instant(1821L)),
                        timerInGlobalWindow("C", new Instant(1900L), new Instant(1921L))));
        Assert.assertThat(
                processingTimerOutputValues,
                Matchers.contains(
                        timerInGlobalWindow("X", new Instant(1000L), new Instant(10002L)),
                        timerInGlobalWindow("Y", new Instant(1100L), new Instant(10002L)),
                        timerInGlobalWindow("X", new Instant(1200L), new Instant(10002L)),
                        timerInGlobalWindow("Y", new Instant(1300L), new Instant(10002L)),
                        timerInGlobalWindow("A", new Instant(1400L), new Instant(10012L)),
                        timerInGlobalWindow("B", new Instant(1500L), new Instant(10012L)),
                        timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)),
                        timerInGlobalWindow("C", new Instant(1700L), new Instant(10022L)),
                        timerInGlobalWindow("D", new Instant(1800L), new Instant(10022L)),
                        timerInGlobalWindow("C", new Instant(1900L), new Instant(10022L))));
        mainOutputValues.clear();

        // Finishing the bundle must not emit anything, and no state should have been written.
        Iterables.getOnlyElement(finishFunctions).run();
        Assert.assertThat(mainOutputValues, Matchers.empty());

        Assert.assertEquals(ImmutableMap.of(), fakeClient.getData());
        mainOutputValues.clear();
    }

    /**
     * Wraps {@code t} in a {@link WindowedValue} belonging to {@code boundedWindow}, timestamped
     * at that window's maximum timestamp and using the on-time-and-only-firing pane.
     */
    private <T> WindowedValue<T> valueInWindow(T t, BoundedWindow boundedWindow) {
        Instant timestamp = boundedWindow.maxTimestamp();
        PaneInfo pane = PaneInfo.ON_TIME_AND_ONLY_FIRING;
        return WindowedValue.of(t, timestamp, boundedWindow, pane);
    }

    /**
     * Builds a global-window value keyed by {@code t} whose payload is a timer for
     * {@code instant2}; the value itself is timestamped at {@code instant}.
     */
    private <T> WindowedValue<KV<T, org.apache.beam.runners.core.construction.Timer>> timerInGlobalWindow(T t, Instant instant, Instant instant2) {
        KV<T, org.apache.beam.runners.core.construction.Timer> keyedTimer =
                KV.of(t, org.apache.beam.runners.core.construction.Timer.of(instant2));
        return WindowedValue.timestampedValueInGlobalWindow(keyedTimer, instant);
    }

    /**
     * Builds a multimap side input key for side input id {@code str} and key {@code byteString},
     * scoped to the encoded global window.
     */
    private BeamFnApi.StateKey multimapSideInputKey(String str, ByteString byteString) throws IOException {
        byte[] encodedGlobalWindow =
                CoderUtils.encodeToByteArray(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
        ByteString globalWindow = ByteString.copyFrom(encodedGlobalWindow);
        return multimapSideInputKey(str, byteString, globalWindow);
    }

    /**
     * Builds a multimap side input key of the test transform for side input id {@code str},
     * key {@code byteString} and encoded window {@code byteString2}.
     */
    private BeamFnApi.StateKey multimapSideInputKey(String str, ByteString byteString, ByteString byteString2) {
        BeamFnApi.StateKey.MultimapSideInput.Builder sideInput =
                BeamFnApi.StateKey.MultimapSideInput.newBuilder();
        sideInput.setPtransformId(TEST_PTRANSFORM_ID);
        sideInput.setSideInputId(str);
        sideInput.setKey(byteString);
        sideInput.setWindow(byteString2);
        return BeamFnApi.StateKey.newBuilder().setMultimapSideInput(sideInput).build();
    }

    /**
     * Encodes each of the given strings with {@link StringUtf8Coder}, one after another, into a
     * single {@link ByteString}.
     */
    private ByteString encode(String... strArr) throws IOException {
        ByteString.Output buffer = ByteString.newOutput();
        for (int i = 0; i < strArr.length; i++) {
            StringUtf8Coder.of().encode(strArr[i], buffer);
        }
        return buffer.toByteString();
    }

    /**
     * Checks that {@code FnApiDoFnRunner.Registrar} is discoverable through {@link ServiceLoader}
     * and registers a runner factory under the ParDo transform URN.
     */
    @Test
    public void testRegistration() {
        for (PTransformRunnerFactory.Registrar candidate :
                ServiceLoader.load(PTransformRunnerFactory.Registrar.class)) {
            if (!(candidate instanceof FnApiDoFnRunner.Registrar)) {
                continue;
            }
            // Only the first matching registrar is inspected.
            Assert.assertThat(
                    candidate.getPTransformRunnerFactories(),
                    IsMapContaining.hasKey(PTransformTranslation.PAR_DO_TRANSFORM_URN));
            return;
        }
        Assert.fail("Expected registrar not found.");
    }
}
