/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming;

import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.databind.util.LRUMap;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.NonKeyedPushedBackElementsHandler;
import org.apache.beam.runners.flink.translation.wrappers.streaming.PushedBackElementsHandler;
import org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.testing.PCollectionViewTesting;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.reflect.Whitebox;

@RunWith(value=JUnit4.class)
public class DoFnOperatorTest {
    /** Window size in milliseconds (100); equals the fixed-window duration that {@code setUp()} uses for {@code view1}. */
    private static final @UnknownKeyFor @NonNull @Initialized long WINDOW_MSECS_1 = 100L;
    /** Window size in milliseconds (500); equals the fixed-window duration that {@code setUp()} uses for {@code view2}. */
    private static final @UnknownKeyFor @NonNull @Initialized long WINDOW_MSECS_2 = 500L;
    /** Iterable view over 100 ms fixed windows; (re)assigned by {@code setUp()} before each test. */
    private @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized String>> view1;
    /** Iterable view over 500 ms fixed windows; (re)assigned by {@code setUp()} before each test. */
    private @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized String>> view2;
    // Starts at 0. NOTE(review): readers/writers are outside the visible part of this file;
    // presumably counts startBundle invocations - confirm against its usages.
    private @UnknownKeyFor @NonNull @Initialized int numStartBundleCalled = 0;

    /**
     * Creates the two iterable views used by the tests.
     *
     * <p>A single-element {@code PCollection<String>} is windowed once into fixed windows of
     * {@link #WINDOW_MSECS_1} ms and once into fixed windows of {@link #WINDOW_MSECS_2} ms; each
     * windowed collection is turned into an iterable view stored in {@code view1} / {@code view2}.
     */
    @Before
    public void setUp() {
        PCollection<String> pc = Pipeline.create().apply(Create.of("1"));
        // Use the named window sizes instead of repeating their numeric values.
        this.view1 = pc.apply(Window.into(FixedWindows.of(new Duration(WINDOW_MSECS_1))))
            .apply(View.asIterable());
        this.view2 = pc.apply(Window.into(FixedWindows.of(new Duration(WINDOW_MSECS_2))))
            .apply(View.asIterable());
    }

