/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationUtils;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.mutable.MutableObject;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.streaming.FlinkStateInternalsTest;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContextFactory;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper;
import org.apache.beam.runners.fnexecution.control.BundleFinalizationHandler;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.NoopLock;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.powermock.reflect.Whitebox;

@RunWith(value=JUnit4.class)
public class ExecutableStageDoFnOperatorTest {
    @Rule
    public @UnknownKeyFor @NonNull @Initialized ExpectedException thrown = ExpectedException.none();
    @Mock
    private @UnknownKeyFor @NonNull @Initialized RuntimeContext runtimeContext;
    @Mock
    private @UnknownKeyFor @NonNull @Initialized DistributedCache distributedCache;
    @Mock
    private @UnknownKeyFor @NonNull @Initialized ExecutableStageContext stageContext;
    @Mock
    private @UnknownKeyFor @NonNull @Initialized StageBundleFactory stageBundleFactory;
    @Mock
    private @UnknownKeyFor @NonNull @Initialized StateRequestHandler stateRequestHandler;
    @Mock
    private // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
    @Mock
    private @UnknownKeyFor @NonNull @Initialized InstructionRequestHandler instructionRequestHandler;
    private final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.ExecutableStagePayload stagePayload = RunnerApi.ExecutableStagePayload.newBuilder().setInput("input").setComponents(RunnerApi.Components.newBuilder().putPcollections("input", RunnerApi.PCollection.getDefaultInstance()).build()).build();
    private final @UnknownKeyFor @NonNull @Initialized String stateId = "userState";
    private final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.ExecutableStagePayload stagePayloadWithUserState = this.stagePayload.toBuilder().setComponents(this.stagePayload.getComponents().toBuilder().putTransforms("transform", RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:pardo:v1").build()).putInputs("input", "input").build()).build()).addUserStates(RunnerApi.ExecutableStagePayload.UserStateId.newBuilder().setLocalName("userState").setTransformId("transform").build()).build();
    private final @UnknownKeyFor @NonNull @Initialized JobInfo jobInfo = JobInfo.create((String)"job-id", (String)"job-name", (String)"retrieval-token", (Struct)Struct.getDefaultInstance());

    @Before
    public void setUpMocks() {
        MockitoAnnotations.initMocks((Object)this);
        Mockito.when((Object)this.runtimeContext.getDistributedCache()).thenReturn((Object)this.distributedCache);
        Mockito.when((Object)this.stageContext.getStageBundleFactory((ExecutableStage)Matchers.any())).thenReturn((Object)this.stageBundleFactory);
        Mockito.when((Object)this.processBundleDescriptor.getTimerSpecs()).thenReturn(Collections.emptyMap());
        Mockito.when((Object)this.processBundleDescriptor.getBagUserStateSpecs()).thenReturn(Collections.emptyMap());
        Mockito.when((Object)this.stageBundleFactory.getProcessBundleDescriptor()).thenReturn((Object)this.processBundleDescriptor);
        Mockito.when((Object)this.stageBundleFactory.getInstructionRequestHandler()).thenReturn((Object)this.instructionRequestHandler);
    }

    @Test
    public void sdkErrorsSurfaceOnClose() throws @UnknownKeyFor @NonNull @Initialized Exception {
        TupleTag mainOutput = new TupleTag("main-output");
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)VoidCoder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        ExecutableStageDoFnOperator operator = this.getOperator((TupleTag<Integer>)mainOutput, Collections.emptyList(), (DoFnOperator.MultiOutputOutputManagerFactory<Integer>)outputManagerFactory);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator);
        testHarness.open();
        RemoteBundle bundle = (RemoteBundle)Mockito.mock(RemoteBundle.class);
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (TimerReceiverFactory)Matchers.any(), (StateRequestHandler)Matchers.any(), (BundleProgressHandler)Matchers.any(), (BundleFinalizationHandler)Matchers.any())).thenReturn((Object)bundle);
        FnDataReceiver receiver = (FnDataReceiver)Mockito.mock(FnDataReceiver.class);
        Mockito.when((Object)bundle.getInputReceivers()).thenReturn((Object)ImmutableMap.of((Object)"input", (Object)receiver));
        RuntimeException expected = new RuntimeException(new Exception());
        ((RemoteBundle)Mockito.doThrow((Throwable[])new Throwable[]{expected}).when((Object)bundle)).close();
        this.thrown.expectCause(org.hamcrest.Matchers.is((Object)expected));
        operator.processElement(new StreamRecord((Object)WindowedValue.valueInGlobalWindow((Object)0)));
        testHarness.close();
    }

    @Test
    public void expectedInputsAreSent() throws @UnknownKeyFor @NonNull @Initialized Exception {
        TupleTag mainOutput = new TupleTag("main-output");
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)VoidCoder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        ExecutableStageDoFnOperator operator = this.getOperator((TupleTag<Integer>)mainOutput, Collections.emptyList(), (DoFnOperator.MultiOutputOutputManagerFactory<Integer>)outputManagerFactory);
        RemoteBundle bundle = (RemoteBundle)Mockito.mock(RemoteBundle.class);
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (TimerReceiverFactory)Matchers.any(), (StateRequestHandler)Matchers.any(), (BundleProgressHandler)Matchers.any(), (BundleFinalizationHandler)Matchers.any())).thenReturn((Object)bundle);
        FnDataReceiver receiver = (FnDataReceiver)Mockito.mock(FnDataReceiver.class);
        Mockito.when((Object)bundle.getInputReceivers()).thenReturn((Object)ImmutableMap.of((Object)"input", (Object)receiver));
        WindowedValue one = WindowedValue.valueInGlobalWindow((Object)1);
        WindowedValue two = WindowedValue.valueInGlobalWindow((Object)2);
        WindowedValue three = WindowedValue.valueInGlobalWindow((Object)3);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator);
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)one));
        testHarness.processElement(new StreamRecord((Object)two));
        testHarness.processElement(new StreamRecord((Object)three));
        ((FnDataReceiver)Mockito.verify((Object)receiver)).accept((Object)one);
        ((FnDataReceiver)Mockito.verify((Object)receiver)).accept((Object)two);
        ((FnDataReceiver)Mockito.verify((Object)receiver)).accept((Object)three);
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{receiver});
        testHarness.close();
    }

    @Test
    public void outputsAreTaggedCorrectly() throws @UnknownKeyFor @NonNull @Initialized Exception {
        WindowedValue.ValueOnlyWindowedValueCoder coder = WindowedValue.getValueOnlyCoder((Coder)VarIntCoder.of());
        final TupleTag mainOutput = new TupleTag("main-output");
        final TupleTag additionalOutput1 = new TupleTag("output-1");
        final TupleTag additionalOutput2 = new TupleTag("output-2");
        ImmutableMap tagsToOutputTags = ImmutableMap.builder().put((Object)additionalOutput1, (Object)new OutputTag<WindowedValue<String>>(additionalOutput1.getId()){}).put((Object)additionalOutput2, (Object)new OutputTag<WindowedValue<String>>(additionalOutput2.getId()){}).build();
        ImmutableMap tagsToCoders = ImmutableMap.builder().put((Object)mainOutput, (Object)coder).put((Object)additionalOutput1, (Object)coder).put((Object)additionalOutput2, (Object)coder).build();
        ImmutableMap tagsToIds = ImmutableMap.builder().put((Object)mainOutput, (Object)0).put((Object)additionalOutput1, (Object)1).put((Object)additionalOutput2, (Object)2).build();
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Map)tagsToOutputTags, (Map)tagsToCoders, (Map)tagsToIds, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        WindowedValue zero = WindowedValue.valueInGlobalWindow((Object)0);
        final WindowedValue three = WindowedValue.valueInGlobalWindow((Object)3);
        final WindowedValue four = WindowedValue.valueInGlobalWindow((Object)4);
        final WindowedValue five = WindowedValue.valueInGlobalWindow((Object)5);
        StageBundleFactory stageBundleFactory = new StageBundleFactory(){
            private @UnknownKeyFor @NonNull @Initialized boolean onceEmitted;

            public @UnknownKeyFor @NonNull @Initialized RemoteBundle getBundle(final @UnknownKeyFor @NonNull @Initialized OutputReceiverFactory receiverFactory, @UnknownKeyFor @NonNull @Initialized TimerReceiverFactory timerReceiverFactory, @UnknownKeyFor @NonNull @Initialized StateRequestHandler stateRequestHandler, @UnknownKeyFor @NonNull @Initialized BundleProgressHandler progressHandler, @UnknownKeyFor @NonNull @Initialized BundleFinalizationHandler finalizationHandler) {
                return new RemoteBundle(){

                    public @UnknownKeyFor @NonNull @Initialized String getId() {
                        return "bundle-id";
                    }

                    public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized FnDataReceiver> getInputReceivers() {
                        return ImmutableMap.of((Object)"input", input -> {});
                    }

                    public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String>, @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @NonNull @Initialized Timer>> getTimerReceivers() {
                        return Collections.emptyMap();
                    }

                    public void requestProgress() {
                        throw new UnsupportedOperationException();
                    }

                    public void split(@UnknownKeyFor @NonNull @Initialized double fractionOfRemainder) {
                        throw new UnsupportedOperationException();
                    }

                    public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
                        if (onceEmitted) {
                            return;
                        }
                        receiverFactory.create(mainOutput.getId()).accept((Object)three);
                        receiverFactory.create(additionalOutput1.getId()).accept((Object)four);
                        receiverFactory.create(additionalOutput2.getId()).accept((Object)five);
                        onceEmitted = true;
                    }
                };
            }

            public // Could not load outer class - annotation placement on inner may be incorrect
             @UnknownKeyFor @NonNull @Initialized ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
                return ExecutableStageDoFnOperatorTest.this.processBundleDescriptor;
            }

            public @UnknownKeyFor @NonNull @Initialized InstructionRequestHandler getInstructionRequestHandler() {
                return null;
            }

            public void close() {
            }
        };
        Mockito.when((Object)this.stageContext.getStageBundleFactory((ExecutableStage)Matchers.any())).thenReturn((Object)stageBundleFactory);
        ExecutableStageDoFnOperator operator = this.getOperator((TupleTag<Integer>)mainOutput, (List<TupleTag<?>>)ImmutableList.of((Object)additionalOutput1, (Object)additionalOutput2), (DoFnOperator.MultiOutputOutputManagerFactory<Integer>)outputManagerFactory);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator);
        long watermark = testHarness.getCurrentWatermark() + 1L;
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)zero));
        testHarness.processWatermark(watermark);
        testHarness.processWatermark(++watermark);
        Assert.assertEquals((long)watermark, (long)testHarness.getCurrentWatermark());
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        testHarness.close();
        MatcherAssert.assertThat(StreamRecordStripper.stripStreamRecordFromWindowedValue(testHarness.getOutput()), (Matcher)IsIterableContainingInOrder.contains((Object[])new WindowedValue[]{three}));
        MatcherAssert.assertThat((Object)testHarness.getSideOutput((OutputTag)tagsToOutputTags.get((Object)additionalOutput1)), (Matcher)IsIterableContainingInOrder.contains((Object[])new StreamRecord[]{new StreamRecord((Object)four)}));
        MatcherAssert.assertThat((Object)testHarness.getSideOutput((OutputTag)tagsToOutputTags.get((Object)additionalOutput2)), (Matcher)IsIterableContainingInOrder.contains((Object[])new StreamRecord[]{new StreamRecord((Object)five)}));
    }

    @Test
    public void testWatermarkHandling() throws @UnknownKeyFor @NonNull @Initialized Exception {
        TupleTag mainOutput = new TupleTag("main-output");
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)VoidCoder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        ExecutableStageDoFnOperator operator = this.getOperator((TupleTag<Integer>)mainOutput, Collections.emptyList(), (DoFnOperator.MultiOutputOutputManagerFactory<Integer>)outputManagerFactory, WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)Duration.millis((long)10L))), (Coder)StringUtf8Coder.of(), (Coder)WindowedValue.getFullCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of()), (Coder)IntervalWindow.getCoder()));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector & Serializable)val -> (String)((KV)val.getValue()).getKey(), (TypeInformation)new CoderTypeInformation((Coder)StringUtf8Coder.of(), (PipelineOptions)FlinkPipelineOptions.defaults()));
        RemoteBundle bundle = (RemoteBundle)Mockito.mock(RemoteBundle.class);
        Mockito.when((Object)bundle.getInputReceivers()).thenReturn((Object)ImmutableMap.builder().put((Object)"input", (Object)((FnDataReceiver)Mockito.mock(FnDataReceiver.class))).build());
        Mockito.when((Object)bundle.getTimerReceivers()).thenReturn((Object)ImmutableMap.builder().put((Object)KV.of((Object)"transform", (Object)"timer"), (Object)((FnDataReceiver)Mockito.mock(FnDataReceiver.class))).put((Object)KV.of((Object)"transform", (Object)"timer2"), Mockito.mock(FnDataReceiver.class)).put((Object)KV.of((Object)"transform", (Object)"timer3"), Mockito.mock(FnDataReceiver.class)).build());
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (TimerReceiverFactory)Matchers.any(), (StateRequestHandler)Matchers.any(), (BundleProgressHandler)Matchers.any(), (BundleFinalizationHandler)Matchers.any())).thenReturn((Object)bundle);
        testHarness.open();
        MatcherAssert.assertThat((Object)operator.getCurrentOutputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
        testHarness.processWatermark(0L);
        MatcherAssert.assertThat((Object)operator.getCurrentOutputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)0L));
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(0L), new Instant(9L));
        WindowedValue windowedValue = WindowedValue.of((Object)KV.of((Object)"one", (Object)1), (Instant)Instant.now(), (BoundedWindow)intervalWindow, (PaneInfo)PaneInfo.NO_FIRING);
        testHarness.processElement(new StreamRecord((Object)windowedValue));
        testHarness.processWatermark(1L);
        MatcherAssert.assertThat((Object)operator.getEffectiveInputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)1L));
        MatcherAssert.assertThat((Object)operator.getCurrentOutputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)0L));
        operator.invokeFinishBundle();
        MatcherAssert.assertThat((Object)operator.getCurrentOutputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)1L));
        testHarness.processWatermark(2L);
        MatcherAssert.assertThat((Object)operator.getEffectiveInputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)2L));
        MatcherAssert.assertThat((Object)operator.getCurrentOutputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)2L));
        testHarness.processElement(new StreamRecord((Object)windowedValue));
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)1));
        Instant timerTarget = new Instant(5L);
        Instant timerTarget2 = new Instant(6L);
        operator.getLockToAcquireForStateAccessDuringBundles().lock();
        BiConsumer<String, Instant> timerConsumer = (timerId, timestamp) -> operator.setTimer(Timer.of((Object)((String)((KV)windowedValue.getValue()).getKey()), (String)"", (Collection)windowedValue.getWindows(), (Instant)timestamp, (Instant)timestamp, (PaneInfo)PaneInfo.NO_FIRING), TimerInternals.TimerData.of((String)TimerReceiverFactory.encodeToTimerDataTimerId((String)"transform", (String)timerId), (String)"", (StateNamespace)StateNamespaces.window((Coder)IntervalWindow.getCoder(), (BoundedWindow)intervalWindow), (Instant)timestamp, (Instant)timestamp, (TimeDomain)TimeDomain.EVENT_TIME));
        timerConsumer.accept("timer", timerTarget);
        timerConsumer.accept("timer2", timerTarget2);
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)3));
        long targetWatermark = timerTarget.getMillis() + 100L;
        testHarness.processWatermark(targetWatermark);
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)3));
        MatcherAssert.assertThat((Object)operator.getCurrentOutputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)2L));
        operator.invokeFinishBundle();
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)3));
        testHarness.setProcessingTime(testHarness.getProcessingTime() + 1L);
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)operator.getCurrentOutputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)5L));
        operator.invokeFinishBundle();
        MatcherAssert.assertThat((Object)operator.getCurrentOutputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)targetWatermark));
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)0));
        testHarness.processElement(new StreamRecord((Object)windowedValue));
        timerConsumer.accept("timer3", new Instant(targetWatermark));
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)1));
        testHarness.close();
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)operator.getCurrentOutputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)Long.MAX_VALUE));
    }

    @Test
    public void testStageBundleClosed() throws @UnknownKeyFor @NonNull @Initialized Exception {
        TupleTag mainOutput = new TupleTag("main-output");
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)VoidCoder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        ExecutableStageDoFnOperator operator = this.getOperator((TupleTag<Integer>)mainOutput, Collections.emptyList(), (DoFnOperator.MultiOutputOutputManagerFactory<Integer>)outputManagerFactory);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator);
        RemoteBundle bundle = (RemoteBundle)Mockito.mock(RemoteBundle.class);
        Mockito.when((Object)bundle.getInputReceivers()).thenReturn((Object)ImmutableMap.builder().put((Object)"input", (Object)((FnDataReceiver)Mockito.mock(FnDataReceiver.class))).build());
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (TimerReceiverFactory)Matchers.any(), (StateRequestHandler)Matchers.any(), (BundleProgressHandler)Matchers.any(), (BundleFinalizationHandler)Matchers.any())).thenReturn((Object)bundle);
        testHarness.open();
        testHarness.close();
        ((StageBundleFactory)Mockito.verify((Object)this.stageBundleFactory)).getInstructionRequestHandler();
        ((StageBundleFactory)Mockito.verify((Object)this.stageBundleFactory)).close();
        ((ExecutableStageContext)Mockito.verify((Object)this.stageContext)).close();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.stageBundleFactory});
        operator.dispose();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{bundle});
    }

    @Test
    public void testEnsureStateCleanupWithKeyedInput() throws @UnknownKeyFor @NonNull @Initialized Exception {
        TupleTag mainOutput = new TupleTag("main-output");
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)VarIntCoder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        VarIntCoder keyCoder = VarIntCoder.of();
        ExecutableStageDoFnOperator operator = this.getOperator((TupleTag<Integer>)mainOutput, Collections.emptyList(), (DoFnOperator.MultiOutputOutputManagerFactory<Integer>)outputManagerFactory, WindowingStrategy.globalDefault(), (Coder)keyCoder, (Coder)WindowedValue.getFullCoder((Coder)keyCoder, (Coder)GlobalWindow.Coder.INSTANCE));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector & Serializable)val -> val, (TypeInformation)new CoderTypeInformation((Coder)keyCoder, (PipelineOptions)FlinkPipelineOptions.defaults()));
        RemoteBundle bundle = (RemoteBundle)Mockito.mock(RemoteBundle.class);
        Mockito.when((Object)bundle.getInputReceivers()).thenReturn((Object)ImmutableMap.builder().put((Object)"input", (Object)((FnDataReceiver)Mockito.mock(FnDataReceiver.class))).build());
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (TimerReceiverFactory)Matchers.any(), (StateRequestHandler)Matchers.any(), (BundleProgressHandler)Matchers.any(), (BundleFinalizationHandler)Matchers.any())).thenReturn((Object)bundle);
        testHarness.open();
        Object doFnRunner = Whitebox.getInternalState((Object)operator, (String)"doFnRunner");
        MatcherAssert.assertThat((Object)doFnRunner, (Matcher)org.hamcrest.Matchers.instanceOf(DoFnRunnerWithMetricsUpdate.class));
        Object statefulDoFnRunner = Whitebox.getInternalState((Object)doFnRunner, (String)"delegate");
        MatcherAssert.assertThat((Object)statefulDoFnRunner, (Matcher)org.hamcrest.Matchers.instanceOf(StatefulDoFnRunner.class));
    }

    @Test
    public void testEnsureStateCleanupWithKeyedInputCleanupTimer() {
        InMemoryTimerInternals inMemoryTimerInternals = new InMemoryTimerInternals();
        KeyedStateBackend keyedStateBackend = (KeyedStateBackend)Mockito.mock(KeyedStateBackend.class);
        Lock stateBackendLock = (Lock)Mockito.mock(Lock.class);
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        IntervalWindow window = new IntervalWindow(new Instant(0L), new Instant(10L));
        Coder windowCoder = IntervalWindow.getCoder();
        ExecutableStageDoFnOperator.CleanupTimer cleanupTimer = new ExecutableStageDoFnOperator.CleanupTimer((TimerInternals)inMemoryTimerInternals, stateBackendLock, WindowingStrategy.globalDefault(), (Coder)keyCoder, windowCoder, keyedStateBackend);
        cleanupTimer.setForWindow((Object)KV.of((Object)"key", (Object)"string"), (BoundedWindow)window);
        ((Lock)Mockito.verify((Object)stateBackendLock)).lock();
        ByteBuffer key = FlinkKeyUtils.encodeKey((Object)"key", (Coder)keyCoder);
        ((KeyedStateBackend)Mockito.verify((Object)keyedStateBackend)).setCurrentKey((Object)key);
        MatcherAssert.assertThat((Object)inMemoryTimerInternals.getNextTimer(TimeDomain.EVENT_TIME), (Matcher)org.hamcrest.Matchers.is((Object)window.maxTimestamp().plus(1L)));
        ((Lock)Mockito.verify((Object)stateBackendLock)).unlock();
    }

    @Test
    public void testEnsureStateCleanupWithKeyedInputStateCleaner() throws @UnknownKeyFor @NonNull @Initialized Exception {
        GlobalWindow.Coder windowCoder = GlobalWindow.Coder.INSTANCE;
        InMemoryStateInternals stateInternals = InMemoryStateInternals.forKey((Object)"key");
        ImmutableList userStateNames = ImmutableList.of((Object)"state1", (Object)"state2");
        ImmutableList.Builder bagStateBuilder = ImmutableList.builder();
        for (String userStateName : userStateNames) {
            BagState state = (BagState)stateInternals.state(StateNamespaces.window((Coder)windowCoder, (BoundedWindow)GlobalWindow.INSTANCE), StateTags.bag((String)userStateName, (Coder)StringUtf8Coder.of()));
            bagStateBuilder.add((Object)state);
            state.add((Object)"this should be cleaned");
        }
        ImmutableList bagStates = bagStateBuilder.build();
        MutableObject key = new MutableObject((Object)ByteBuffer.wrap(((String)stateInternals.getKey()).getBytes(StandardCharsets.UTF_8)));
        ExecutableStageDoFnOperator.StateCleaner stateCleaner = new ExecutableStageDoFnOperator.StateCleaner((List)userStateNames, (Coder)windowCoder, () -> ((MutableObject)key).getValue(), ts -> false, null);
        for (BagState bagState : bagStates) {
            MatcherAssert.assertThat((Object)Iterables.size((Iterable)bagState.read()), (Matcher)org.hamcrest.Matchers.is((Object)1));
        }
        stateCleaner.clearForWindow((BoundedWindow)GlobalWindow.INSTANCE);
        stateCleaner.cleanupState((StateInternals)stateInternals, arg_0 -> ((MutableObject)key).setValue(arg_0));
        for (BagState bagState : bagStates) {
            MatcherAssert.assertThat((Object)Iterables.size((Iterable)bagState.read()), (Matcher)org.hamcrest.Matchers.is((Object)0));
        }
    }

    @Test
    public void testEnsureDeferredStateCleanupTimerFiring() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.testEnsureDeferredStateCleanupTimerFiring(false);
    }

    @Test
    public void testEnsureDeferredStateCleanupTimerFiringWithCheckpointing() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.testEnsureDeferredStateCleanupTimerFiring(true);
    }

    private void testEnsureDeferredStateCleanupTimerFiring(@UnknownKeyFor @NonNull @Initialized boolean withCheckpointing) throws @UnknownKeyFor @NonNull @Initialized Exception {
        TupleTag mainOutput = new TupleTag("main-output");
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)VoidCoder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        WindowingStrategy windowingStrategy = WindowingStrategy.of((WindowFn)FixedWindows.of((Duration)Duration.millis((long)1000L)));
        KvCoder kvCoder = KvCoder.of((Coder)keyCoder, (Coder)VarIntCoder.of());
        ExecutableStageDoFnOperator operator = this.getOperator((TupleTag<Integer>)mainOutput, Collections.emptyList(), (DoFnOperator.MultiOutputOutputManagerFactory<Integer>)outputManagerFactory, windowingStrategy, (Coder)keyCoder, (Coder)WindowedValue.getFullCoder((Coder)kvCoder, (Coder)windowingStrategy.getWindowFn().windowCoder()));
        RemoteBundle bundle = (RemoteBundle)Mockito.mock(RemoteBundle.class);
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (TimerReceiverFactory)Matchers.any(), (StateRequestHandler)Matchers.any(), (BundleProgressHandler)Matchers.any(), (BundleFinalizationHandler)Matchers.any())).thenReturn((Object)bundle);
        KV timerInputKey = KV.of((Object)"transformId", (Object)"timerId");
        AtomicBoolean timerInputReceived = new AtomicBoolean();
        IntervalWindow window = new IntervalWindow(new Instant(0L), new Instant(1000L));
        IntervalWindow.IntervalWindowCoder windowCoder = IntervalWindow.IntervalWindowCoder.of();
        WindowedValue windowedValue = WindowedValue.of((Object)KV.of((Object)"one", (Object)1), (Instant)window.maxTimestamp(), (Collection)ImmutableList.of((Object)window), (PaneInfo)PaneInfo.NO_FIRING);
        FnDataReceiver receiver = (FnDataReceiver)Mockito.mock(FnDataReceiver.class);
        FnDataReceiver timerReceiver = (FnDataReceiver)Mockito.mock(FnDataReceiver.class);
        ((FnDataReceiver)Mockito.doAnswer(invocation -> {
            timerInputReceived.set(true);
            return null;
        }).when((Object)timerReceiver)).accept((Object)((Timer)Matchers.any()));
        Mockito.when((Object)bundle.getInputReceivers()).thenReturn((Object)ImmutableMap.of((Object)"input", (Object)receiver));
        Mockito.when((Object)bundle.getTimerReceivers()).thenReturn((Object)ImmutableMap.of((Object)timerInputKey, (Object)timerReceiver));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, operator.keySelector, (TypeInformation)new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults()));
        testHarness.open();
        Lock stateBackendLock = (Lock)Whitebox.getInternalState((Object)operator, (String)"stateBackendLock");
        stateBackendLock.lock();
        KeyedStateBackend keyedStateBackend = operator.getKeyedStateBackend();
        ByteBuffer key = FlinkKeyUtils.encodeKey((Object)((String)((KV)windowedValue.getValue()).getKey()), (Coder)keyCoder);
        keyedStateBackend.setCurrentKey((Object)key);
        DoFnOperator.FlinkTimerInternals timerInternals = (DoFnOperator.FlinkTimerInternals)Whitebox.getInternalState((Object)operator, (String)"timerInternals");
        Object doFnRunner = Whitebox.getInternalState((Object)operator, (String)"doFnRunner");
        Object delegate = Whitebox.getInternalState((Object)doFnRunner, (String)"delegate");
        Object stateCleaner = Whitebox.getInternalState((Object)delegate, (String)"stateCleaner");
        Collection cleanupQueue = (Collection)Whitebox.getInternalState((Object)stateCleaner, (String)"cleanupQueue");
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)org.hamcrest.Matchers.is((Object)0));
        StateNamespace stateNamespace = StateNamespaces.window((Coder)windowCoder, (BoundedWindow)window);
        BagState state = (BagState)operator.keyedStateInternals.state(stateNamespace, StateTags.bag((String)"userState", (Coder)ByteStringCoder.of()));
        state.add((Object)ByteString.copyFrom((byte[])"userstate".getBytes(Charsets.UTF_8)));
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)org.hamcrest.Matchers.is((Object)1));
        TimerInternals.TimerData userTimer = TimerInternals.TimerData.of((String)TimerReceiverFactory.encodeToTimerDataTimerId((String)((String)timerInputKey.getKey()), (String)((String)timerInputKey.getValue())), (StateNamespace)stateNamespace, (Instant)window.maxTimestamp(), (Instant)window.maxTimestamp(), (TimeDomain)TimeDomain.EVENT_TIME);
        timerInternals.setTimer(userTimer);
        testHarness.processElement(new StreamRecord((Object)windowedValue));
        ((FnDataReceiver)Mockito.verify((Object)receiver)).accept((Object)windowedValue);
        testHarness.processWatermark(new Watermark(window.maxTimestamp().plus(1L).getMillis()));
        MatcherAssert.assertThat((Object)timerInputReceived.get(), (Matcher)org.hamcrest.Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)operator.getCurrentOutputWatermark(), (Matcher)org.hamcrest.Matchers.is((Object)BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
        operator.invokeFinishBundle();
        MatcherAssert.assertThat((Object)timerInputReceived.getAndSet(false), (Matcher)org.hamcrest.Matchers.is((Object)true));
        testHarness.processWatermark(new Watermark(window.maxTimestamp().plus(2L).getMillis()));
        operator.invokeFinishBundle();
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)org.hamcrest.Matchers.is((Object)1));
        MatcherAssert.assertThat((Object)cleanupQueue, (Matcher)org.hamcrest.Matchers.hasSize((int)1));
        TimerInternals.TimerData userTimer2 = TimerInternals.TimerData.of((String)TimerReceiverFactory.encodeToTimerDataTimerId((String)((String)timerInputKey.getKey()), (String)((String)timerInputKey.getValue())), (StateNamespace)stateNamespace, (Instant)window.maxTimestamp(), (Instant)window.maxTimestamp(), (TimeDomain)TimeDomain.EVENT_TIME);
        operator.setTimer(Timer.of((Object)((String)((KV)windowedValue.getValue()).getKey()), (String)"", (Collection)windowedValue.getWindows(), (Instant)window.maxTimestamp(), (Instant)window.maxTimestamp(), (PaneInfo)PaneInfo.NO_FIRING), userTimer2);
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)1));
        if (withCheckpointing) {
            testHarness.snapshot(0L, 0L);
        } else {
            operator.invokeFinishBundle();
        }
        MatcherAssert.assertThat((Object)cleanupQueue, (Matcher)org.hamcrest.Matchers.hasSize((int)0));
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{receiver});
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)org.hamcrest.Matchers.is((Object)2));
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)2));
        MatcherAssert.assertThat((Object)timerInputReceived.get(), (Matcher)org.hamcrest.Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)((Boolean)Whitebox.getInternalState((Object)operator, (String)"bundleStarted")), (Matcher)org.hamcrest.Matchers.is((Object)false));
        testHarness.setProcessingTime(testHarness.getProcessingTime() + 1L);
        MatcherAssert.assertThat((Object)timerInputReceived.getAndSet(false), (Matcher)org.hamcrest.Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)cleanupQueue, (Matcher)org.hamcrest.Matchers.hasSize((int)1));
        operator.invokeFinishBundle();
        MatcherAssert.assertThat((Object)cleanupQueue, (Matcher)org.hamcrest.Matchers.hasSize((int)0));
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)org.hamcrest.Matchers.is((Object)0));
        testHarness.close();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{receiver});
    }

    @Test
    public void testEnsureStateCleanupOnFinalWatermark() throws @UnknownKeyFor @NonNull @Initialized Exception {
        TupleTag mainOutput = new TupleTag("main-output");
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Coder)VoidCoder.of(), new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        WindowingStrategy windowingStrategy = WindowingStrategy.globalDefault();
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
        KvCoder kvCoder = KvCoder.of((Coder)keyCoder, (Coder)VarIntCoder.of());
        ExecutableStageDoFnOperator operator = this.getOperator((TupleTag<Integer>)mainOutput, Collections.emptyList(), (DoFnOperator.MultiOutputOutputManagerFactory<Integer>)outputManagerFactory, windowingStrategy, (Coder)keyCoder, (Coder)WindowedValue.getFullCoder((Coder)kvCoder, (Coder)windowCoder));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, operator.keySelector, (TypeInformation)new CoderTypeInformation((Coder)FlinkKeyUtils.ByteBufferCoder.of(), (PipelineOptions)FlinkPipelineOptions.defaults()));
        RemoteBundle bundle = (RemoteBundle)Mockito.mock(RemoteBundle.class);
        Mockito.when((Object)bundle.getInputReceivers()).thenReturn((Object)ImmutableMap.builder().put((Object)"input", (Object)((FnDataReceiver)Mockito.mock(FnDataReceiver.class))).build());
        Mockito.when((Object)this.stageBundleFactory.getBundle((OutputReceiverFactory)Matchers.any(), (TimerReceiverFactory)Matchers.any(), (StateRequestHandler)Matchers.any(), (BundleProgressHandler)Matchers.any(), (BundleFinalizationHandler)Matchers.any())).thenReturn((Object)bundle);
        testHarness.open();
        KeyedStateBackend keyedStateBackend = operator.getKeyedStateBackend();
        ByteBuffer key = FlinkKeyUtils.encodeKey((Object)"key1", (Coder)keyCoder);
        keyedStateBackend.setCurrentKey((Object)key);
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)org.hamcrest.Matchers.is((Object)0));
        StateNamespace stateNamespace = StateNamespaces.window((Coder)windowCoder, (BoundedWindow)GlobalWindow.INSTANCE);
        BagState state = (BagState)operator.keyedStateInternals.state(stateNamespace, StateTags.bag((String)"userState", (Coder)ByteStringCoder.of()));
        state.add((Object)ByteString.copyFrom((byte[])"userstate".getBytes(Charsets.UTF_8)));
        MatcherAssert.assertThat((Object)testHarness.numEventTimeTimers(), (Matcher)org.hamcrest.Matchers.is((Object)0));
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)org.hamcrest.Matchers.is((Object)1));
        testHarness.processWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.plus(1L).getMillis()));
        MatcherAssert.assertThat((Object)testHarness.numKeyedStateEntries(), (Matcher)org.hamcrest.Matchers.is((Object)0));
    }

    @Test
    public void testCacheTokenHandling() throws @UnknownKeyFor @NonNull @Initialized Exception {
        InMemoryStateInternals test = InMemoryStateInternals.forKey((Object)"test");
        KeyedStateBackend<ByteBuffer> stateBackend = FlinkStateInternalsTest.createStateBackend();
        ExecutableStageDoFnOperator.BagUserStateFactory bagUserStateFactory = new ExecutableStageDoFnOperator.BagUserStateFactory((StateInternals)test, stateBackend, (Lock)NoopLock.get(), null);
        ByteString key1 = ByteString.copyFrom((String)"key1", (Charset)Charsets.UTF_8);
        ByteString key2 = ByteString.copyFrom((String)"key2", (Charset)Charsets.UTF_8);
        Map userStateMapMock = (Map)Mockito.mock(Map.class);
        Map transformMap = (Map)Mockito.mock(Map.class);
        String userState1 = "userstate1";
        ProcessBundleDescriptors.BagUserStateSpec bagUserStateSpec1 = ExecutableStageDoFnOperatorTest.mockBagUserState("userstate1");
        Mockito.when((Object)((ProcessBundleDescriptors.BagUserStateSpec)transformMap.get("userstate1"))).thenReturn((Object)bagUserStateSpec1);
        String userState2 = "userstate2";
        ProcessBundleDescriptors.BagUserStateSpec bagUserStateSpec2 = ExecutableStageDoFnOperatorTest.mockBagUserState("userstate2");
        Mockito.when((Object)((ProcessBundleDescriptors.BagUserStateSpec)transformMap.get("userstate2"))).thenReturn((Object)bagUserStateSpec2);
        Mockito.when((Object)((Map)userStateMapMock.get(ArgumentMatchers.anyString()))).thenReturn((Object)transformMap);
        Mockito.when((Object)this.processBundleDescriptor.getBagUserStateSpecs()).thenReturn((Object)userStateMapMock);
        StateRequestHandler stateRequestHandler = StateRequestHandlers.forBagUserStateHandlerFactory((ProcessBundleDescriptors.ExecutableProcessBundleDescriptor)this.processBundleDescriptor, (StateRequestHandlers.BagUserStateHandlerFactory)bagUserStateFactory);
        BeamFnApi.ProcessBundleRequest.CacheToken expectedCacheToken = (BeamFnApi.ProcessBundleRequest.CacheToken)Iterables.getOnlyElement((Iterable)stateRequestHandler.getCacheTokens());
        stateRequestHandler.handle(ExecutableStageDoFnOperatorTest.getRequest(key1, "userstate1"));
        BeamFnApi.ProcessBundleRequest.CacheToken returnedCacheToken = (BeamFnApi.ProcessBundleRequest.CacheToken)Iterables.getOnlyElement((Iterable)stateRequestHandler.getCacheTokens());
        MatcherAssert.assertThat((Object)returnedCacheToken.hasUserState(), (Matcher)org.hamcrest.Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)returnedCacheToken, (Matcher)org.hamcrest.Matchers.is((Object)expectedCacheToken));
        List<RequestGenerator> generators = Arrays.asList(ExecutableStageDoFnOperatorTest::getRequest, ExecutableStageDoFnOperatorTest::getAppend, ExecutableStageDoFnOperatorTest::getClear);
        for (RequestGenerator req : generators) {
            stateRequestHandler.handle(req.makeRequest(key1, "userstate1"));
            MatcherAssert.assertThat((Object)((BeamFnApi.ProcessBundleRequest.CacheToken)Iterables.getOnlyElement((Iterable)stateRequestHandler.getCacheTokens())), (Matcher)org.hamcrest.Matchers.is((Object)expectedCacheToken));
            stateRequestHandler.handle(req.makeRequest(key2, "userstate1"));
            MatcherAssert.assertThat((Object)((BeamFnApi.ProcessBundleRequest.CacheToken)Iterables.getOnlyElement((Iterable)stateRequestHandler.getCacheTokens())), (Matcher)org.hamcrest.Matchers.is((Object)expectedCacheToken));
            stateRequestHandler.handle(req.makeRequest(key2, "userstate2"));
            MatcherAssert.assertThat((Object)((BeamFnApi.ProcessBundleRequest.CacheToken)Iterables.getOnlyElement((Iterable)stateRequestHandler.getCacheTokens())), (Matcher)org.hamcrest.Matchers.is((Object)expectedCacheToken));
        }
    }

    private static // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest getRequest(@UnknownKeyFor @NonNull @Initialized ByteString key, @UnknownKeyFor @NonNull @Initialized String userStateId) throws @UnknownKeyFor @NonNull @Initialized Exception {
        BeamFnApi.StateRequest.Builder builder = ExecutableStageDoFnOperatorTest.stateRequest(key, userStateId);
        builder.setGet(BeamFnApi.StateGetRequest.newBuilder().build());
        return builder.build();
    }

    private static // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest getAppend(@UnknownKeyFor @NonNull @Initialized ByteString key, @UnknownKeyFor @NonNull @Initialized String userStateId) throws @UnknownKeyFor @NonNull @Initialized Exception {
        BeamFnApi.StateRequest.Builder builder = ExecutableStageDoFnOperatorTest.stateRequest(key, userStateId);
        builder.setAppend(BeamFnApi.StateAppendRequest.newBuilder().build());
        return builder.build();
    }

    private static // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest getClear(@UnknownKeyFor @NonNull @Initialized ByteString key, @UnknownKeyFor @NonNull @Initialized String userStateId) throws @UnknownKeyFor @NonNull @Initialized Exception {
        BeamFnApi.StateRequest.Builder builder = ExecutableStageDoFnOperatorTest.stateRequest(key, userStateId);
        builder.setClear(BeamFnApi.StateClearRequest.newBuilder().build());
        return builder.build();
    }

    private static // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest.Builder stateRequest(@UnknownKeyFor @NonNull @Initialized ByteString key, @UnknownKeyFor @NonNull @Initialized String userStateId) throws @UnknownKeyFor @NonNull @Initialized Exception {
        return BeamFnApi.StateRequest.newBuilder().setStateKey(BeamFnApi.StateKey.newBuilder().setBagUserState(BeamFnApi.StateKey.BagUserState.newBuilder().setTransformId("transform").setKey(key).setUserStateId(userStateId).setWindow(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray((Coder)GlobalWindow.Coder.INSTANCE, (Object)GlobalWindow.INSTANCE))).build()));
    }

    private static // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized ProcessBundleDescriptors.BagUserStateSpec mockBagUserState(@UnknownKeyFor @NonNull @Initialized String userStateId) {
        ProcessBundleDescriptors.BagUserStateSpec bagUserStateMock = (ProcessBundleDescriptors.BagUserStateSpec)Mockito.mock(ProcessBundleDescriptors.BagUserStateSpec.class);
        Mockito.when((Object)bagUserStateMock.keyCoder()).thenReturn((Object)ByteStringCoder.of());
        Mockito.when((Object)bagUserStateMock.valueCoder()).thenReturn((Object)ByteStringCoder.of());
        Mockito.when((Object)bagUserStateMock.transformId()).thenReturn((Object)"transformId");
        Mockito.when((Object)bagUserStateMock.userStateId()).thenReturn((Object)userStateId);
        Mockito.when((Object)bagUserStateMock.windowCoder()).thenReturn((Object)GlobalWindow.Coder.INSTANCE);
        return bagUserStateMock;
    }

    @Test
    public void testSerialization() {
        WindowedValue.ValueOnlyWindowedValueCoder coder = WindowedValue.getValueOnlyCoder((Coder)VarIntCoder.of());
        TupleTag mainOutput = new TupleTag("main-output");
        TupleTag additionalOutput = new TupleTag("additional-output");
        ImmutableMap tagsToOutputTags = ImmutableMap.builder().put((Object)additionalOutput, (Object)new OutputTag(additionalOutput.getId(), TypeInformation.of(Integer.class))).build();
        ImmutableMap tagsToCoders = ImmutableMap.builder().put((Object)mainOutput, (Object)coder).put((Object)additionalOutput, (Object)coder).build();
        ImmutableMap tagsToIds = ImmutableMap.builder().put((Object)mainOutput, (Object)0).put((Object)additionalOutput, (Object)1).build();
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, (Map)tagsToOutputTags, (Map)tagsToCoders, (Map)tagsToIds, new SerializablePipelineOptions((PipelineOptions)FlinkPipelineOptions.defaults()));
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        ExecutableStageDoFnOperator operator = new ExecutableStageDoFnOperator("transform", (Coder)WindowedValue.getValueOnlyCoder((Coder)VarIntCoder.of()), Collections.emptyMap(), mainOutput, (List)ImmutableList.of((Object)additionalOutput), (DoFnOperator.OutputManagerFactory)outputManagerFactory, Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), (PipelineOptions)options, this.stagePayload, this.jobInfo, FlinkExecutableStageContextFactory.getInstance(), ExecutableStageDoFnOperatorTest.createOutputMap(mainOutput, ImmutableList.of((Object)additionalOutput)), WindowingStrategy.globalDefault(), null, null);
        ExecutableStageDoFnOperator clone = (ExecutableStageDoFnOperator)SerializationUtils.clone((Serializable)operator);
        Assert.assertNotNull((Object)clone);
        Assert.assertNotEquals((Object)operator, (Object)clone);
    }

    private @UnknownKeyFor @NonNull @Initialized ExecutableStageDoFnOperator getOperator(@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized Integer> mainOutput, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> additionalOutputs, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized DoFnOperator.MultiOutputOutputManagerFactory<@UnknownKeyFor @NonNull @Initialized Integer> outputManagerFactory) {
        return this.getOperator(mainOutput, additionalOutputs, outputManagerFactory, WindowingStrategy.globalDefault(), null, (Coder)WindowedValue.getFullCoder((Coder)StringUtf8Coder.of(), (Coder)GlobalWindow.Coder.INSTANCE));
    }

    private @UnknownKeyFor @NonNull @Initialized ExecutableStageDoFnOperator getOperator(@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized Integer> mainOutput, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> additionalOutputs, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized DoFnOperator.MultiOutputOutputManagerFactory<@UnknownKeyFor @NonNull @Initialized Integer> outputManagerFactory, @UnknownKeyFor @NonNull @Initialized WindowingStrategy windowingStrategy, @Nullable @UnknownKeyFor @Initialized Coder keyCoder, @UnknownKeyFor @NonNull @Initialized Coder windowedInputCoder) {
        FlinkExecutableStageContextFactory contextFactory = (FlinkExecutableStageContextFactory)Mockito.mock(FlinkExecutableStageContextFactory.class);
        Mockito.when((Object)contextFactory.get((JobInfo)Matchers.any())).thenReturn((Object)this.stageContext);
        RunnerApi.ExecutableStagePayload stagePayload = keyCoder != null ? this.stagePayloadWithUserState : this.stagePayload;
        ExecutableStageDoFnOperator operator = new ExecutableStageDoFnOperator("transform", windowedInputCoder, Collections.emptyMap(), mainOutput, additionalOutputs, outputManagerFactory, Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), (PipelineOptions)FlinkPipelineOptions.defaults(), stagePayload, this.jobInfo, contextFactory, ExecutableStageDoFnOperatorTest.createOutputMap(mainOutput, additionalOutputs), windowingStrategy, keyCoder, (KeySelector)(keyCoder != null ? new KvToByteBufferKeySelector(keyCoder, null) : null));
        Whitebox.setInternalState((Object)operator, (String)"stateRequestHandler", (Object)this.stateRequestHandler);
        return operator;
    }

    private static /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> createOutputMap(@UnknownKeyFor @NonNull @Initialized TupleTag mainOutput, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> additionalOutputs) {
        HashMap outputMap = new HashMap(additionalOutputs.size() + 1);
        if (mainOutput != null) {
            outputMap.put(mainOutput.getId(), mainOutput);
        }
        for (TupleTag<?> additionalTag : additionalOutputs) {
            outputMap.put(additionalTag.getId(), additionalTag);
        }
        return outputMap;
    }

    private static interface RequestGenerator {
        public // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest makeRequest(@UnknownKeyFor @NonNull @Initialized ByteString var1, @UnknownKeyFor @NonNull @Initialized String var2) throws @UnknownKeyFor @NonNull @Initialized Exception;
    }
}

