package org.apache.beam.runners.flink.streaming;

import java.util.Collections;
import java.util.HashMap;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.2.10.repackaged.com.google.common.base.Function;
import org.apache.beam.runners.flink.2.10.repackaged.com.google.common.base.Predicate;
import org.apache.beam.runners.flink.2.10.repackaged.com.google.common.collect.FluentIterable;
import org.apache.beam.runners.flink.2.10.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.flink.2.10.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PCollectionViewTesting;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/flink/streaming/DoFnOperatorTest.class */
public class DoFnOperatorTest {
    private static final long WINDOW_MSECS_1 = 100;
    private static final long WINDOW_MSECS_2 = 500;
    private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 = WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
    private PCollectionView<Iterable<String>> view1 = PCollectionViewTesting.testingView(new TupleTag<Iterable<WindowedValue<String>>>() { // from class: org.apache.beam.runners.flink.streaming.DoFnOperatorTest.1
    }, new PCollectionViewTesting.IdentityViewFn(), StringUtf8Coder.of(), this.windowingStrategy1);
    private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 = WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
    private PCollectionView<Iterable<String>> view2 = PCollectionViewTesting.testingView(new TupleTag<Iterable<WindowedValue<String>>>() { // from class: org.apache.beam.runners.flink.streaming.DoFnOperatorTest.2
    }, new PCollectionViewTesting.IdentityViewFn(), StringUtf8Coder.of(), this.windowingStrategy2);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/streaming/DoFnOperatorTest$DummyKeySelector.class */
    public static class DummyKeySelector implements KeySelector<RawUnionValue, String> {
        private DummyKeySelector() {
        }