    /**
     * Pushes one element through a non-keyed {@code DoFnOperator} wrapping {@code IdentityDoFn}
     * and expects exactly that element on the main output.
     */
    @Test
    public void testSingleOutput() throws @UnknownKeyFor @NonNull @Initialized Exception {
        WindowedValue.ValueOnlyWindowedValueCoder valueCoder = WindowedValue.getValueOnlyCoder((Coder)StringUtf8Coder.of());
        TupleTag mainTag = new TupleTag("main-output");

        // Build the operator's collaborators up front so the constructor call stays readable.
        IdentityDoFn identityFn = new IdentityDoFn();
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(
            mainTag,
            (Coder)valueCoder,
            new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        DoFnOperator operator = new DoFnOperator(
            identityFn,
            "stepName",
            (Coder)valueCoder,
            Collections.emptyMap(),
            mainTag,
            Collections.emptyList(),
            (DoFnOperator.OutputManagerFactory)outputManagerFactory,
            WindowingStrategy.globalDefault(),
            new HashMap(),
            Collections.emptyList(),
            (PipelineOptions)FlinkPipelineOptions.defaults(),
            null,
            null,
            DoFnSchemaInformation.create(),
            Collections.emptyMap());

        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator);
        harness.open();

        harness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)"Hello")));

        // The identity function must hand the element back unchanged.
        MatcherAssert.assertThat(
            StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()),
            (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{
                WindowedValue.valueInGlobalWindow((Object)"Hello")
            }));

        harness.close();
    }

    /**
     * Feeds "one", "two" and "hello" into a {@code DoFnOperator} wrapping {@code MultiOutputDoFn}
     * with one main and two additional outputs, then checks what arrived on each of the three
     * outputs.
     */
    @Test
    public void testMultiOutputOutput() throws @UnknownKeyFor @NonNull @Initialized Exception {
        WindowedValue.ValueOnlyWindowedValueCoder valueCoder = WindowedValue.getValueOnlyCoder((Coder)StringUtf8Coder.of());
        TupleTag mainOutput = new TupleTag("main-output");
        TupleTag additionalOutput1 = new TupleTag("output-1");
        TupleTag additionalOutput2 = new TupleTag("output-2");

        // Beam tag -> Flink side-output tag (additional outputs only).
        ImmutableMap.Builder outputTagsBuilder = ImmutableMap.builder();
        outputTagsBuilder.put((Object)additionalOutput1, (Object)new OutputTag<WindowedValue<?>>(additionalOutput1.getId()){});
        outputTagsBuilder.put((Object)additionalOutput2, (Object)new OutputTag<WindowedValue<?>>(additionalOutput2.getId()){});
        ImmutableMap tagsToOutputTags = outputTagsBuilder.build();

        // Beam tag -> coder (all three outputs share the same coder).
        ImmutableMap.Builder codersBuilder = ImmutableMap.builder();
        codersBuilder.put((Object)mainOutput, (Object)valueCoder);
        codersBuilder.put((Object)additionalOutput1, (Object)valueCoder);
        codersBuilder.put((Object)additionalOutput2, (Object)valueCoder);
        ImmutableMap tagsToCoders = codersBuilder.build();

        // Beam tag -> numeric id.
        ImmutableMap.Builder idsBuilder = ImmutableMap.builder();
        idsBuilder.put((Object)mainOutput, (Object)0);
        idsBuilder.put((Object)additionalOutput1, (Object)1);
        idsBuilder.put((Object)additionalOutput2, (Object)2);
        ImmutableMap tagsToIds = idsBuilder.build();

        DoFnOperator operator = new DoFnOperator(
            (DoFn)new MultiOutputDoFn((TupleTag<String>)additionalOutput1, (TupleTag<String>)additionalOutput2),
            "stepName",
            (Coder)valueCoder,
            Collections.emptyMap(),
            mainOutput,
            (List)ImmutableList.of((Object)additionalOutput1, (Object)additionalOutput2),
            (DoFnOperator.OutputManagerFactory)new DoFnOperator.MultiOutputOutputManagerFactory(
                mainOutput,
                (Map)tagsToOutputTags,
                (Map)tagsToCoders,
                (Map)tagsToIds,
                new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())),
            WindowingStrategy.globalDefault(),
            new HashMap(),
            Collections.emptyList(),
            (PipelineOptions)FlinkPipelineOptions.defaults(),
            null,
            null,
            DoFnSchemaInformation.create(),
            Collections.emptyMap());

        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator);
        harness.open();

        harness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)"one")));
        harness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)"two")));
        harness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)"hello")));

        // Main output: only the "hello" element is expected.
        MatcherAssert.assertThat(
            this.stripStreamRecord(harness.getOutput()),
            (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{
                WindowedValue.valueInGlobalWindow((Object)"got: hello")
            }));

        // First additional output.
        OutputTag sideTag1 = (OutputTag)tagsToOutputTags.get((Object)additionalOutput1);
        MatcherAssert.assertThat(
            this.stripStreamRecord(harness.getSideOutput(sideTag1)),
            (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{
                WindowedValue.valueInGlobalWindow((Object)"extra: one"),
                WindowedValue.valueInGlobalWindow((Object)"got: hello")
            }));

        // Second additional output.
        OutputTag sideTag2 = (OutputTag)tagsToOutputTags.get((Object)additionalOutput2);
        MatcherAssert.assertThat(
            this.stripStreamRecord(harness.getSideOutput(sideTag2)),
            (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{
                WindowedValue.valueInGlobalWindow((Object)"extra: two"),
                WindowedValue.valueInGlobalWindow((Object)"got: hello")
            }));

        harness.close();
    }

    @Test
    public void testWatermarkContract() throws @UnknownKeyFor @NonNull @Initialized Exception {
        final Instant timerTimestamp = new Instant(1000L);
        final Instant timerOutputTimestamp = timerTimestamp.minus(1L);
        String eventTimeMessage = "Event timer fired: ";
        String processingTimeMessage = "Processing timer fired";
        WindowingStrategy windowingStrategy = WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)new Duration(10000L)));
        String eventTimerId = "eventTimer";
        String eventTimerId2 = "eventTimer2";
        String processingTimerId = "processingTimer";
        DoFn<Integer, String> fn = new DoFn<Integer, String>(){
            @DoFn.TimerId(value="eventTimer")
            private final @UnknownKeyFor @NonNull @Initialized TimerSpec eventTimer = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
            @DoFn.TimerId(value="eventTimer2")
            private final @UnknownKeyFor @NonNull @Initialized TimerSpec eventTimer2 = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
            @DoFn.TimerId(value="processingTimer")
            private final @UnknownKeyFor @NonNull @Initialized TimerSpec processingTimer = TimerSpecs.timer((TimeDomain)TimeDomain.PROCESSING_TIME);

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context, @DoFn.TimerId(value="eventTimer") @UnknownKeyFor @NonNull @Initialized Timer eventTimer, @DoFn.TimerId(value="eventTimer2") @UnknownKeyFor @NonNull @Initialized Timer eventTimerWithOutputTimestamp, @DoFn.TimerId(value="processingTimer") @UnknownKeyFor @NonNull @Initialized Timer processingTimer) {
                eventTimer.set(timerTimestamp);
                eventTimerWithOutputTimestamp.withOutputTimestamp(timerOutputTimestamp).set(timerTimestamp);
                processingTimer.offset(Duration.millis((long)timerTimestamp.getMillis())).setRelative();
            }

            @DoFn.OnTimer(value="eventTimer")
            public void onEventTime(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized OnTimerContext context) {
                Assert.assertEquals((String)"Timer timestamp must match set timestamp.", (Object)timerTimestamp, (Object)context.timestamp());
                context.outputWithTimestamp((Object)"Event timer fired: eventTimer", context.timestamp());
            }

            @DoFn.OnTimer(value="eventTimer2")
            public void onEventTime2(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized OnTimerContext context) {
                Assert.assertEquals((String)"Timer timestamp must match set timestamp.", (Object)timerTimestamp, (Object)context.fireTimestamp());
                context.output((Object)"Event timer fired: eventTimer2");
            }

            @DoFn.OnTimer(value="processingTimer")
            public void onProcessingTime(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized OnTimerContext context) {
                Assert.assertEquals((String)"Timer timestamp must match current input watermark", (Object)timerTimestamp.plus(1L), (Object)context.timestamp());
                context.outputWithTimestamp((Object)"Processing timer fired", context.timestamp());
            }
        };
        VarIntCoder keyCoder = VarIntCoder.of();
        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder((Coder)keyCoder, (Coder)windowingStrategy.getWindowFn().windowCoder());
        WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)windowingStrategy.getWindowFn().windowCoder());
        KeySelector & Serializable keySelector = (KeySelector & Serializable)e -> FlinkKeyUtils.encodeKey((Object)((Integer)e.getValue()), (Coder)keyCoder);
        TupleTag outputTag = new TupleTag("main-output");
        DoFnOperator doFnOperator = new DoFnOperator((DoFn)fn, "stepName", (Coder)inputCoder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)outputCoder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())), windowingStrategy, new HashMap(), Collections.emptyList(), (PipelineOptions)FlinkPipelineOptions.defaults(), (Coder)keyCoder, (KeySelector)keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)doFnOperator, (KeySelector)keySelector, (TypeInformation)new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults()));
        testHarness.setup((TypeSerializer)new CoderTypeSerializer((Coder)outputCoder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())));
        testHarness.open();
        testHarness.processWatermark(0L);
        testHarness.setProcessingTime(0L);
        IntervalWindow window1 = new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.millis((long)10000L));
        testHarness.processElement(new StreamRecord((Object)WindowedValue.of((Object)13, (Instant)new Instant(0L), (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)Matchers.emptyIterable());
        testHarness.processWatermark(timerTimestamp.getMillis());
        testHarness.setProcessingTime(timerTimestamp.getMillis());
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)Matchers.emptyIterable());
        MatcherAssert.assertThat((Object)doFnOperator.keyedStateInternals.minWatermarkHoldMs(), (Matcher)Matchers.is((Object)timerOutputTimestamp.getMillis()));
        testHarness.processWatermark(timerTimestamp.getMillis() + 1L);
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)Matchers.containsInAnyOrder((Object[])new WindowedValue[]{WindowedValue.of((Object)"Event timer fired: eventTimer", (Instant)timerTimestamp, (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING), WindowedValue.of((Object)"Event timer fired: eventTimer2", (Instant)timerTimestamp.minus(1L), (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING)}));
        testHarness.getOutput().clear();
        testHarness.setProcessingTime(timerTimestamp.getMillis() + 1L);
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{WindowedValue.of((Object)"Processing timer fired", (Instant)timerTimestamp.plus(1L), (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING)}));
        testHarness.close();
    }

    @Test
    public void testWatermarkUpdateAfterWatermarkHoldRelease() throws @UnknownKeyFor @NonNull @Initialized Exception {
        WindowedValue.ValueOnlyWindowedValueCoder coder = WindowedValue.getValueOnlyCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)StringUtf8Coder.of()));
        TupleTag outputTag = new TupleTag("main-output");
        final ArrayList emittedWatermarkHolds = new ArrayList();
        KeySelector & Serializable keySelector = (KeySelector & Serializable)e -> FlinkKeyUtils.encodeKey((Object)((String)((KV)e.getValue()).getKey()), (Coder)StringUtf8Coder.of());
        DoFnOperator<KV<String, String>, KV<String, String>> doFnOperator = new DoFnOperator<KV<String, String>, KV<String, String>>(new IdentityDoFn(), "stepName", (Coder)coder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)coder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())), WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), (PipelineOptions)FlinkPipelineOptions.defaults(), (Coder)StringUtf8Coder.of(), (KeySelector)keySelector, DoFnSchemaInformation.create(), Collections.emptyMap()){

            protected @UnknownKeyFor @NonNull @Initialized DoFnRunner<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String>, @UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String>> createWrappingDoFnRunner(final @UnknownKeyFor @NonNull @Initialized DoFnRunner<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String>, @UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String>> wrappedRunner, @UnknownKeyFor @NonNull @Initialized StepContext stepContext) {
                final StateNamespace namespace = StateNamespaces.window((Coder)GlobalWindow.Coder.INSTANCE, (BoundedWindow)GlobalWindow.INSTANCE);
                StateTag holdTag = StateTags.watermarkStateInternal((String)"hold", (TimestampCombiner)TimestampCombiner.LATEST);
                final WatermarkHoldState holdState = (WatermarkHoldState)stepContext.stateInternals().state(namespace, holdTag);
                final TimerInternals timerInternals = stepContext.timerInternals();
                return new DoFnRunner<KV<String, String>, KV<String, String>>(){

                    public void startBundle() {
                        wrappedRunner.startBundle();
                    }

                    public void processElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String>> elem) {
                        wrappedRunner.processElement(elem);
                        holdState.add((Object)elem.getTimestamp());
                        timerInternals.setTimer(namespace, "timer", "family", elem.getTimestamp().plus(1L), elem.getTimestamp().plus(1L), TimeDomain.EVENT_TIME);
                        timerInternals.setTimer(namespace, "cleanup", "", GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE.maxTimestamp(), TimeDomain.EVENT_TIME);
                    }

                    public <KeyT> void onTimer(@UnknownKeyFor @NonNull @Initialized String timerId, @UnknownKeyFor @NonNull @Initialized String timerFamilyId, KeyT key, @UnknownKeyFor @NonNull @Initialized BoundedWindow window, @UnknownKeyFor @NonNull @Initialized Instant timestamp, @UnknownKeyFor @NonNull @Initialized Instant outputTimestamp, @UnknownKeyFor @NonNull @Initialized TimeDomain timeDomain) {
                        if ("cleanup".equals(timerId)) {
                            holdState.clear();
                        } else {
                            holdState.add((Object)outputTimestamp);
                        }
                    }

                    public void finishBundle() {
                        wrappedRunner.finishBundle();
                    }

                    public <KeyT> void onWindowExpiration(@UnknownKeyFor @NonNull @Initialized BoundedWindow window, @UnknownKeyFor @NonNull @Initialized Instant timestamp, KeyT key) {
                        wrappedRunner.onWindowExpiration(window, timestamp, key);
                    }

                    public @UnknownKeyFor @NonNull @Initialized DoFn<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String>, @UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String>> getFn() {
                        return doFn;
                    }
                };
            }

            void emitWatermarkIfHoldChanged(@UnknownKeyFor @NonNull @Initialized long currentWatermarkHold) {
                emittedWatermarkHolds.add(this.keyedStateInternals.minWatermarkHoldMs());
            }
        };
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)doFnOperator, (KeySelector)keySelector, (TypeInformation)new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults()));
        testHarness.setup();
        Instant now = Instant.now();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)WindowedValue.timestampedValueInGlobalWindow((Object)KV.of((Object)"Key", (Object)"Hello"), (Instant)now)));
        MatcherAssert.assertThat(emittedWatermarkHolds, (Matcher)Matchers.is((Matcher)Matchers.equalTo(Collections.singletonList(now.getMillis()))));
        testHarness.processWatermark(now.getMillis() + 2L);
        MatcherAssert.assertThat(emittedWatermarkHolds, (Matcher)Matchers.is((Matcher)Matchers.equalTo(Arrays.asList(now.getMillis(), now.getMillis() + 1L))));
        testHarness.processElement(new StreamRecord((Object)WindowedValue.timestampedValueInGlobalWindow((Object)KV.of((Object)"Key", (Object)"Hello"), (Instant)now.plus(2L))));
        MatcherAssert.assertThat(emittedWatermarkHolds, (Matcher)Matchers.is((Matcher)Matchers.equalTo(Arrays.asList(now.getMillis(), now.getMillis() + 1L, now.getMillis() + 2L))));
        testHarness.processWatermark(GlobalWindow.INSTANCE.maxTimestamp().plus(1L).getMillis());
        testHarness.processWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
        testHarness.close();
    }

    @Test
    public void testLateDroppingForStatefulFn() throws @UnknownKeyFor @NonNull @Initialized Exception {
        WindowingStrategy windowingStrategy = WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)new Duration(10L)));
        DoFn<Integer, String> fn = new DoFn<Integer, String>(){
            @DoFn.StateId(value="state")
            private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String>> stateSpec = StateSpecs.value((Coder)StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context) {
                context.output((Object)((Integer)context.element()).toString());
            }
        };
        VarIntCoder keyCoder = VarIntCoder.of();
        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder((Coder)keyCoder, (Coder)windowingStrategy.getWindowFn().windowCoder());
        WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)windowingStrategy.getWindowFn().windowCoder());
        KeySelector & Serializable keySelector = (KeySelector & Serializable)e -> FlinkKeyUtils.encodeKey((Object)((Integer)e.getValue()), (Coder)keyCoder);
        TupleTag outputTag = new TupleTag("main-output");
        DoFnOperator doFnOperator = new DoFnOperator((DoFn)fn, "stepName", (Coder)inputCoder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)outputCoder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())), windowingStrategy, new HashMap(), Collections.emptyList(), (PipelineOptions)FlinkPipelineOptions.defaults(), (Coder)keyCoder, (KeySelector)keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)doFnOperator, (KeySelector)keySelector, (TypeInformation)new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults()));
        testHarness.open();
        testHarness.processWatermark(0L);
        IntervalWindow window1 = new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.millis((long)10L));
        testHarness.processElement(new StreamRecord((Object)WindowedValue.of((Object)13, (Instant)new Instant(0L), (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{WindowedValue.of((Object)"13", (Instant)new Instant(0L), (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING)}));
        testHarness.getOutput().clear();
        testHarness.processWatermark(9L);
        testHarness.processElement(new StreamRecord((Object)WindowedValue.of((Object)17, (Instant)new Instant(0L), (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{WindowedValue.of((Object)"17", (Instant)new Instant(0L), (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING)}));
        testHarness.getOutput().clear();
        testHarness.processWatermark(10L);
        testHarness.processElement(new StreamRecord((Object)WindowedValue.of((Object)17, (Instant)new Instant(0L), (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)Matchers.emptyIterable());
        testHarness.close();
    }

    /**
     * Drives the harness returned by {@code getHarness} through a 10 ms fixed window with zero
     * allowed lateness and checks the element output, the number of keyed state entries and
     * the output produced once the watermark has moved past the end of the window.
     *
     * <p>{@code offset} and {@code timerOutput} are passed to {@code getHarness}; the
     * expectations below show that element values come back increased by {@code offset} and
     * that, after the watermark advance, each key yields {@code timerOutput} at the window's
     * maximum timestamp.
     */
    @Test
    public void testStateGCForStatefulFn() throws @UnknownKeyFor @NonNull @Initialized Exception {
        WindowingStrategy windowingStrategy = WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)new Duration(10L))).withAllowedLateness(Duration.ZERO);
        final int offset = 5000;
        final int timerOutput = 4093;

        // The lambda is typed via the parameterised Function so that `window` is a BoundedWindow.
        KeyedOneInputStreamOperatorTestHarness<ByteBuffer, WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
            DoFnOperatorTest.getHarness(
                windowingStrategy,
                offset,
                (Function<BoundedWindow, Instant>)window -> new Instant(Objects.requireNonNull(window).maxTimestamp()),
                timerOutput);
        testHarness.open();

        testHarness.processWatermark(0L);
        Assert.assertEquals(0L, (long)testHarness.numKeyedStateEntries());

        IntervalWindow window1 = new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.millis(10L));

        testHarness.processElement(new StreamRecord<>(WindowedValue.of(KV.of("key1", 5), new Instant(1L), window1, PaneInfo.NO_FIRING)));
        testHarness.processElement(new StreamRecord<>(WindowedValue.of(KV.of("key2", 7), new Instant(3L), window1, PaneInfo.NO_FIRING)));

        MatcherAssert.assertThat(
            StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()),
            (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{
                WindowedValue.of(KV.of("key1", 5 + offset), new Instant(1L), window1, PaneInfo.NO_FIRING),
                WindowedValue.of(KV.of("key2", 7 + offset), new Instant(3L), window1, PaneInfo.NO_FIRING)
            }));

        // NOTE(review): presumably one state cell plus one pending timer per key; the DoFn is
        // built by getHarness, outside the visible part of this file - confirm there.
        Assert.assertEquals(4L, (long)testHarness.numKeyedStateEntries());

        testHarness.getOutput().clear();

        // Advance to (window end + allowed lateness + 1 ms) + 1.
        // NOTE(review): the inner 1 ms looks like the runner's state-GC delay - confirm.
        testHarness.processWatermark(window1.maxTimestamp().plus((ReadableDuration)windowingStrategy.getAllowedLateness()).plus(1L).getMillis() + 1L);

        MatcherAssert.assertThat(
            StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()),
            (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{
                WindowedValue.of(KV.of("key1", timerOutput), new Instant(9L), window1, PaneInfo.NO_FIRING),
                WindowedValue.of(KV.of("key2", timerOutput), new Instant(9L), window1, PaneInfo.NO_FIRING)
            }));

        testHarness.close();
    }

    /**
     * Tracks event-time timer and keyed-state counts for the stateful DoFn from
     * {@code getHarness} when elements live in the global window: after two elements, after
     * the user timers (set to instant 50) have fired, at two watermarks just past the end of
     * the global window, and finally after closing the harness with one more element pending.
     */
    @Test
    public void testGCForGlobalWindow() throws @UnknownKeyFor @NonNull @Initialized Exception {
        WindowingStrategy globalStrategy = WindowingStrategy.globalDefault();
        KeyedOneInputStreamOperatorTestHarness<ByteBuffer, WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> harness = DoFnOperatorTest.getHarness(globalStrategy, 5000, (Function<BoundedWindow, Instant>)((Function)window -> new Instant(50L)), 4092);
        harness.open();
        harness.processWatermark(0L);
        Assert.assertEquals((long)0L, (long)harness.numKeyedStateEntries());

        // One element per key; each sets a timer and writes state.
        WindowedValue key1Element = WindowedValue.of((Object)KV.of((Object)"key1", (Object)5), (Instant)new Instant(23L), (BoundedWindow)GlobalWindow.INSTANCE, (PaneInfo)PaneInfo.NO_FIRING);
        WindowedValue key2Element = WindowedValue.of((Object)KV.of((Object)"key2", (Object)6), (Instant)new Instant(42L), (BoundedWindow)GlobalWindow.INSTANCE, (PaneInfo)PaneInfo.NO_FIRING);
        harness.processElement(new StreamRecord((Object)key1Element));
        harness.processElement(new StreamRecord((Object)key2Element));
        MatcherAssert.assertThat((Object)harness.numEventTimeTimers(), (Matcher)Matchers.is((Object)2));
        MatcherAssert.assertThat((Object)harness.numKeyedStateEntries(), (Matcher)Matchers.is((Object)4));

        // Watermark just beyond the timer timestamp of 50.
        harness.processWatermark(51L);
        MatcherAssert.assertThat((Object)harness.numEventTimeTimers(), (Matcher)Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)harness.numKeyedStateEntries(), (Matcher)Matchers.is((Object)2));

        Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();

        // One millisecond past the end of the global window: keyed state is still present.
        harness.processWatermark(endOfGlobalWindow.plus(1L).getMillis());
        MatcherAssert.assertThat((Object)harness.numEventTimeTimers(), (Matcher)Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)harness.numKeyedStateEntries(), (Matcher)Matchers.is((Object)2));

        // Two milliseconds past the end of the global window: keyed state is gone.
        harness.processWatermark(endOfGlobalWindow.plus(2L).getMillis());
        MatcherAssert.assertThat((Object)harness.numEventTimeTimers(), (Matcher)Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)harness.numKeyedStateEntries(), (Matcher)Matchers.is((Object)0));

        // An element processed right before close() must leave no timers or state behind.
        WindowedValue lateKey2Element = WindowedValue.of((Object)KV.of((Object)"key2", (Object)6), (Instant)new Instant(42L), (BoundedWindow)GlobalWindow.INSTANCE, (PaneInfo)PaneInfo.NO_FIRING);
        harness.processElement(new StreamRecord((Object)lateKey2Element));
        harness.close();
        MatcherAssert.assertThat((Object)harness.numEventTimeTimers(), (Matcher)Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)harness.numKeyedStateEntries(), (Matcher)Matchers.is((Object)0));
    }

    private static @UnknownKeyFor @NonNull @Initialized KeyedOneInputStreamOperatorTestHarness<@UnknownKeyFor @NonNull @Initialized ByteBuffer, @UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer>>, @UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer>>> getHarness(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> windowingStrategy, final @UnknownKeyFor @NonNull @Initialized int elementOffset, final @UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @NonNull @Initialized BoundedWindow, @UnknownKeyFor @NonNull @Initialized Instant> timerTimestamp, final @UnknownKeyFor @NonNull @Initialized int timerOutput) throws @UnknownKeyFor @NonNull @Initialized Exception {
        String timerId = "boo";
        String stateId = "dazzle";
        DoFn<KV<String, Integer>, KV<String, Integer>> fn = new DoFn<KV<String, Integer>, KV<String, Integer>>(){
            @DoFn.TimerId(value="boo")
            private final @UnknownKeyFor @NonNull @Initialized TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
            @DoFn.StateId(value="dazzle")
            private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String>> stateSpec = StateSpecs.value((Coder)StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context, @DoFn.TimerId(value="boo") @UnknownKeyFor @NonNull @Initialized Timer timer, @DoFn.StateId(value="dazzle") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String> state, @UnknownKeyFor @NonNull @Initialized BoundedWindow window) {
                timer.set(Objects.requireNonNull((Instant)timerTimestamp.apply((Object)window)));
                state.write((Object)((String)((KV)context.element()).getKey()));
                context.output((Object)KV.of((Object)((String)((KV)context.element()).getKey()), (Object)((Integer)((KV)context.element()).getValue() + elementOffset)));
            }

            @DoFn.OnTimer(value="boo")
            public void onTimer(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized OnTimerContext context, @DoFn.StateId(value="dazzle") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String> state) {
                context.output((Object)KV.of((Object)((String)state.read()), (Object)timerOutput));
            }
        };
        WindowedValue.FullWindowedValueCoder coder = WindowedValue.getFullCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of()), (Coder)windowingStrategy.getWindowFn().windowCoder());
        TupleTag outputTag = new TupleTag("main-output");
        KeySelector & Serializable keySelector = (KeySelector & Serializable)e -> FlinkKeyUtils.encodeKey((Object)((String)((KV)e.getValue()).getKey()), (Coder)StringUtf8Coder.of());
        DoFnOperator doFnOperator = new DoFnOperator((DoFn)fn, "stepName", (Coder)coder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)coder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())), windowingStrategy, new HashMap(), Collections.emptyList(), (PipelineOptions)FlinkPipelineOptions.defaults(), (Coder)StringUtf8Coder.of(), (KeySelector)keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
        return new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)doFnOperator, (KeySelector)keySelector, (TypeInformation)new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults()));
    }

    /** Runs the shared side-input scenario with {@code keyed} switched off. */
    @Test
    public void testNormalParDoSideInputs() throws @UnknownKeyFor @NonNull @Initialized Exception {
        final boolean keyed = false;
        testSideInputs(keyed);
    }

    /** Runs the shared side-input scenario with {@code keyed} switched on. */
    @Test
    public void testKeyedParDoSideInputs() throws @UnknownKeyFor @NonNull @Initialized Exception {
        final boolean keyed = true;
        testSideInputs(keyed);
    }

    /**
     * Shared scenario for the side-input tests: wraps an {@code IdentityDoFn} with two side
     * inputs ({@code view1} under tag 1, {@code view2} under tag 2) in a {@code DoFnOperator},
     * feeds side-input data and two main-input elements through a two-input test harness, and
     * asserts that the harness output contains the two main-input elements in order.
     *
     * @param keyed when {@code true}, the operator is given a key coder and key selector and
     *     is driven through a {@code KeyedTwoInputStreamOperatorTestHarness}; otherwise both
     *     are {@code null} and a plain {@code TwoInputStreamOperatorTestHarness} is used
     * @throws Exception if building or driving the harness fails
     */
    void testSideInputs(@UnknownKeyFor @NonNull @Initialized boolean keyed) throws @UnknownKeyFor @NonNull @Initialized Exception {
        WindowedValue.ValueOnlyWindowedValueCoder coder = WindowedValue.getValueOnlyCoder((Coder)StringUtf8Coder.of());
        TupleTag outputTag = new TupleTag("main-output");
        // Side-input tag -> view; the same tags are used in the RawUnionValue records below.
        ImmutableMap sideInputMapping = ImmutableMap.builder().put((Object)1, this.view1).put((Object)2, this.view2).build();
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        Object keySelector = null;
        if (keyed) {
            // NOTE(review): this references a compiler-synthetic lambda method whose body is not
            // visible here; presumably it encodes the element value with keyCoder - confirm.
            keySelector = arg_0 -> DoFnOperatorTest.lambda$testSideInputs$2e46429e$1((Coder)keyCoder, arg_0);
        }
        DoFnOperator doFnOperator = new DoFnOperator(new IdentityDoFn(), "stepName", (Coder)coder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)coder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())), WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)Duration.millis((long)100L))), (Map)sideInputMapping, (Collection)ImmutableList.of(this.view1, this.view2), (PipelineOptions)FlinkPipelineOptions.defaults(), (Coder)(keyed ? keyCoder : null), (KeySelector)(keyed ? keySelector : null), DoFnSchemaInformation.create(), Collections.emptyMap());
        // The non-keyed harness is always constructed first and is simply replaced (never
        // opened or closed) when keyed is true.
        TwoInputStreamOperatorTestHarness testHarness = new TwoInputStreamOperatorTestHarness((TwoInputStreamOperator)doFnOperator);
        if (keyed) {
            // Only the first (main) input gets a key selector; the second input's selector is null.
            testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)doFnOperator, (KeySelector)keySelector, null, (TypeInformation)new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults()));
        }
        testHarness.open();
        IntervalWindow firstWindow = new IntervalWindow(new Instant(0L), new Instant(100L));
        IntervalWindow secondWindow = new IntervalWindow(new Instant(0L), new Instant(500L));
        // Input 2 carries side-input data: tag 1 in firstWindow, tag 2 in secondWindow.
        testHarness.processElement2(new StreamRecord((Object)new RawUnionValue(1, this.valuesInWindow(PCollectionViewTesting.materializeValuesFor((PipelineOptions)this.view1.getPipeline().getOptions(), (PTransform)View.asIterable(), (Object[])new Object[]{"hello", "ciao"}), new Instant(0L), (BoundedWindow)firstWindow))));
        testHarness.processElement2(new StreamRecord((Object)new RawUnionValue(2, this.valuesInWindow(PCollectionViewTesting.materializeValuesFor((PipelineOptions)this.view2.getPipeline().getOptions(), (PTransform)View.asIterable(), (Object[])new Object[]{"foo", "bar"}), new Instant(0L), (BoundedWindow)secondWindow))));
        // Input 1 carries the main-input elements; both are placed in firstWindow.
        WindowedValue<String> helloElement = this.valueInWindow("Hello", new Instant(0L), (BoundedWindow)firstWindow);
        WindowedValue<String> worldElement = this.valueInWindow("World", new Instant(1000L), (BoundedWindow)firstWindow);
        testHarness.processElement1(new StreamRecord(helloElement));
        testHarness.processElement1(new StreamRecord(worldElement));
        // The same side-input payloads are sent a second time, now with timestamp 1000.
        testHarness.processElement2(new StreamRecord((Object)new RawUnionValue(1, this.valuesInWindow(PCollectionViewTesting.materializeValuesFor((PipelineOptions)this.view1.getPipeline().getOptions(), (PTransform)View.asIterable(), (Object[])new Object[]{"hello", "ciao"}), new Instant(1000L), (BoundedWindow)firstWindow))));
        testHarness.processElement2(new StreamRecord((Object)new RawUnionValue(2, this.valuesInWindow(PCollectionViewTesting.materializeValuesFor((PipelineOptions)this.view2.getPipeline().getOptions(), (PTransform)View.asIterable(), (Object[])new Object[]{"foo", "bar"}), new Instant(1000L), (BoundedWindow)secondWindow))));
        // Output must be exactly the two main-input elements, in arrival order.
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{helloElement, worldElement}));
        testHarness.close();
    }

    /**
     * Checks that keyed value state survives a snapshot/restore cycle.
     *
     * <p>The DoFn keeps a per-key counter in state and only emits an element whose value
     * equals the counter after incrementing. Two elements are processed before the snapshot,
     * so after restoring, the next four elements see counts 3 to 6 and only the elements
     * carrying 4 and 5 are expected in the output.
     */
    @Test
    public void testStateRestore() throws @UnknownKeyFor @NonNull @Initialized Exception {
        DoFn<KV<String, Long>, KV<String, Long>> filterElementsEqualToCountFn = new DoFn<KV<String, Long>, KV<String, Long>>(){
            @DoFn.StateId(value="counter")
            private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Long>> counterSpec = StateSpecs.value((Coder)VarLongCoder.of());

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context, @DoFn.StateId(value="counter") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Long> count) {
                // A missing counter counts as zero; store the incremented value right away.
                Long stored = (Long)count.read();
                long updated = (stored == null ? 0L : stored) + 1L;
                count.write((Object)updated);
                KV element = (KV)context.element();
                if (updated == (Long)element.getValue()) {
                    context.output((Object)element);
                }
            }
        };
        WindowingStrategy windowingStrategy = WindowingStrategy.globalDefault();
        TupleTag outputTag = new TupleTag("main-output");
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector((Coder)keyCoder, null);
        KvCoder coder = KvCoder.of((Coder)keyCoder, (Coder)VarLongCoder.of());
        WindowedValue.FullWindowedValueCoder kvCoder = WindowedValue.getFullCoder((Coder)coder, (Coder)windowingStrategy.getWindowFn().windowCoder());
        CoderTypeInformation keyCoderInfo = new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults());

        // First run: two elements bring the counter for key "a" to 2, then snapshot and shut down.
        OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Long>>, WindowedValue<KV<String, Long>>> testHarness = this.createTestHarness((WindowingStrategy<Object, ?>)windowingStrategy, (DoFn)filterElementsEqualToCountFn, (WindowedValue.FullWindowedValueCoder)kvCoder, (WindowedValue.FullWindowedValueCoder)kvCoder, (Coder<?>)keyCoder, (TupleTag)outputTag, (TypeInformation)keyCoderInfo, (KeySelector)keySelector);
        testHarness.open();
        for (long value : new long[]{100L, 100L}) {
            testHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"a", (Object)value))));
        }
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();

        // Second run: a fresh harness restored from the snapshot continues counting from 2.
        testHarness = this.createTestHarness((WindowingStrategy<Object, ?>)windowingStrategy, (DoFn)filterElementsEqualToCountFn, (WindowedValue.FullWindowedValueCoder)kvCoder, (WindowedValue.FullWindowedValueCoder)kvCoder, (Coder<?>)keyCoder, (TupleTag)outputTag, (TypeInformation)keyCoderInfo, (KeySelector)keySelector);
        testHarness.initializeState(snapshot);
        testHarness.open();
        for (long value : new long[]{100L, 4L, 5L, 100L}) {
            testHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"a", (Object)value))));
        }
        WindowedValue[] expected = new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"a", (Object)4L)), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"a", (Object)5L))};
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expected));
        testHarness.close();
    }

    /**
     * Runs the side-input checkpointing scenario against a non-keyed two-input harness whose
     * operator wraps an {@code IdentityDoFn} with the two test views as side inputs (no key
     * coder or key selector, global default windowing).
     */
    @Test
    public void nonKeyedParDoSideInputCheckpointing() throws @UnknownKeyFor @NonNull @Initialized Exception {
        TestHarnessFactory<TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>>> nonKeyedFactory = () -> {
            TupleTag outputTag = new TupleTag("main-output");
            WindowedValue.FullWindowedValueCoder coder = WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)IntervalWindow.getCoder());
            // Side-input tag -> view.
            ImmutableMap sideInputMapping = ImmutableMap.builder().put((Object)1, this.view1).put((Object)2, this.view2).build();
            DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)coder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
            // The two trailing nulls are the key coder and key selector: this operator is not keyed.
            DoFnOperator doFnOperator = new DoFnOperator(new IdentityDoFn(), "stepName", (Coder)coder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)outputManagerFactory, WindowingStrategy.globalDefault(), (Map)sideInputMapping, (Collection)ImmutableList.of(this.view1, this.view2), (PipelineOptions)FlinkPipelineOptions.defaults(), null, null, DoFnSchemaInformation.create(), Collections.emptyMap());
            return new TwoInputStreamOperatorTestHarness((TwoInputStreamOperator)doFnOperator);
        };
        this.sideInputCheckpointing(nonKeyedFactory);
    }

    @Test
    public void keyedParDoSideInputCheckpointing() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.sideInputCheckpointing(() -> {
            StringUtf8Coder keyCoder = StringUtf8Coder.of();
            WindowedValue.FullWindowedValueCoder coder = WindowedValue.getFullCoder((Coder)keyCoder, (Coder)IntervalWindow.getCoder());
            TupleTag outputTag = new TupleTag("main-output");
            KeySelector & Serializable keySelector = (KeySelector & Serializable)e -> FlinkKeyUtils.encodeKey((Object)((String)e.getValue()), (Coder)keyCoder);
            ImmutableMap sideInputMapping = ImmutableMap.builder().put((Object)1, this.view1).put((Object)2, this.view2).build();
            DoFnOperator doFnOperator = new DoFnOperator(new IdentityDoFn(), "stepName", (Coder)coder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)coder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())), WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)Duration.millis((long)100L))), (Map)sideInputMapping, (Collection)ImmutableList.of(this.view1, this.view2), (PipelineOptions)FlinkPipelineOptions.defaults(), (Coder)keyCoder, (KeySelector)keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
            return new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)doFnOperator, (KeySelector)keySelector, null, (TypeInformation)new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults()));
        });
    }

    /**
     * Checks that side-input data received before a snapshot is still available after
     * restoring into a fresh harness: side inputs go in first, the operator is snapshotted
     * and replaced, and the main-input elements processed afterwards must be emitted in order.
     *
     * @param harnessFactory creates a new, unopened harness on each call; it is invoked twice
     * @throws Exception if building or driving either harness fails
     */
    void sideInputCheckpointing(@UnknownKeyFor @NonNull @Initialized TestHarnessFactory<@UnknownKeyFor @NonNull @Initialized TwoInputStreamOperatorTestHarness<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized String>, @UnknownKeyFor @NonNull @Initialized RawUnionValue, @UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized String>>> harnessFactory) throws @UnknownKeyFor @NonNull @Initialized Exception {
        TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>> harness = harnessFactory.create();
        harness.open();
        IntervalWindow shortWindow = new IntervalWindow(new Instant(0L), new Instant(100L));
        IntervalWindow longWindow = new IntervalWindow(new Instant(0L), new Instant(500L));

        // Feed both side inputs (tags 1 and 2) through the second input before snapshotting.
        RawUnionValue view1Data = new RawUnionValue(1, this.valuesInWindow(PCollectionViewTesting.materializeValuesFor((PipelineOptions)this.view1.getPipeline().getOptions(), (PTransform)View.asIterable(), (Object[])new Object[]{"hello", "ciao"}), new Instant(0L), (BoundedWindow)shortWindow));
        harness.processElement2(new StreamRecord((Object)view1Data));
        RawUnionValue view2Data = new RawUnionValue(2, this.valuesInWindow(PCollectionViewTesting.materializeValuesFor((PipelineOptions)this.view2.getPipeline().getOptions(), (PTransform)View.asIterable(), (Object[])new Object[]{"foo", "bar"}), new Instant(0L), (BoundedWindow)longWindow));
        harness.processElement2(new StreamRecord((Object)view2Data));

        // Snapshot, then continue on a brand-new harness restored from that snapshot.
        // The first harness is intentionally left as-is (it is not closed here).
        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        harness = harnessFactory.create();
        harness.initializeState(checkpoint);
        harness.open();

        WindowedValue<String> hello = this.valueInWindow("Hello", new Instant(0L), (BoundedWindow)shortWindow);
        WindowedValue<String> world = this.valueInWindow("World", new Instant(1000L), (BoundedWindow)shortWindow);
        harness.processElement1(new StreamRecord(hello));
        harness.processElement1(new StreamRecord(world));

        WindowedValue[] expected = new WindowedValue[]{hello, world};
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expected));
        harness.close();
    }

    /**
     * Runs the pushed-back-data checkpointing scenario against a non-keyed two-input harness
     * whose operator wraps an {@code IdentityDoFn} with the two test views as side inputs
     * (no key coder or key selector, 100 ms fixed windows).
     */
    @Test
    public void nonKeyedParDoPushbackDataCheckpointing() throws @UnknownKeyFor @NonNull @Initialized Exception {
        TestHarnessFactory<TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>>> nonKeyedFactory = () -> {
            TupleTag outputTag = new TupleTag("main-output");
            WindowedValue.FullWindowedValueCoder coder = WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)IntervalWindow.getCoder());
            // Side-input tag -> view.
            ImmutableMap sideInputMapping = ImmutableMap.builder().put((Object)1, this.view1).put((Object)2, this.view2).build();
            DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)coder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
            // The two trailing nulls are the key coder and key selector: this operator is not keyed.
            DoFnOperator doFnOperator = new DoFnOperator(new IdentityDoFn(), "stepName", (Coder)coder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)outputManagerFactory, WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)Duration.millis((long)100L))), (Map)sideInputMapping, (Collection)ImmutableList.of(this.view1, this.view2), (PipelineOptions)FlinkPipelineOptions.defaults(), null, null, DoFnSchemaInformation.create(), Collections.emptyMap());
            return new TwoInputStreamOperatorTestHarness((TwoInputStreamOperator)doFnOperator);
        };
        this.pushbackDataCheckpointing(nonKeyedFactory);
    }

    @Test
    public void keyedParDoPushbackDataCheckpointing() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.pushbackDataCheckpointing(() -> {
            StringUtf8Coder keyCoder = StringUtf8Coder.of();
            WindowedValue.FullWindowedValueCoder coder = WindowedValue.getFullCoder((Coder)keyCoder, (Coder)IntervalWindow.getCoder());
            TupleTag outputTag = new TupleTag("main-output");
            KeySelector & Serializable keySelector = (KeySelector & Serializable)e -> FlinkKeyUtils.encodeKey((Object)((String)e.getValue()), (Coder)keyCoder);
            ImmutableMap sideInputMapping = ImmutableMap.builder().put((Object)1, this.view1).put((Object)2, this.view2).build();
            DoFnOperator doFnOperator = new DoFnOperator(new IdentityDoFn(), "stepName", (Coder)coder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)coder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults())), WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)Duration.millis((long)100L))), (Map)sideInputMapping, (Collection)ImmutableList.of(this.view1, this.view2), (PipelineOptions)FlinkPipelineOptions.defaults(), (Coder)keyCoder, (KeySelector)keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
            return new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)doFnOperator, (KeySelector)keySelector, null, (TypeInformation)new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults()));
        });
    }

    /**
     * Checks that main-input elements received before any side-input data survive a
     * snapshot/restore cycle: the elements go in first, the operator is snapshotted and
     * replaced, and once the side inputs arrive on the restored harness both elements must
     * appear in the output (in any order).
     *
     * @param harnessFactory creates a new, unopened harness on each call; it is invoked twice
     * @throws Exception if building or driving either harness fails
     */
    void pushbackDataCheckpointing(@UnknownKeyFor @NonNull @Initialized TestHarnessFactory<@UnknownKeyFor @NonNull @Initialized TwoInputStreamOperatorTestHarness<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized String>, @UnknownKeyFor @NonNull @Initialized RawUnionValue, @UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized String>>> harnessFactory) throws @UnknownKeyFor @NonNull @Initialized Exception {
        TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>> harness = harnessFactory.create();
        harness.open();
        IntervalWindow shortWindow = new IntervalWindow(new Instant(0L), new Instant(100L));
        IntervalWindow longWindow = new IntervalWindow(new Instant(0L), new Instant(500L));

        // Main-input elements arrive while no side-input data has been delivered yet.
        WindowedValue<String> hello = this.valueInWindow("Hello", new Instant(0L), (BoundedWindow)shortWindow);
        WindowedValue<String> world = this.valueInWindow("World", new Instant(1000L), (BoundedWindow)shortWindow);
        harness.processElement1(new StreamRecord(hello));
        harness.processElement1(new StreamRecord(world));

        // Snapshot, then continue on a brand-new harness restored from that snapshot.
        // The first harness is intentionally left as-is (it is not closed here).
        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        harness = harnessFactory.create();
        harness.initializeState(checkpoint);
        harness.open();

        // Now deliver both side inputs (tags 1 and 2) through the second input.
        RawUnionValue view1Data = new RawUnionValue(1, this.valuesInWindow(PCollectionViewTesting.materializeValuesFor((PipelineOptions)this.view1.getPipeline().getOptions(), (PTransform)View.asIterable(), (Object[])new Object[]{"hello", "ciao"}), new Instant(0L), (BoundedWindow)shortWindow));
        harness.processElement2(new StreamRecord((Object)view1Data));
        RawUnionValue view2Data = new RawUnionValue(2, this.valuesInWindow(PCollectionViewTesting.materializeValuesFor((PipelineOptions)this.view2.getPipeline().getOptions(), (PTransform)View.asIterable(), (Object[])new Object[]{"foo", "bar"}), new Instant(0L), (BoundedWindow)longWindow));
        harness.processElement2(new StreamRecord((Object)view2Data));

        WindowedValue[] expected = new WindowedValue[]{hello, world};
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), (Matcher)Matchers.containsInAnyOrder((Object[])expected));
        harness.close();
    }

    @Test
    public void testTimersRestore() throws @UnknownKeyFor @NonNull @Initialized Exception {
        final Instant timerTimestamp = new Instant(1000L);
        String outputMessage = "Timer fired";
        WindowingStrategy windowingStrategy = WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)new Duration(10000L)));
        DoFn<Integer, String> fn = new DoFn<Integer, String>(){
            private static final @UnknownKeyFor @NonNull @Initialized String EVENT_TIMER_ID = "eventTimer";
            @DoFn.TimerId(value="eventTimer")
            private final @UnknownKeyFor @NonNull @Initialized TimerSpec eventTimer = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context, @DoFn.TimerId(value="eventTimer") @UnknownKeyFor @NonNull @Initialized Timer timer) {
                timer.set(timerTimestamp);
            }

            @DoFn.OnTimer(value="eventTimer")
            public void onEventTime(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized OnTimerContext context) {
                Assert.assertEquals((String)"Timer timestamp must match set timestamp.", (Object)timerTimestamp, (Object)context.timestamp());
                context.outputWithTimestamp((Object)"Timer fired", context.timestamp());
            }
        };
        VarIntCoder keyCoder = VarIntCoder.of();
        WindowedValue.FullWindowedValueCoder inputCoder = WindowedValue.getFullCoder((Coder)keyCoder, (Coder)windowingStrategy.getWindowFn().windowCoder());
        WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)windowingStrategy.getWindowFn().windowCoder());
        TupleTag outputTag = new TupleTag("main-output");
        CoderTypeSerializer outputSerializer = new CoderTypeSerializer((Coder)outputCoder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        CoderTypeInformation keyCoderInfo = new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults());
        KeySelector & Serializable keySelector = (KeySelector & Serializable)e -> FlinkKeyUtils.encodeKey((Object)((Integer)e.getValue()), (Coder)keyCoder);
        OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness = this.createTestHarness((WindowingStrategy<Object, ?>)windowingStrategy, (DoFn)fn, (WindowedValue.FullWindowedValueCoder)inputCoder, (WindowedValue.FullWindowedValueCoder)outputCoder, (Coder<?>)keyCoder, (TupleTag)outputTag, (TypeInformation)keyCoderInfo, (KeySelector)keySelector);
        testHarness.setup((TypeSerializer)outputSerializer);
        testHarness.open();
        testHarness.processWatermark(0L);
        IntervalWindow window1 = new IntervalWindow(new Instant(0L), (ReadableDuration)Duration.millis((long)10000L));
        testHarness.processElement(new StreamRecord((Object)WindowedValue.of((Object)13, (Instant)new Instant(0L), (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING)));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)Matchers.emptyIterable());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        testHarness = this.createTestHarness((WindowingStrategy<Object, ?>)windowingStrategy, (DoFn)fn, (WindowedValue.FullWindowedValueCoder)inputCoder, (WindowedValue.FullWindowedValueCoder)outputCoder, (Coder<?>)VarIntCoder.of(), (TupleTag)outputTag, (TypeInformation)keyCoderInfo, (KeySelector)keySelector);
        testHarness.setup((TypeSerializer)outputSerializer);
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(timerTimestamp.getMillis() + 1L);
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{WindowedValue.of((Object)"Timer fired", (Instant)new Instant((Object)timerTimestamp), (BoundedWindow)window1, (PaneInfo)PaneInfo.NO_FIRING)}));
        testHarness.close();
    }

    /**
     * Wraps a freshly constructed {@code DoFnOperator} (step name {@code "stepName"}) in a
     * {@code KeyedOneInputStreamOperatorTestHarness}.
     *
     * <p>The operator and its {@code MultiOutputOutputManagerFactory} each receive their own
     * {@code FlinkPipelineOptions.defaults()} instance; the remaining collection-valued constructor
     * arguments are empty. The type annotations in the signature were emitted by the decompiler
     * and may be inaccurate.
     *
     * @param windowingStrategy windowing strategy handed to the operator
     * @param fn the DoFn under test
     * @param inputCoder coder for the operator's windowed input
     * @param outputCoder coder handed to the output manager factory for {@code outputTag}
     * @param keyCoder key coder handed to the operator
     * @param outputTag main output tag
     * @param keyCoderInfo key type information handed to the keyed harness
     * @param keySelector key selector shared by the operator and the harness
     * @return a keyed harness around the new operator; it is neither set up nor opened here
     */
    private <K, InT, OutT> @UnknownKeyFor @NonNull @Initialized OneInputStreamOperatorTestHarness<@UnknownKeyFor @NonNull @Initialized WindowedValue<InT>, @UnknownKeyFor @NonNull @Initialized WindowedValue<OutT>> createTestHarness(
    @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @NonNull @Initialized Object, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> windowingStrategy,
    @UnknownKeyFor @NonNull @Initialized DoFn<InT, OutT> fn,
    @UnknownKeyFor @NonNull @Initialized WindowedValue.FullWindowedValueCoder<InT> inputCoder,
    @UnknownKeyFor @NonNull @Initialized WindowedValue.FullWindowedValueCoder<OutT> outputCoder,
    @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> keyCoder,
    @UnknownKeyFor @NonNull @Initialized TupleTag<OutT> outputTag,
    @UnknownKeyFor @NonNull @Initialized TypeInformation<K> keyCoderInfo,
    @UnknownKeyFor @NonNull @Initialized KeySelector<@UnknownKeyFor @NonNull @Initialized WindowedValue<InT>, K> keySelector) throws @UnknownKeyFor @NonNull @Initialized Exception {
        // The output manager factory is built first, with its own default options instance.
        SerializablePipelineOptions factoryOptions = new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults());
        DoFnOperator.MultiOutputOutputManagerFactory managerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, outputCoder, factoryOptions);
        PipelineOptions operatorOptions = (PipelineOptions)FlinkPipelineOptions.defaults();
        DoFnOperator operator = new DoFnOperator(fn, "stepName", inputCoder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)managerFactory, windowingStrategy, new HashMap(), Collections.emptyList(), operatorOptions, keyCoder, keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
        return new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, keySelector, keyCoderInfo);
    }

    /**
     * Checks bundle handling of a non-keyed operator across a snapshot and a restore.
     *
     * <p>The options set a max bundle size of 2 and a max bundle time of 10. The DoFn emits
     * {@code "finishBundle"} from its {@code @FinishBundle} method so that bundle boundaries show
     * up in the harness output. The test asserts the output before the snapshot, the content of
     * the pushed-back elements handler after the snapshot, and the output of a second operator
     * restored from that snapshot (also after {@code close()} and {@code dispose()}).
     */
    @Test
    public void testBundle() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkPipelineOptions bundleOptions = FlinkPipelineOptions.defaults();
        bundleOptions.setMaxBundleSize(Long.valueOf(2L));
        bundleOptions.setMaxBundleTimeMills(Long.valueOf(10L));
        TupleTag mainOutput = new TupleTag("main-output");
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder = WindowedValue.getValueOnlyCoder((Coder)StringUtf8Coder.of());
        IdentityDoFn<String> markerDoFn = new IdentityDoFn<String>(){

            // Parameter annotations were emitted by the decompiler and may be inaccurate.
            @DoFn.FinishBundle
            public void finishBundle(
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized FinishBundleContext context) {
                context.output((Object)"finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, (BoundedWindow)GlobalWindow.INSTANCE);
            }
        };
        // Created after the setters above so the wrapped options already carry the bundle limits.
        DoFnOperator.MultiOutputOutputManagerFactory managerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE), new SerializablePipelineOptions((PipelineOptions)bundleOptions));

        // Phase 1: feed three elements into a fresh operator.
        DoFnOperator firstOperator = new DoFnOperator((DoFn)markerDoFn, "stepName", (Coder)inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)managerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), (PipelineOptions)bundleOptions, null, null, DoFnSchemaInformation.create(), Collections.emptyMap());
        OneInputStreamOperatorTestHarness firstHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)firstOperator);
        firstHarness.open();
        for (String payload : new String[]{"a", "b", "c"}) {
            firstHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)payload)));
        }
        WindowedValue[] expectedBeforeSnapshot = new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)"a"), WindowedValue.valueInGlobalWindow((Object)"b"), WindowedValue.valueInGlobalWindow((Object)"finishBundle"), WindowedValue.valueInGlobalWindow((Object)"c")};
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(firstHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedBeforeSnapshot));

        // Phase 2: snapshot, then inspect what the output manager pushed back.
        OperatorSubtaskState checkpoint = firstHarness.snapshot(0L, 0L);
        PushedBackElementsHandler pushedBack = firstOperator.outputManager.pushedBackElementsHandler;
        MatcherAssert.assertThat((Object)pushedBack, (Matcher)Matchers.instanceOf(NonKeyedPushedBackElementsHandler.class));
        List pushedBackElements = pushedBack.getElements().collect(Collectors.toList());
        MatcherAssert.assertThat(pushedBackElements, (Matcher)IsIterableContainingInOrder.contains((Object[])new KV[]{KV.of((Object)0, (Object)WindowedValue.valueInGlobalWindow((Object)"finishBundle"))}));
        firstHarness.close();

        // Phase 3: restore into a second operator and advance processing time to 10.
        DoFnOperator restoredOperator = new DoFnOperator((DoFn)markerDoFn, "stepName", (Coder)inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)managerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), (PipelineOptions)bundleOptions, null, null, DoFnSchemaInformation.create(), Collections.emptyMap());
        OneInputStreamOperatorTestHarness restoredHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)restoredOperator);
        restoredHarness.initializeState(checkpoint);
        restoredHarness.open();
        restoredHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)"d")));
        restoredHarness.setProcessingTime(10L);
        WindowedValue[] expectedAfterRestore = new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)"finishBundle"), WindowedValue.valueInGlobalWindow((Object)"d"), WindowedValue.valueInGlobalWindow((Object)"finishBundle")};
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedAfterRestore));
        // The same output is expected after close() and again after dispose().
        restoredHarness.close();
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedAfterRestore));
        restoredOperator.dispose();
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedAfterRestore));
    }

    /**
     * Keyed counterpart of the bundle test: the operator is given a key coder and a
     * {@code KvToByteBufferKeySelector}, and runs in a keyed test harness.
     *
     * <p>The DoFn outputs the value of each incoming {@code KV} and emits {@code "finishBundle"}
     * when a bundle finishes. The options set a max bundle size of 2 and a max bundle time of 10.
     * The test asserts the output before the snapshot, the pushed-back elements after it, and the
     * output of a second operator restored from the snapshot.
     */
    @Test
    public void testBundleKeyed() throws @UnknownKeyFor @NonNull @Initialized Exception {
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        // The key selector is built with its own default options, not the bundle options below.
        KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector((Coder)keyCoder, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        KvCoder kvCoder = KvCoder.of((Coder)keyCoder, (Coder)StringUtf8Coder.of());
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder = WindowedValue.getValueOnlyCoder((Coder)kvCoder);
        TupleTag mainOutput = new TupleTag("main-output");
        FlinkPipelineOptions bundleOptions = FlinkPipelineOptions.defaults();
        bundleOptions.setMaxBundleSize(Long.valueOf(2L));
        bundleOptions.setMaxBundleTimeMills(Long.valueOf(10L));
        DoFn<KV<String, String>, String> valueDoFn = new DoFn<KV<String, String>, String>(){

            // Parameter annotations were emitted by the decompiler and may be inaccurate.
            @DoFn.ProcessElement
            public void processElement(
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext ctx) {
                ctx.output((Object)((String)((KV)ctx.element()).getValue()));
            }

            @DoFn.FinishBundle
            public void finishBundle(
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized FinishBundleContext context) {
                context.output((Object)"finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, (BoundedWindow)GlobalWindow.INSTANCE);
            }
        };
        DoFnOperator.MultiOutputOutputManagerFactory managerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)WindowedValue.getFullCoder((Coder)kvCoder.getValueCoder(), (Coder)GlobalWindow.Coder.INSTANCE), new SerializablePipelineOptions((PipelineOptions)bundleOptions));

        // Phase 1: three elements, all under the key "key".
        DoFnOperator firstOperator = new DoFnOperator((DoFn)valueDoFn, "stepName", (Coder)inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)managerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), (PipelineOptions)bundleOptions, (Coder)keyCoder, (KeySelector)keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
        KeyedOneInputStreamOperatorTestHarness firstHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)firstOperator, (KeySelector)keySelector, keySelector.getProducedType());
        firstHarness.open();
        for (String payload : new String[]{"a", "b", "c"}) {
            firstHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key", (Object)payload))));
        }
        WindowedValue[] expectedBeforeSnapshot = new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)"a"), WindowedValue.valueInGlobalWindow((Object)"b"), WindowedValue.valueInGlobalWindow((Object)"finishBundle"), WindowedValue.valueInGlobalWindow((Object)"c")};
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(firstHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedBeforeSnapshot));

        // Phase 2: snapshot; the test expects the non-keyed pushed-back handler here as well.
        OperatorSubtaskState checkpoint = firstHarness.snapshot(0L, 0L);
        PushedBackElementsHandler pushedBack = firstOperator.outputManager.pushedBackElementsHandler;
        MatcherAssert.assertThat((Object)pushedBack, (Matcher)Matchers.instanceOf(NonKeyedPushedBackElementsHandler.class));
        List pushedBackElements = pushedBack.getElements().collect(Collectors.toList());
        MatcherAssert.assertThat(pushedBackElements, (Matcher)IsIterableContainingInOrder.contains((Object[])new KV[]{KV.of((Object)0, (Object)WindowedValue.valueInGlobalWindow((Object)"finishBundle"))}));
        firstHarness.close();

        // Phase 3: restore into a second operator and advance processing time to 10.
        DoFnOperator restoredOperator = new DoFnOperator((DoFn)valueDoFn, "stepName", (Coder)inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)managerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), (PipelineOptions)bundleOptions, (Coder)keyCoder, (KeySelector)keySelector, DoFnSchemaInformation.create(), Collections.emptyMap());
        KeyedOneInputStreamOperatorTestHarness restoredHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)restoredOperator, (KeySelector)keySelector, keySelector.getProducedType());
        restoredHarness.initializeState(checkpoint);
        restoredHarness.open();
        restoredHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key", (Object)"d"))));
        restoredHarness.setProcessingTime(10L);
        WindowedValue[] expectedAfterRestore = new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)"finishBundle"), WindowedValue.valueInGlobalWindow((Object)"d"), WindowedValue.valueInGlobalWindow((Object)"finishBundle")};
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedAfterRestore));
        restoredHarness.close();
    }

    /**
     * Checks output buffering when further bundles run while a snapshot is being taken.
     *
     * <p>A bundle-finished callback is installed that, when invoked, unregisters itself, feeds a
     * second element, explicitly invokes {@code invokeFinishBundle()} and feeds a third element.
     * The test expects only the first element in the output directly after the snapshot, all
     * three after a watermark of {@code Long.MAX_VALUE}, and the two callback elements followed by
     * a new one in the output of an operator restored from the snapshot.
     */
    @Test
    public void testCheckpointBufferingWithMultipleBundles() throws @UnknownKeyFor @NonNull @Initialized Exception {
        String regular = "regular element";
        String secondBundle = "trigger another bundle";
        String notFlushed = "check that the previous element is not flushed";
        String afterRestore = "after restore";

        FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
        pipelineOptions.setMaxBundleSize(Long.valueOf(10L));
        pipelineOptions.setCheckpointingInterval(Long.valueOf(1L));
        TupleTag mainOutput = new TupleTag("main-output");
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder = WindowedValue.getValueOnlyCoder((Coder)StringUtf8Coder.of());
        DoFnOperator.MultiOutputOutputManagerFactory managerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE), new SerializablePipelineOptions((PipelineOptions)pipelineOptions));
        // Each call yields a new, identically configured operator (used before and after restore).
        Supplier<DoFnOperator> operatorFactory = () -> new DoFnOperator(new IdentityDoFn(), "stepName", (Coder)inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)managerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), (PipelineOptions)pipelineOptions, null, null, DoFnSchemaInformation.create(), Collections.emptyMap());

        DoFnOperator operator = operatorFactory.get();
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator);
        harness.open();
        harness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)regular)));
        operator.setBundleFinishedCallback(() -> {
            try {
                // Unregister first so the nested invokeFinishBundle() below does not re-enter this callback.
                operator.setBundleFinishedCallback(null);
                harness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)secondBundle)));
                operator.invokeFinishBundle();
                harness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)notFlushed)));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)regular)}));
        harness.processWatermark(Long.MAX_VALUE);
        WindowedValue[] expectedAfterWatermark = new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)regular), WindowedValue.valueInGlobalWindow((Object)secondBundle), WindowedValue.valueInGlobalWindow((Object)notFlushed)};
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(harness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedAfterWatermark));
        harness.close();

        // Restore a second operator from the snapshot and feed one more element.
        OneInputStreamOperatorTestHarness restoredHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operatorFactory.get());
        restoredHarness.initializeState(checkpoint);
        restoredHarness.open();
        restoredHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)afterRestore)));
        WindowedValue[] expectedAfterRestore = new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)secondBundle), WindowedValue.valueInGlobalWindow((Object)notFlushed), WindowedValue.valueInGlobalWindow((Object)afterRestore)};
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedAfterRestore));
    }

    /**
     * Checks input buffering for a non-keyed DoFn whose {@code @ProcessElement} method is marked
     * {@code @RequiresStableInput}.
     *
     * <p>With a max bundle size of 2 and a checkpointing interval of 1, the test expects: no
     * output and no {@code @StartBundle} call after two elements were processed; both elements
     * plus {@code "finishBundle"} once {@code notifyCheckpointComplete(0L)} is called; the same
     * output (and a second {@code @StartBundle} call) from an operator restored from the snapshot;
     * and no further change after {@code notifyCheckpointComplete(1L)}.
     */
    @Test
    public void testExactlyOnceBuffering() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
        pipelineOptions.setMaxBundleSize(Long.valueOf(2L));
        pipelineOptions.setCheckpointingInterval(Long.valueOf(1L));
        TupleTag mainOutput = new TupleTag("main-output");
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder = WindowedValue.getValueOnlyCoder((Coder)StringUtf8Coder.of());
        // The counter lives on the test instance; start from a known value.
        this.numStartBundleCalled = 0;
        DoFn<String, String> stableInputDoFn = new DoFn<String, String>(){

            // Parameter annotations below were emitted by the decompiler and may be inaccurate.
            @DoFn.StartBundle
            public void startBundle(
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized StartBundleContext context) {
                DoFnOperatorTest.this.numStartBundleCalled++;
            }

            @DoFn.ProcessElement
            @DoFn.RequiresStableInput
            public void processElement(
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context) {
                context.output((Object)((String)context.element()));
            }

            @DoFn.FinishBundle
            public void finishBundle(
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized FinishBundleContext context) {
                context.output((Object)"finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, (BoundedWindow)GlobalWindow.INSTANCE);
            }
        };
        DoFnOperator.MultiOutputOutputManagerFactory managerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE), new SerializablePipelineOptions((PipelineOptions)pipelineOptions));
        // NOTE(review): delegates to a decompiler-synthesized lambda body defined outside this method;
        // presumably it constructs the DoFnOperator from these arguments - confirm against the original source.
        Supplier<DoFnOperator> operatorFactory = () -> DoFnOperatorTest.lambda$testExactlyOnceBuffering$8((DoFn)stableInputDoFn, inputCoder, mainOutput, managerFactory, pipelineOptions);
        WindowedValue[] expectedOutput = new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)"a"), WindowedValue.valueInGlobalWindow((Object)"b"), WindowedValue.valueInGlobalWindow((Object)"finishBundle")};

        // Phase 1: elements are processed but nothing is expected downstream yet.
        DoFnOperator firstOperator = operatorFactory.get();
        OneInputStreamOperatorTestHarness firstHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)firstOperator);
        firstHarness.open();
        firstHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)"a")));
        firstHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)"b")));
        MatcherAssert.assertThat((Object)Iterables.size((Iterable)firstHarness.getOutput()), (Matcher)Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)this.numStartBundleCalled, (Matcher)Matchers.is((Object)0));

        // Phase 2: snapshot, then signal completion of checkpoint 0.
        OperatorSubtaskState backup = firstHarness.snapshot(0L, 0L);
        firstOperator.notifyCheckpointComplete(0L);
        MatcherAssert.assertThat((Object)this.numStartBundleCalled, (Matcher)Matchers.is((Object)1));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(firstHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedOutput));

        // Phase 3: a restored operator is expected to emit the same elements for checkpoint 0.
        DoFnOperator restoredOperator = operatorFactory.get();
        OneInputStreamOperatorTestHarness restoredHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)restoredOperator);
        restoredHarness.initializeState(backup);
        restoredHarness.open();
        restoredOperator.notifyCheckpointComplete(0L);
        MatcherAssert.assertThat((Object)this.numStartBundleCalled, (Matcher)Matchers.is((Object)2));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedOutput));

        // Phase 4: a later checkpoint must change neither the counter nor the output.
        restoredOperator.notifyCheckpointComplete(1L);
        MatcherAssert.assertThat((Object)this.numStartBundleCalled, (Matcher)Matchers.is((Object)2));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(restoredHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedOutput));
    }

    /**
     * Keyed counterpart of the exactly-once buffering test: the DoFn's {@code @ProcessElement}
     * method is marked {@code @RequiresStableInput} and the operator runs in a keyed harness.
     *
     * <p>With a max bundle size of 2 and a checkpointing interval of 1, the test expects: no
     * output after four elements (two keys) were processed; all four elements plus the
     * {@code ("key3", "finishBundle")} marker once {@code notifyCheckpointComplete(0L)} is called;
     * the same output (and a second {@code @StartBundle} call) from an operator restored from the
     * snapshot; and no further change after {@code notifyCheckpointComplete(1L)}.
     */
    @Test
    public void testExactlyOnceBufferingKeyed() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setMaxBundleSize(Long.valueOf(2L));
        options.setCheckpointingInterval(Long.valueOf(1L));
        TupleTag outputTag = new TupleTag("main-output");
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector((Coder)keyCoder, new SerializablePipelineOptions((PipelineOptions)options));
        KvCoder kvCoder = KvCoder.of((Coder)keyCoder, (Coder)StringUtf8Coder.of());
        WindowedValue.ValueOnlyWindowedValueCoder windowedValueCoder = WindowedValue.getValueOnlyCoder((Coder)kvCoder);
        // The assertions below check absolute counter values, so reset the counter explicitly,
        // exactly as the non-keyed variant of this test does.
        this.numStartBundleCalled = 0;
        DoFn<KV<String, String>, KV<String, String>> doFn = new DoFn<KV<String, String>, KV<String, String>>(){

            @DoFn.StartBundle
            public void startBundle() {
                DoFnOperatorTest.this.numStartBundleCalled++;
            }

            // Parameter annotations below were emitted by the decompiler and may be inaccurate.
            @DoFn.ProcessElement
            @DoFn.RequiresStableInput
            public void processElement(
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context) {
                context.output((Object)((KV)context.element()));
            }

            @DoFn.FinishBundle
            public void finishBundle(
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized FinishBundleContext context) {
                context.output((Object)KV.of((Object)"key3", (Object)"finishBundle"), BoundedWindow.TIMESTAMP_MIN_VALUE, (BoundedWindow)GlobalWindow.INSTANCE);
            }
        };
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)WindowedValue.getFullCoder((Coder)kvCoder, (Coder)GlobalWindow.Coder.INSTANCE), new SerializablePipelineOptions((PipelineOptions)options));
        // NOTE(review): delegates to a decompiler-synthesized lambda body defined outside this method;
        // presumably it constructs the keyed DoFnOperator from these arguments - confirm against the original source.
        Supplier<DoFnOperator> doFnOperatorSupplier = () -> DoFnOperatorTest.lambda$testExactlyOnceBufferingKeyed$9((DoFn)doFn, windowedValueCoder, outputTag, outputManagerFactory, options, keyCoder, keySelector);
        // Same expectation for the original operator and for the restored one.
        WindowedValue[] expectedOutput = new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key", (Object)"a")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key", (Object)"b")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key2", (Object)"c")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key2", (Object)"d")), WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key3", (Object)"finishBundle"))};

        // Phase 1: elements are processed but nothing is expected downstream yet.
        DoFnOperator doFnOperator = doFnOperatorSupplier.get();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)doFnOperator, (KeySelector)keySelector, keySelector.getProducedType());
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key", (Object)"a"))));
        testHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key", (Object)"b"))));
        testHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key2", (Object)"c"))));
        testHarness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)KV.of((Object)"key2", (Object)"d"))));
        MatcherAssert.assertThat((Object)Iterables.size((Iterable)testHarness.getOutput()), (Matcher)Matchers.is((Object)0));

        // Phase 2: snapshot, then signal completion of checkpoint 0.
        OperatorSubtaskState backup = testHarness.snapshot(0L, 0L);
        doFnOperator.notifyCheckpointComplete(0L);
        MatcherAssert.assertThat((Object)this.numStartBundleCalled, (Matcher)Matchers.is((Object)1));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedOutput));

        // Phase 3: a restored operator is expected to emit the same elements for checkpoint 0.
        doFnOperator = doFnOperatorSupplier.get();
        testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)doFnOperator, (KeySelector)keySelector, keySelector.getProducedType());
        testHarness.initializeState(backup);
        testHarness.open();
        doFnOperator.notifyCheckpointComplete(0L);
        MatcherAssert.assertThat((Object)this.numStartBundleCalled, (Matcher)Matchers.is((Object)2));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedOutput));

        // Phase 4: a later checkpoint must change neither the counter nor the output.
        doFnOperator.notifyCheckpointComplete(1L);
        MatcherAssert.assertThat((Object)this.numStartBundleCalled, (Matcher)Matchers.is((Object)2));
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])expectedOutput));
    }

    /**
     * Expects the {@code DoFnOperator} constructor to throw {@link IllegalStateException} when the
     * DoFn's {@code @ProcessElement} method is marked {@code @RequiresStableInput} and the
     * checkpointing interval is set to {@code -1}.
     *
     * <p>The expectation is scoped to the constructor call only, so an
     * {@code IllegalStateException} raised by any of the setup statements fails the test instead
     * of silently satisfying it.
     */
    @Test
    public void testFailOnRequiresStableInputAndDisabledCheckpointing() {
        TupleTag outputTag = new TupleTag("main-output");
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        KvToByteBufferKeySelector keySelector = new KvToByteBufferKeySelector((Coder)keyCoder, null);
        KvCoder kvCoder = KvCoder.of((Coder)keyCoder, (Coder)StringUtf8Coder.of());
        WindowedValue.ValueOnlyWindowedValueCoder windowedValueCoder = WindowedValue.getValueOnlyCoder((Coder)kvCoder);
        DoFn<KV<String, String>, KV<String, String>> doFn = new DoFn<KV<String, String>, KV<String, String>>(){

            // Parameter annotations were emitted by the decompiler and may be inaccurate.
            @DoFn.ProcessElement
            @DoFn.RequiresStableInput
            public void processElement(
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context) {
                context.output((Object)((KV)context.element()));
            }
        };
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        // The factory wraps the options before the checkpointing interval is changed below.
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(outputTag, (Coder)WindowedValue.getFullCoder((Coder)kvCoder, (Coder)GlobalWindow.Coder.INSTANCE), new SerializablePipelineOptions((PipelineOptions)options));
        options.setCheckpointingInterval(Long.valueOf(-1L));
        // Only the constructor call is allowed to throw.
        Assert.assertThrows(IllegalStateException.class, () -> new DoFnOperator((DoFn)doFn, "stepName", (Coder)windowedValueCoder, Collections.emptyMap(), outputTag, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)outputManagerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), (PipelineOptions)options, (Coder)keyCoder, (KeySelector)keySelector, DoFnSchemaInformation.create(), Collections.emptyMap()));
    }

    /**
     * Expects {@code snapshot(...)} on the test harness to throw an {@link Error} when the DoFn's
     * {@code @FinishBundle} method throws a {@code RuntimeException}.
     *
     * <p>The operator is configured with a max bundle size of 10 and a checkpointing interval of
     * 1, and has processed a single element before the snapshot is requested.
     */
    @Test
    public void testBundleProcessingExceptionIsFatalDuringCheckpointing() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
        pipelineOptions.setMaxBundleSize(Long.valueOf(10L));
        pipelineOptions.setCheckpointingInterval(Long.valueOf(1L));
        TupleTag mainOutput = new TupleTag("main-output");
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder = WindowedValue.getValueOnlyCoder((Coder)StringUtf8Coder.of());
        DoFnOperator.MultiOutputOutputManagerFactory managerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE), new SerializablePipelineOptions((PipelineOptions)pipelineOptions));
        // Identity DoFn whose bundle finalization always fails.
        IdentityDoFn<String> failingOnFinish = new IdentityDoFn<String>(){

            @DoFn.FinishBundle
            public void finishBundle() {
                throw new RuntimeException("something went wrong here");
            }
        };
        DoFnOperator operator = new DoFnOperator((DoFn)failingOnFinish, "stepName", (Coder)inputCoder, Collections.emptyMap(), mainOutput, Collections.emptyList(), (DoFnOperator.OutputManagerFactory)managerFactory, WindowingStrategy.globalDefault(), new HashMap(), Collections.emptyList(), (PipelineOptions)pipelineOptions, null, null, DoFnSchemaInformation.create(), Collections.emptyMap());
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator);
        harness.open();
        harness.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)"regular element")));
        // The snapshot is expected to surface the failure as an Error.
        Assert.assertThrows(Error.class, () -> harness.snapshot(0L, 0L));
    }

    /**
     * Checks that {@code registerMetricsForPipelineResult()} is invoked exactly twice on the
     * operator's metric container across {@code testHarness.close()} and
     * {@code doFnOperator.dispose()}.
     *
     * <p>The container held in the operator's {@code flinkMetricContainer} field is replaced by a
     * Mockito spy after the harness has been opened, so only calls made after that point are
     * counted.
     */
    @Test
    public void testAccumulatorRegistrationOnOperatorClose() throws @UnknownKeyFor @NonNull @Initialized Exception {
        DoFnOperator<String, String> operator = DoFnOperatorTest.getOperatorForCleanupInspection();
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(operator);
        harness.open();

        // Swap the real container for a spy via reflection on the private field.
        String containerField = "flinkMetricContainer";
        FlinkMetricContainer realContainer = (FlinkMetricContainer)Whitebox.getInternalState(operator, (String)containerField);
        FlinkMetricContainer spiedContainer = (FlinkMetricContainer)Mockito.spy((Object)realContainer);
        Whitebox.setInternalState(operator, (String)containerField, (Object)spiedContainer);

        harness.close();
        operator.dispose();

        VerificationMode exactlyTwice = (VerificationMode)Mockito.times((int)2);
        FlinkMetricContainer verification = (FlinkMetricContainer)Mockito.verify((Object)spiedContainer, exactlyTwice);
        verification.registerMetricsForPipelineResult();
    }

    /**
     * Checks that the {@code _typeCache} of Jackson's default {@code TypeFactory} is non-empty
     * once the harness has been constructed, and is empty after the harness has been opened and
     * closed.
     */
    @Test
    public void testRemoveCachedClassReferences() throws @UnknownKeyFor @NonNull @Initialized Exception {
        // The harness is constructed first; the cache is only inspected afterwards.
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(DoFnOperatorTest.getOperatorForCleanupInspection());

        TypeFactory defaultFactory = TypeFactory.defaultInstance();
        LRUMap cachedTypes = (LRUMap)Whitebox.getInternalState((Object)defaultFactory, (String)"_typeCache");

        // Precondition: there is something in the cache to be cleared.
        int sizeBeforeLifecycle = cachedTypes.size();
        MatcherAssert.assertThat((Object)sizeBeforeLifecycle, (Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(0)));

        harness.open();
        harness.close();

        // Same map instance, re-read after the open/close cycle.
        int sizeAfterLifecycle = cachedTypes.size();
        MatcherAssert.assertThat((Object)sizeAfterLifecycle, (Matcher)Matchers.is((Object)0));
    }

    /**
     * Builds a non-keyed {@code DoFnOperator} (key coder and key selector are {@code null}) with
     * parallelism 4 around an {@code IdentityDoFn} subclass whose {@code finishBundle} emits the
     * string {@code "finishBundle"} into the global window at
     * {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
     *
     * @return a freshly constructed operator; it is not opened here
     */
    private static @UnknownKeyFor @NonNull @Initialized DoFnOperator<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> getOperatorForCleanupInspection() {
        // Parallelism is set before the options are wrapped for the output manager factory.
        FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
        pipelineOptions.setParallelism(Integer.valueOf(4));

        TupleTag mainOutput = new TupleTag("main-output");
        WindowedValue.ValueOnlyWindowedValueCoder inputCoder = WindowedValue.getValueOnlyCoder((Coder)StringUtf8Coder.of());

        IdentityDoFn<String> emittingDoFn = new IdentityDoFn<String>(){

            @DoFn.FinishBundle
            public void finishBundle(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized FinishBundleContext context) {
                context.output((Object)"finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, (BoundedWindow)GlobalWindow.INSTANCE);
            }
        };

        Coder outputCoder = (Coder)WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE);
        SerializablePipelineOptions serializedOptions = new SerializablePipelineOptions((PipelineOptions)pipelineOptions);
        DoFnOperator.MultiOutputOutputManagerFactory managerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, outputCoder, serializedOptions);

        DoFnOperator operator = new DoFnOperator(
            (DoFn)emittingDoFn,
            "stepName",
            (Coder)inputCoder,
            Collections.emptyMap(),
            mainOutput,
            Collections.emptyList(),
            (DoFnOperator.OutputManagerFactory)managerFactory,
            WindowingStrategy.globalDefault(),
            new HashMap(),
            Collections.emptyList(),
            (PipelineOptions)pipelineOptions,
            null,
            null,
            DoFnSchemaInformation.create(),
            Collections.emptyMap());
        return operator;
    }

    /**
     * Keeps only the {@code StreamRecord} entries of {@code input} and unwraps each one to the
     * value it carries, cast to {@code WindowedValue}.
     *
     * @param input arbitrary objects; anything that is not a {@code StreamRecord} is filtered out
     * @return the values of the retained stream records
     */
    private @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized String>> stripStreamRecord(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @KeyForBottom @Nullable @Initialized @NonNull @Initialized ?> input) {
        Function<Object, WindowedValue<String>> unwrapRecord = new Function<Object, WindowedValue<String>>(){

            public @Nullable @UnknownKeyFor @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized String> apply(@Nullable @UnknownKeyFor @Initialized Object o) {
                // The filter below only lets StreamRecord instances through, so this guard is defensive.
                if (!(o instanceof StreamRecord)) {
                    throw new RuntimeException("unreachable");
                }
                return (WindowedValue)((StreamRecord)o).getValue();
            }
        };
        return FluentIterable.from(input).filter(o -> o instanceof StreamRecord).transform((Function)unwrapRecord);
    }

    /**
     * Wraps {@code values} in a {@code WindowedValue} with the given timestamp and window, using
     * {@code PaneInfo.NO_FIRING} as the pane.
     *
     * @param values the iterable to wrap as a single windowed value
     * @param timestamp timestamp passed to {@code WindowedValue.of}
     * @param window window passed to {@code WindowedValue.of}
     * @return the resulting windowed value
     */
    private /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @KeyForBottom @Nullable @Initialized @NonNull @Initialized ?>> valuesInWindow(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @KeyForBottom @Nullable @Initialized @NonNull @Initialized ?> values, @UnknownKeyFor @NonNull @Initialized Instant timestamp, @UnknownKeyFor @NonNull @Initialized BoundedWindow window) {
        PaneInfo pane = PaneInfo.NO_FIRING;
        return WindowedValue.of(values, timestamp, window, pane);
    }

    /**
     * Wraps a single {@code value} in a {@code WindowedValue} with the given timestamp and
     * window, using {@code PaneInfo.NO_FIRING} as the pane.
     *
     * @param value the value to wrap
     * @param timestamp timestamp passed to {@code WindowedValue.of}
     * @param window window passed to {@code WindowedValue.of}
     * @return the resulting windowed value
     */
    private <T> @UnknownKeyFor @NonNull @Initialized WindowedValue<T> valueInWindow(T value, @UnknownKeyFor @NonNull @Initialized Instant timestamp, @UnknownKeyFor @NonNull @Initialized BoundedWindow window) {
        PaneInfo pane = PaneInfo.NO_FIRING;
        return WindowedValue.of(value, timestamp, window, pane);
    }

    /**
     * Compiler-generated lambda body (as recovered by the decompiler): builds a keyed
     * {@code DoFnOperator} from the captured arguments, passing {@code keyCoder} and
     * {@code keySelector} through to the constructor.
     */
    private static /* synthetic */ DoFnOperator lambda$testExactlyOnceBufferingKeyed$9(DoFn doFn, WindowedValue.ValueOnlyWindowedValueCoder windowedValueCoder, TupleTag outputTag, DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory, FlinkPipelineOptions options, StringUtf8Coder keyCoder, KvToByteBufferKeySelector keySelector) {
        // Widen the captured arguments to the raw parameter types the constructor is called with.
        Coder inputCoder = (Coder)windowedValueCoder;
        DoFnOperator.OutputManagerFactory managerFactory = (DoFnOperator.OutputManagerFactory)outputManagerFactory;
        PipelineOptions pipelineOptions = (PipelineOptions)options;
        return new DoFnOperator(
            doFn,
            "stepName",
            inputCoder,
            Collections.emptyMap(),
            outputTag,
            Collections.emptyList(),
            managerFactory,
            WindowingStrategy.globalDefault(),
            new HashMap(),
            Collections.emptyList(),
            pipelineOptions,
            (Coder)keyCoder,
            (KeySelector)keySelector,
            DoFnSchemaInformation.create(),
            Collections.emptyMap());
    }

    /**
     * Compiler-generated lambda body (as recovered by the decompiler): builds a non-keyed
     * {@code DoFnOperator} from the captured arguments; the key coder and key selector are
     * {@code null}.
     */
    private static /* synthetic */ DoFnOperator lambda$testExactlyOnceBuffering$8(DoFn doFn, WindowedValue.ValueOnlyWindowedValueCoder windowedValueCoder, TupleTag outputTag, DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory, FlinkPipelineOptions options) {
        // Widen the captured arguments to the raw parameter types the constructor is called with.
        Coder inputCoder = (Coder)windowedValueCoder;
        DoFnOperator.OutputManagerFactory managerFactory = (DoFnOperator.OutputManagerFactory)outputManagerFactory;
        PipelineOptions pipelineOptions = (PipelineOptions)options;
        return new DoFnOperator(
            doFn,
            "stepName",
            inputCoder,
            Collections.emptyMap(),
            outputTag,
            Collections.emptyList(),
            managerFactory,
            WindowingStrategy.globalDefault(),
            new HashMap(),
            Collections.emptyList(),
            pipelineOptions,
            null,
            null,
            DoFnSchemaInformation.create(),
            Collections.emptyMap());
    }

    /**
     * Compiler-generated lambda body (as recovered by the decompiler): takes the value carried by
     * {@code value}, casts it to {@code String}, and encodes it with {@code keyCoder} via
     * {@code FlinkKeyUtils.encodeKey}.
     */
    private static /* synthetic */ ByteBuffer lambda$testSideInputs$2e46429e$1(Coder keyCoder, WindowedValue value) throws Exception {
        // The String cast is kept so a non-String payload still fails here with a ClassCastException.
        Object key = (Object)((String)value.getValue());
        return FlinkKeyUtils.encodeKey(key, (Coder)keyCoder);
    }

    /**
     * Single-method factory whose {@link #create()} may throw a checked exception, which
     * {@code java.util.function.Supplier#get()} does not permit.
     *
     * @param <T> the type of object produced
     */
    private static interface TestHarnessFactory<@UnknownKeyFor T> {
        /**
         * Produces an instance of {@code T}.
         *
         * @return the created object
         * @throws Exception if creation fails
         */
        T create() throws @UnknownKeyFor @NonNull @Initialized Exception;
    }

    /**
     * A {@code DoFn} with identical input and output type that forwards every element unchanged.
     *
     * <p>Tests in this file subclass it anonymously to add lifecycle hooks such as
     * {@code finishBundle}.
     *
     * @param <T> the element type, used for both input and output
     */
    private static class IdentityDoFn<@UnknownKeyFor T>
    extends DoFn<T, T> {
        // Private constructor: instances can only be created from within the enclosing test class.
        private IdentityDoFn() {
        }

        /**
         * Outputs the element currently being processed exactly as it was received.
         *
         * @param c the process context supplying the element and receiving the output
         */
        @DoFn.ProcessElement
        public void processElement(// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) {
            c.output(c.element());
        }
    }

    /**
     * A string {@code DoFn} that routes elements to two additional tagged outputs depending on
     * the element's value.
     *
     * <ul>
     *   <li>{@code "one"}: emits {@code "extra: one"} to the first additional output only.
     *   <li>{@code "two"}: emits {@code "extra: two"} to the second additional output only.
     *   <li>anything else: emits {@code "got: " + element} through the untagged
     *       {@code output} call and to both additional outputs.
     * </ul>
     */
    private static class MultiOutputDoFn
    extends DoFn<String, String> {
        private final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized String> additionalOutput1;
        private final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized String> additionalOutput2;

        /**
         * @param additionalOutput1 tag that receives {@code "extra: one"} and the fallback message
         * @param additionalOutput2 tag that receives {@code "extra: two"} and the fallback message
         */
        public MultiOutputDoFn(@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized String> additionalOutput1, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized String> additionalOutput2) {
            this.additionalOutput1 = additionalOutput1;
            this.additionalOutput2 = additionalOutput2;
        }

        /**
         * Dispatches the current element according to the routing rules described on the class.
         *
         * @param c the process context supplying the element and receiving the outputs
         */
        @DoFn.ProcessElement
        public void processElement(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c) {
            Object element = c.element();
            if ("one".equals(element)) {
                c.output(this.additionalOutput1, (Object)"extra: one");
                return;
            }
            if ("two".equals(element)) {
                c.output(this.additionalOutput2, (Object)"extra: two");
                return;
            }
            // Fallback: the same message goes to the untagged output and to both tagged outputs.
            String message = "got: " + (String)element;
            c.output((Object)message);
            c.output(this.additionalOutput1, (Object)message);
            c.output(this.additionalOutput2, (Object)message);
        }
    }
}