        public String getKey(RawUnionValue rawUnionValue) throws Exception {
            return "dummy_key";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/streaming/DoFnOperatorTest$IdentityDoFn.class */
    public static class IdentityDoFn<T> extends DoFn<T, T> {
        private IdentityDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<T, T>.ProcessContext processContext) throws Exception {
            processContext.output(processContext.element());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/streaming/DoFnOperatorTest$MultiOutputDoFn.class */
    private static class MultiOutputDoFn extends DoFn<String, String> {
        private TupleTag<String> additionalOutput1;
        private TupleTag<String> additionalOutput2;

        public MultiOutputDoFn(TupleTag<String> tupleTag, TupleTag<String> tupleTag2) {
            this.additionalOutput1 = tupleTag;
            this.additionalOutput2 = tupleTag2;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<String, String>.ProcessContext processContext) throws Exception {
            if (((String) processContext.element()).equals("one")) {
                processContext.output(this.additionalOutput1, "extra: one");
            } else {
                if (((String) processContext.element()).equals("two")) {
                    processContext.output(this.additionalOutput2, "extra: two");
                    return;
                }
                processContext.output("got: " + ((String) processContext.element()));
                processContext.output(this.additionalOutput1, "got: " + ((String) processContext.element()));
                processContext.output(this.additionalOutput2, "got: " + ((String) processContext.element()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/streaming/DoFnOperatorTest$StringKeySelector.class */
    public static class StringKeySelector implements KeySelector<WindowedValue<String>, String> {
        private StringKeySelector() {
        }

        public String getKey(WindowedValue<String> windowedValue) throws Exception {
            return (String) windowedValue.getValue();
        }
    }

    @Test
    public void testSingleOutput() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new DoFnOperator(new IdentityDoFn(), "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), new TupleTag("main-output"), Collections.emptyList(), new DoFnOperator.DefaultOutputManagerFactory(), WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), PipelineOptionsFactory.as(FlinkPipelineOptions.class), (Coder) null));
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("Hello")));
        Assert.assertThat(stripStreamRecordFromWindowedValue(oneInputStreamOperatorTestHarness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("Hello")}));
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testMultiOutputOutput() throws Exception {
        WindowedValue.ValueOnlyWindowedValueCoder valueOnlyCoder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
        TupleTag tupleTag = new TupleTag("main-output");
        TupleTag tupleTag2 = new TupleTag("output-1");
        TupleTag tupleTag3 = new TupleTag("output-2");
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new DoFnOperator(new MultiOutputDoFn(tupleTag2, tupleTag3), "stepName", valueOnlyCoder, tupleTag, ImmutableList.of(tupleTag2, tupleTag3), new DoFnOperator.MultiOutputOutputManagerFactory(ImmutableMap.builder().put(tupleTag, 1).put(tupleTag2, 2).put(tupleTag3, 3).build()), WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), PipelineOptionsFactory.as(FlinkPipelineOptions.class), (Coder) null));
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("one")));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("two")));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow("hello")));
        Assert.assertThat(stripStreamRecordFromRawUnion(oneInputStreamOperatorTestHarness.getOutput()), IsIterableContainingInOrder.contains(new RawUnionValue[]{new RawUnionValue(2, WindowedValue.valueInGlobalWindow("extra: one")), new RawUnionValue(3, WindowedValue.valueInGlobalWindow("extra: two")), new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")), new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")), new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello"))}));
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testLateDroppingForStatefulFn() throws Exception {
        WindowingStrategy of = WindowingStrategy.of(FixedWindows.of(new Duration(10L)));
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new DoFnOperator(new DoFn<Integer, String>() { // from class: org.apache.beam.runners.flink.streaming.DoFnOperatorTest.3

            @DoFn.StateId("state")
            private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, String>.ProcessContext processContext) {
                processContext.output(((Integer) processContext.element()).toString());
            }
        }, "stepName", WindowedValue.getFullCoder(VarIntCoder.of(), of.getWindowFn().windowCoder()), new TupleTag("main-output"), Collections.emptyList(), new DoFnOperator.DefaultOutputManagerFactory(), of, new HashMap(), Collections.emptyList(), PipelineOptionsFactory.as(FlinkPipelineOptions.class), VarIntCoder.of()), new KeySelector<WindowedValue<Integer>, Integer>() { // from class: org.apache.beam.runners.flink.streaming.DoFnOperatorTest.4
            public Integer getKey(WindowedValue<Integer> windowedValue) throws Exception {
                return (Integer) windowedValue.getValue();
            }
        }, new CoderTypeInformation(VarIntCoder.of()));
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.processWatermark(0L);
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(0L), Duration.millis(10L));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.of(13, new Instant(0L), intervalWindow, PaneInfo.NO_FIRING)));
        Assert.assertThat(stripStreamRecordFromWindowedValue(keyedOneInputStreamOperatorTestHarness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.of("13", new Instant(0L), intervalWindow, PaneInfo.NO_FIRING)}));
        keyedOneInputStreamOperatorTestHarness.getOutput().clear();
        keyedOneInputStreamOperatorTestHarness.processWatermark(9L);
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.of(17, new Instant(0L), intervalWindow, PaneInfo.NO_FIRING)));
        Assert.assertThat(stripStreamRecordFromWindowedValue(keyedOneInputStreamOperatorTestHarness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.of("17", new Instant(0L), intervalWindow, PaneInfo.NO_FIRING)}));
        keyedOneInputStreamOperatorTestHarness.getOutput().clear();
        keyedOneInputStreamOperatorTestHarness.processWatermark(10L);
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.of(17, new Instant(0L), intervalWindow, PaneInfo.NO_FIRING)));
        Assert.assertThat(stripStreamRecordFromWindowedValue(keyedOneInputStreamOperatorTestHarness.getOutput()), Matchers.emptyIterable());
        keyedOneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testStateGCForStatefulFn() throws Exception {
        WindowingStrategy withAllowedLateness = WindowingStrategy.of(FixedWindows.of(new Duration(10L))).withAllowedLateness(Duration.ZERO);
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new DoFnOperator(new DoFn<KV<String, Integer>, KV<String, Integer>>() { // from class: org.apache.beam.runners.flink.streaming.DoFnOperatorTest.5

            @DoFn.TimerId("boo")
            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @DoFn.StateId("dazzle")
            private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Integer>, KV<String, Integer>>.ProcessContext processContext, @DoFn.TimerId("boo") Timer timer, @DoFn.StateId("dazzle") ValueState<String> valueState, BoundedWindow boundedWindow) {
                timer.set(boundedWindow.maxTimestamp());
                valueState.write(((KV) processContext.element()).getKey());
                processContext.output(KV.of(((KV) processContext.element()).getKey(), Integer.valueOf(((Integer) ((KV) processContext.element()).getValue()).intValue() + 5000)));
            }

            @DoFn.OnTimer("boo")
            public void onTimer(DoFn<KV<String, Integer>, KV<String, Integer>>.OnTimerContext onTimerContext, @DoFn.StateId("dazzle") ValueState<String> valueState) {
                onTimerContext.output(KV.of(valueState.read(), 4093));
            }
        }, "stepName", WindowedValue.getFullCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), withAllowedLateness.getWindowFn().windowCoder()), new TupleTag("main-output"), Collections.emptyList(), new DoFnOperator.DefaultOutputManagerFactory(), withAllowedLateness, new HashMap(), Collections.emptyList(), PipelineOptionsFactory.as(FlinkPipelineOptions.class), StringUtf8Coder.of()), new KeySelector<WindowedValue<KV<String, Integer>>, String>() { // from class: org.apache.beam.runners.flink.streaming.DoFnOperatorTest.6
            public String getKey(WindowedValue<KV<String, Integer>> windowedValue) throws Exception {
                return (String) ((KV) windowedValue.getValue()).getKey();
            }
        }, new CoderTypeInformation(StringUtf8Coder.of()));
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.processWatermark(0L);
        Assert.assertEquals(0L, keyedOneInputStreamOperatorTestHarness.numKeyedStateEntries());
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(0L), Duration.millis(10L));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.of(KV.of("key1", 5), new Instant(1L), intervalWindow, PaneInfo.NO_FIRING)));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(WindowedValue.of(KV.of("key2", 7), new Instant(3L), intervalWindow, PaneInfo.NO_FIRING)));
        Assert.assertThat(stripStreamRecordFromWindowedValue(keyedOneInputStreamOperatorTestHarness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.of(KV.of("key1", 5005), new Instant(1L), intervalWindow, PaneInfo.NO_FIRING), WindowedValue.of(KV.of("key2", 5007), new Instant(3L), intervalWindow, PaneInfo.NO_FIRING)}));
        Assert.assertEquals(2L, keyedOneInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedOneInputStreamOperatorTestHarness.getOutput().clear();
        keyedOneInputStreamOperatorTestHarness.processWatermark(intervalWindow.maxTimestamp().plus(withAllowedLateness.getAllowedLateness()).plus(1L).getMillis());
        Assert.assertThat(stripStreamRecordFromWindowedValue(keyedOneInputStreamOperatorTestHarness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{WindowedValue.of(KV.of("key1", 4093), new Instant(9L), intervalWindow, PaneInfo.NO_FIRING), WindowedValue.of(KV.of("key2", 4093), new Instant(9L), intervalWindow, PaneInfo.NO_FIRING)}));
        Assert.assertEquals(0L, keyedOneInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedOneInputStreamOperatorTestHarness.close();
    }

    public void testSideInputs(boolean z) throws Exception {
        WindowedValue.ValueOnlyWindowedValueCoder valueOnlyCoder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
        TupleTag tupleTag = new TupleTag("main-output");
        ImmutableMap build = ImmutableMap.builder().put(1, this.view1).put(2, this.view2).build();
        StringUtf8Coder stringUtf8Coder = null;
        if (z) {
            stringUtf8Coder = StringUtf8Coder.of();
        }
        DoFnOperator doFnOperator = new DoFnOperator(new IdentityDoFn(), "stepName", valueOnlyCoder, tupleTag, Collections.emptyList(), new DoFnOperator.DefaultOutputManagerFactory(), WindowingStrategy.globalDefault(), build, ImmutableList.of(this.view1, this.view2), PipelineOptionsFactory.as(FlinkPipelineOptions.class), stringUtf8Coder);
        KeyedTwoInputStreamOperatorTestHarness twoInputStreamOperatorTestHarness = new TwoInputStreamOperatorTestHarness(doFnOperator);
        if (z) {
            twoInputStreamOperatorTestHarness = new KeyedTwoInputStreamOperatorTestHarness(doFnOperator, new StringKeySelector(), new DummyKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
        }
        twoInputStreamOperatorTestHarness.open();
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(0L), new Instant(WINDOW_MSECS_1));
        IntervalWindow intervalWindow2 = new IntervalWindow(new Instant(0L), new Instant(WINDOW_MSECS_2));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(new RawUnionValue(1, valuesInWindow(ImmutableList.of("hello", "ciao"), new Instant(0L), intervalWindow))));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(new RawUnionValue(2, valuesInWindow(ImmutableList.of("foo", "bar"), new Instant(0L), intervalWindow2))));
        WindowedValue valueInWindow = valueInWindow("Hello", new Instant(0L), intervalWindow);
        WindowedValue valueInWindow2 = valueInWindow("World", new Instant(1000L), intervalWindow);
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord(valueInWindow));
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord(valueInWindow2));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(new RawUnionValue(1, valuesInWindow(ImmutableList.of("hello", "ciao"), new Instant(1000L), intervalWindow))));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(new RawUnionValue(2, valuesInWindow(ImmutableList.of("foo", "bar"), new Instant(1000L), intervalWindow2))));
        Assert.assertThat(stripStreamRecordFromWindowedValue(twoInputStreamOperatorTestHarness.getOutput()), IsIterableContainingInOrder.contains(new WindowedValue[]{valueInWindow, valueInWindow2}));
        twoInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testNormalParDoSideInputs() throws Exception {
        testSideInputs(false);
    }

    @Test
    public void testKeyedSideInputs() throws Exception {
        testSideInputs(true);
    }

    private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(Iterable<Object> iterable) {
        return FluentIterable.from(iterable).filter(new Predicate<Object>() { // from class: org.apache.beam.runners.flink.streaming.DoFnOperatorTest.8
            public boolean apply(@Nullable Object obj) {
                return (obj instanceof StreamRecord) && (((StreamRecord) obj).getValue() instanceof WindowedValue);
            }
        }).transform(new Function<Object, WindowedValue<T>>() { // from class: org.apache.beam.runners.flink.streaming.DoFnOperatorTest.7
            @Nullable
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public WindowedValue<T> m6apply(@Nullable Object obj) {
                if ((obj instanceof StreamRecord) && (((StreamRecord) obj).getValue() instanceof WindowedValue)) {
                    return (WindowedValue) ((StreamRecord) obj).getValue();
                }
                throw new RuntimeException("unreachable");
            }
        });
    }

    private Iterable<RawUnionValue> stripStreamRecordFromRawUnion(Iterable<Object> iterable) {
        return FluentIterable.from(iterable).filter(new Predicate<Object>() { // from class: org.apache.beam.runners.flink.streaming.DoFnOperatorTest.10
            public boolean apply(@Nullable Object obj) {
                return (obj instanceof StreamRecord) && (((StreamRecord) obj).getValue() instanceof RawUnionValue);
            }
        }).transform(new Function<Object, RawUnionValue>() { // from class: org.apache.beam.runners.flink.streaming.DoFnOperatorTest.9
            @Nullable
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public RawUnionValue m7apply(@Nullable Object obj) {
                if ((obj instanceof StreamRecord) && (((StreamRecord) obj).getValue() instanceof RawUnionValue)) {
                    return (RawUnionValue) ((StreamRecord) obj).getValue();
                }
                throw new RuntimeException("unreachable");
            }
        });
    }

    private WindowedValue<Iterable<?>> valuesInWindow(Iterable<?> iterable, Instant instant, BoundedWindow boundedWindow) {
        return WindowedValue.of(iterable, instant, boundedWindow, PaneInfo.NO_FIRING);
    }

    private <T> WindowedValue<T> valueInWindow(T t, Instant instant, BoundedWindow boundedWindow) {
        return WindowedValue.of(t, instant, boundedWindow, PaneInfo.NO_FIRING);
    }
}
