/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.stream.LongStream;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.streaming.StreamSources;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Enclosed.class)
public class UnboundedSourceWrapperTest {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapperTest.class);

    /**
     * Starts a background thread that keeps setting the harness's processing time to the current
     * wall-clock time.
     *
     * <p>Each update is made while holding the harness's checkpoint lock, followed by a 10 ms
     * sleep. The loop ends when the thread is interrupted, or when an update throws (which is
     * logged).
     *
     * @param testHarness harness whose processing time is advanced
     * @return the thread, already started; callers stop it with {@link Thread#interrupt()}
     */
    private static @UnknownKeyFor @NonNull @Initialized Thread startProcessingTimeUpdateThread(final @UnknownKeyFor @NonNull @Initialized AbstractStreamOperatorTestHarness testHarness) {
        Runnable advanceClock = () -> {
            try {
                for (;;) {
                    synchronized (testHarness.getCheckpointLock()) {
                        testHarness.setProcessingTime(System.currentTimeMillis());
                    }
                    Thread.sleep(10L);
                }
            }
            catch (InterruptedException ignored) {
                // Interruption is how the tests stop this thread; simply leave the loop.
            }
            catch (Exception e) {
                LOG.error("Unexpected error advancing processing time", (Throwable)e);
            }
        };
        Thread updater = new Thread(advanceClock);
        updater.start();
        return updater;
    }

    /**
     * Bare-bones {@link StreamStatusMaintainer} for tests: it only remembers the last status it was
     * given, starting out as {@link StreamStatus#ACTIVE}.
     */
    private static final class TestStreamStatusMaintainer
    implements StreamStatusMaintainer {
        @UnknownKeyFor @NonNull @Initialized StreamStatus currentStreamStatus = StreamStatus.ACTIVE;

        private TestStreamStatusMaintainer() {
        }

        /**
         * Replaces the held status with {@code streamStatus} unless the two are already equal.
         *
         * @param streamStatus the status to switch to
         */
        public void toggleStreamStatus(@UnknownKeyFor @NonNull @Initialized StreamStatus streamStatus) {
            boolean alreadyCurrent = this.currentStreamStatus.equals((Object)streamStatus);
            if (alreadyCurrent) {
                return;
            }
            this.currentStreamStatus = streamStatus;
        }

        /**
         * @return the status most recently stored by {@link #toggleStreamStatus}
         */
        public @UnknownKeyFor @NonNull @Initialized StreamStatus getStreamStatus() {
            return this.currentStreamStatus;
        }
    }

    /**
     * Test source that serves a fixed list of elements and reports "nothing available" exactly once
     * after every element it hands out, counting each of those idle reports.
     *
     * <p>The source never splits: {@link #split} always returns this single instance.
     *
     * @param <T> element type
     */
    private static class IdlingUnboundedSource<@UnknownKeyFor T extends @UnknownKeyFor @NonNull @Initialized Serializable>
    extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
        // Idle counter, stored under this source's uuid and read back through getNumIdles().
        // NOTE(review): the map is per-instance yet keyed by uuid, so it only ever holds one
        // entry - confirm whether it was meant to be shared between instances.
        private final @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer> numIdles = new ConcurrentHashMap<>();
        private final @UnknownKeyFor @NonNull @Initialized String uuid = UUID.randomUUID().toString();
        private final @UnknownKeyFor @NonNull @Initialized List<T> data;
        private final @UnknownKeyFor @NonNull @Initialized Coder<T> outputCoder;

        /**
         * @param data elements to emit, in order
         * @param outputCoder coder returned from {@link #getOutputCoder()}
         */
        public IdlingUnboundedSource(@UnknownKeyFor @NonNull @Initialized List<T> data, @UnknownKeyFor @NonNull @Initialized Coder<T> outputCoder) {
            this.data = data;
            this.outputCoder = outputCoder;
        }

        /** Ignores both arguments and returns a one-element list containing this source. */
        public @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized UnboundedSource<T, UnboundedSource.@UnknownKeyFor @NonNull @Initialized CheckpointMark>> split(@UnknownKeyFor @NonNull @Initialized int desiredNumSplits, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
            return Collections.singletonList(this);
        }

        /**
         * Creates a reader positioned before the first element. Both arguments are ignored, so a
         * new reader always starts from the beginning of {@code data}.
         */
        public UnboundedSource.@UnknownKeyFor @NonNull @Initialized UnboundedReader<T> createReader(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, UnboundedSource.@Nullable @UnknownKeyFor @Initialized CheckpointMark checkpointMark) {
            return new UnboundedSource.UnboundedReader<T>(){
                // Index of the current element in data; -1 until the first successful advance.
                private @UnknownKeyFor @NonNull @Initialized int currentIdx = -1;
                // True when the previous advance() produced an element, so the next call idles.
                private @UnknownKeyFor @NonNull @Initialized boolean lastAdvanced = false;

                public @UnknownKeyFor @NonNull @Initialized boolean start() {
                    return this.advance();
                }

                public @UnknownKeyFor @NonNull @Initialized boolean advance() {
                    if (this.lastAdvanced) {
                        // Pretend to be idle once after every element and record it.
                        numIdles.merge(uuid, 1, Integer::sum);
                        this.lastAdvanced = false;
                        return false;
                    }
                    if (this.currentIdx < data.size() - 1) {
                        ++this.currentIdx;
                        this.lastAdvanced = true;
                        return true;
                    }
                    // All elements consumed.
                    return false;
                }

                public @UnknownKeyFor @NonNull @Initialized Instant getWatermark() {
                    // Once positioned on the last element, report the maximum timestamp.
                    if (this.currentIdx >= data.size() - 1) {
                        return BoundedWindow.TIMESTAMP_MAX_VALUE;
                    }
                    return new Instant((long)this.currentIdx);
                }

                public UnboundedSource.@UnknownKeyFor @NonNull @Initialized CheckpointMark getCheckpointMark() {
                    return UnboundedSource.CheckpointMark.NOOP_CHECKPOINT_MARK;
                }

                public @UnknownKeyFor @NonNull @Initialized UnboundedSource<T, @UnknownKeyFor @NonNull @Initialized ?> getCurrentSource() {
                    // Inside the anonymous reader, plain "this" is the reader; the source is the
                    // enclosing instance.
                    return IdlingUnboundedSource.this;
                }

                public T getCurrent() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
                    if (this.currentIdx >= 0 && this.currentIdx < data.size()) {
                        return data.get(this.currentIdx);
                    }
                    throw new NoSuchElementException();
                }

                /** Uses the element's index as its timestamp in milliseconds. */
                public @UnknownKeyFor @NonNull @Initialized Instant getCurrentTimestamp() throws @UnknownKeyFor @NonNull @Initialized NoSuchElementException {
                    if (this.currentIdx >= 0 && this.currentIdx < data.size()) {
                        return new Instant((long)this.currentIdx);
                    }
                    throw new NoSuchElementException();
                }

                public void close() {
                }
            };
        }

        /**
         * Returns {@code null}: no checkpoint-mark coder is provided. Readers of this source only
         * ever hand out {@code NOOP_CHECKPOINT_MARK}.
         */
        public @UnknownKeyFor @NonNull @Initialized Coder<UnboundedSource.@UnknownKeyFor @NonNull @Initialized CheckpointMark> getCheckpointMarkCoder() {
            return null;
        }

        public @UnknownKeyFor @NonNull @Initialized Coder<T> getOutputCoder() {
            return this.outputCoder;
        }

        /**
         * @return how many idle reports have been recorded for this source, or 0 if none
         */
        @UnknownKeyFor @NonNull @Initialized int getNumIdles() {
            return this.numIdles.getOrDefault(this.uuid, 0);
        }
    }

    /** Tests that drive the wrapped source through a Flink operator test harness. */
    @RunWith(value=JUnit4.class)
    public static class IntegrationTests {
        /**
         * Runs an {@link IdlingUnboundedSource} holding three strings through the wrapper and
         * checks that all three elements arrive in order and that exactly three idle reports were
         * recorded by the source.
         */
        @Test(timeout=30000L)
        public void testPollingOfIdleReaders() throws @UnknownKeyFor @NonNull @Initialized Exception {
            IdlingUnboundedSource<String> source = new IdlingUnboundedSource<String>(Arrays.asList("first", "second", "third"), (Coder<String>)StringUtf8Coder.of());
            FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
            options.setShutdownSourcesAfterIdleMs(Long.valueOf(0L));
            options.setParallelism(Integer.valueOf(4));
            UnboundedSourceWrapper wrappedSource = new UnboundedSourceWrapper("sequentialRead", (PipelineOptions)options, source, 4);
            StreamSource sourceOperator = new StreamSource((SourceFunction)wrappedSource);
            AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)sourceOperator, 4, 4, 0);
            testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
            testHarness.getExecutionConfig().setAutoWatermarkInterval(10L);
            testHarness.open();
            final List<String> output = new ArrayList<>();
            Thread processingTimeUpdateThread = UnboundedSourceWrapperTest.startProcessingTimeUpdateThread(testHarness);
            try {
                StreamSources.run(sourceOperator, testHarness.getCheckpointLock(), new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<String>>>>(){

                    public void emitWatermark(@UnknownKeyFor @NonNull @Initialized Watermark mark) {
                    }

                    public void emitLatencyMarker(@UnknownKeyFor @NonNull @Initialized LatencyMarker latencyMarker) {
                    }

                    public <X> void collect(@UnknownKeyFor @NonNull @Initialized OutputTag<X> outputTag, @UnknownKeyFor @NonNull @Initialized StreamRecord<X> record) {
                        // Tagged (side) output is not expected in this test.
                        throw new IllegalStateException();
                    }

                    public void collect(@UnknownKeyFor @NonNull @Initialized StreamRecord<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<@UnknownKeyFor @NonNull @Initialized String>>> record) {
                        output.add(record.getValue().getValue().getValue());
                    }

                    public void close() {
                    }
                });
            }
            finally {
                // Always stop the helper thread, even when the run fails; otherwise its endless
                // loop outlives the test.
                processingTimeUpdateThread.interrupt();
                processingTimeUpdateThread.join();
            }
            MatcherAssert.assertThat(source.getNumIdles(), Is.is(3));
            MatcherAssert.assertThat(output, Matchers.contains("first", "second", "third"));
        }
    }

    /**
     * Tests that exercise {@code UnboundedSourceWrapper} directly, mostly against mocked Flink
     * runtime objects.
     */
    @RunWith(value=JUnit4.class)
    public static class BasicTest {
        /** Checks that a freshly built wrapper passes {@code InstantiationUtil.serializeObject}. */
        @Test
        public void testSerialization() throws @UnknownKeyFor @NonNull @Initialized Exception {
            PipelineOptions options = PipelineOptionsFactory.create();
            TestCountingSource source = new TestCountingSource(20);
            UnboundedSourceWrapper flinkWrapper = new UnboundedSourceWrapper("stepName", options, (UnboundedSource)source, 1);
            InstantiationUtil.serializeObject((Object)flinkWrapper);
        }

        @Test(timeout=10000L)
        public void testSourceWithNoReaderDoesNotShutdown() throws @UnknownKeyFor @NonNull @Initialized Exception {
            BasicTest.testSourceDoesNotShutdown(false);
        }

        @Test(timeout=10000L)
        public void testSourceWithReadersDoesNotShutdown() throws @UnknownKeyFor @NonNull @Initialized Exception {
            BasicTest.testSourceDoesNotShutdown(true);
        }

        /**
         * Runs the wrapper on a background thread with {@code shutdownSourcesAfterIdleMs} set to
         * {@code Long.MAX_VALUE} and checks that it is still running after a processing-time
         * callback, and that the thread ends after {@code cancel()}.
         *
         * @param shouldHaveReaders when true the mocked subtask index is 0, otherwise 1; the test
         *     asserts that the wrapper has local readers exactly when this flag is true
         */
        private static void testSourceDoesNotShutdown(@UnknownKeyFor @NonNull @Initialized boolean shouldHaveReaders) throws @UnknownKeyFor @NonNull @Initialized Exception {
            FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
            options.setShutdownSourcesAfterIdleMs(Long.valueOf(Long.MAX_VALUE));
            TestCountingSource source = new TestCountingSource(20).withoutSplitting();
            UnboundedSourceWrapper sourceWrapper = new UnboundedSourceWrapper("noReader", (PipelineOptions)options, (UnboundedSource)source, 2);
            StreamingRuntimeContext mock = (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
            int subtaskIndex = shouldHaveReaders ? 0 : 1;
            Mockito.when((Object)mock.getIndexOfThisSubtask()).thenReturn((Object)subtaskIndex);
            Mockito.when((Object)mock.getNumberOfParallelSubtasks()).thenReturn((Object)2);
            Mockito.when((Object)mock.getExecutionConfig()).thenReturn((Object)new ExecutionConfig());
            ProcessingTimeService timerService = (ProcessingTimeService)Mockito.mock(ProcessingTimeService.class);
            Mockito.when((Object)timerService.getCurrentProcessingTime()).thenReturn((Object)Long.MAX_VALUE);
            Mockito.when((Object)mock.getProcessingTimeService()).thenReturn((Object)timerService);
            sourceWrapper.setRuntimeContext((RuntimeContext)mock);
            sourceWrapper.open(new Configuration());
            SourceFunction.SourceContext sourceContext = (SourceFunction.SourceContext)Mockito.mock(SourceFunction.SourceContext.class);
            Object checkpointLock = new Object();
            Mockito.when((Object)sourceContext.getCheckpointLock()).thenReturn(checkpointLock);
            // The source context is installed before run() starts on the other thread, so the
            // onProcessingTime call below already sees it.
            sourceWrapper.setSourceContext(sourceContext);
            sourceWrapper.open(new Configuration());
            // isEmpty() yields a boolean, so it has to be matched against a boolean. Matching it
            // against an Integer 1/0 can never succeed.
            MatcherAssert.assertThat(sourceWrapper.getLocalReaders().isEmpty(), Is.is(!shouldHaveReaders));
            Thread thread = new Thread(() -> {
                try {
                    sourceWrapper.run(sourceContext);
                }
                catch (Exception e) {
                    LOG.error("Error while running UnboundedSourceWrapper", (Throwable)e);
                }
            });
            try {
                thread.start();
                if (!shouldHaveReaders) {
                    // Poll until the runner thread's top frames are sleep() called from
                    // finalizeSource().
                    StackTraceElement[] callStack;
                    while ((callStack = thread.getStackTrace()).length < 2 || !"sleep".equals(callStack[0].getMethodName()) || !"finalizeSource".equals(callStack[1].getMethodName())) {
                        Thread.sleep(10L);
                    }
                }
                MatcherAssert.assertThat(sourceWrapper.isRunning(), Is.is(true));
                synchronized (checkpointLock) {
                    sourceWrapper.onProcessingTime(42L);
                }
                MatcherAssert.assertThat(sourceWrapper.isRunning(), Is.is(true));
                MatcherAssert.assertThat(thread.isAlive(), Is.is(true));
                sourceWrapper.cancel();
            }
            finally {
                thread.interrupt();
                thread.join(1000L);
            }
            MatcherAssert.assertThat(thread.isAlive(), Is.is(false));
        }

        /**
         * Wraps a bounded {@code CountingSource.upTo(1000)} as an unbounded source and runs it as
         * subtask 0 of 2 with wrapper parallelism 4. Expects two local readers and the values
         * 0..249 followed by 500..749, i.e. the readers drained one after the other.
         */
        @Test
        public void testSequentialReadingFromBoundedSource() throws @UnknownKeyFor @NonNull @Initialized Exception {
            UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter source = new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(CountingSource.upTo(1000L));
            FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
            UnboundedSourceWrapper sourceWrapper = new UnboundedSourceWrapper("sequentialRead", (PipelineOptions)options, (UnboundedSource)source, 4);
            StreamingRuntimeContext runtimeContextMock = (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
            Mockito.when((Object)runtimeContextMock.getIndexOfThisSubtask()).thenReturn((Object)0);
            Mockito.when((Object)runtimeContextMock.getNumberOfParallelSubtasks()).thenReturn((Object)2);
            Mockito.when((Object)runtimeContextMock.getExecutionConfig()).thenReturn((Object)new ExecutionConfig());
            TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
            processingTimeService.setCurrentTime(0L);
            Mockito.when((Object)runtimeContextMock.getProcessingTimeService()).thenReturn((Object)processingTimeService);
            Mockito.when((Object)runtimeContextMock.getMetricGroup()).thenReturn((Object)new UnregisteredMetricsGroup());
            sourceWrapper.setRuntimeContext((RuntimeContext)runtimeContextMock);
            sourceWrapper.open(new Configuration());
            MatcherAssert.assertThat(sourceWrapper.getLocalReaders().size(), Is.is(2));
            final List<Long> integers = new ArrayList<>();
            sourceWrapper.run((SourceFunction.SourceContext)new SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<Long>>>(){
                private final @UnknownKeyFor @NonNull @Initialized Object checkpointLock = new Object();

                public void collect(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<@UnknownKeyFor @NonNull @Initialized Long>> element) {
                    integers.add(element.getValue().getValue());
                }

                public void collectWithTimestamp(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<@UnknownKeyFor @NonNull @Initialized Long>> element, @UnknownKeyFor @NonNull @Initialized long timestamp) {
                    throw new IllegalStateException("Should not collect with timestamp");
                }

                public void emitWatermark(@UnknownKeyFor @NonNull @Initialized Watermark mark) {
                }

                public void markAsTemporarilyIdle() {
                }

                public @UnknownKeyFor @NonNull @Initialized Object getCheckpointLock() {
                    return this.checkpointLock;
                }

                public void close() {
                }
            });
            MatcherAssert.assertThat(integers.size(), Is.is(500));
            Object[] expected = LongStream.concat(LongStream.range(0L, 250L), LongStream.range(500L, 750L)).boxed().toArray();
            MatcherAssert.assertThat(integers, Matchers.contains(expected));
        }

        /**
         * Replaces the wrapper's {@code metricContainer} field with a Mockito spy and verifies that
         * {@code close()} calls {@code registerMetricsForPipelineResult()} on it.
         */
        @Test
        public void testAccumulatorRegistrationOnOperatorClose() throws @UnknownKeyFor @NonNull @Initialized Exception {
            FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
            TestCountingSource source = new TestCountingSource(20).withoutSplitting();
            UnboundedSourceWrapper sourceWrapper = new UnboundedSourceWrapper("noReader", (PipelineOptions)options, (UnboundedSource)source, 2);
            StreamingRuntimeContext mock = (StreamingRuntimeContext)Mockito.mock(StreamingRuntimeContext.class);
            Mockito.when((Object)mock.getNumberOfParallelSubtasks()).thenReturn((Object)1);
            Mockito.when((Object)mock.getExecutionConfig()).thenReturn((Object)new ExecutionConfig());
            Mockito.when((Object)mock.getIndexOfThisSubtask()).thenReturn((Object)0);
            sourceWrapper.setRuntimeContext((RuntimeContext)mock);
            sourceWrapper.open(new Configuration());
            String metricContainerFieldName = "metricContainer";
            FlinkMetricContainer monitoredContainer = (FlinkMetricContainer)Mockito.spy((Object)((FlinkMetricContainer)Whitebox.getInternalState((Object)sourceWrapper, (String)metricContainerFieldName)));
            Whitebox.setInternalState((Object)sourceWrapper, (String)metricContainerFieldName, (Object)monitoredContainer);
            sourceWrapper.close();
            ((FlinkMetricContainer)Mockito.verify((Object)monitoredContainer)).registerMetricsForPipelineResult();
        }
    }

    @RunWith(value=Parameterized.class)
    public static class ParameterizedUnboundedSourceWrapperTest {
        private final @UnknownKeyFor @NonNull @Initialized int numTasks;
        private final @UnknownKeyFor @NonNull @Initialized int numSplits;

        /**
         * Receives one parameter pair from the {@code Parameterized} runner.
         *
         * @param numTasks value stored in {@code numTasks}; the tests pass it to the harness as
         *     its parallelism settings
         * @param numSplits value stored in {@code numSplits}; the tests compare it with the
         *     wrapper's number of split sources
         */
        public ParameterizedUnboundedSourceWrapperTest(@UnknownKeyFor @NonNull @Initialized int numTasks, @UnknownKeyFor @NonNull @Initialized int numSplits) {
            this.numSplits = numSplits;
            this.numTasks = numTasks;
        }

        @Parameterized.Parameters(name="numTasks = {0}; numSplits={1}")
        public static @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized Object @UnknownKeyFor @NonNull @Initialized []> data() {
            return Arrays.asList({1, 1}, {1, 2}, {1, 4}, {2, 1}, {2, 2}, {2, 4}, {4, 1}, {4, 2}, {4, 4});
        }

        /**
         * Runs the wrapped source once for every subtask index in {@code [0, numTasks)} and checks
         * the totals across all runs: {@code 20 * numSplits} elements, and one watermark at or
         * beyond {@code TIMESTAMP_MAX_VALUE} per subtask.
         */
        @Test(timeout=30000L)
        public void testValueEmission() throws @UnknownKeyFor @NonNull @Initialized Exception {
            final int numElementsPerShard = 20;
            FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
            // Single-element arrays so the anonymous collectors below can update the counters.
            final long[] numElementsReceived = new long[]{0L};
            final int[] numWatermarksReceived = new int[]{0};
            TestCountingSource source = new TestCountingSource(numElementsPerShard).withFixedNumSplits(this.numSplits);
            for (int subtaskIndex = 0; subtaskIndex < this.numTasks; ++subtaskIndex) {
                UnboundedSourceWrapper flinkWrapper = new UnboundedSourceWrapper("stepName", (PipelineOptions)options, (UnboundedSource)source, this.numTasks);
                Assert.assertEquals((long)this.numSplits, (long)flinkWrapper.getSplitSources().size());
                StreamSource sourceOperator = new StreamSource((SourceFunction)flinkWrapper);
                AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)sourceOperator, this.numTasks, this.numTasks, subtaskIndex);
                testHarness.getExecutionConfig().setAutoWatermarkInterval(10L);
                testHarness.setProcessingTime(System.currentTimeMillis());
                testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
                Thread processingTimeUpdateThread = UnboundedSourceWrapperTest.startProcessingTimeUpdateThread(testHarness);
                try {
                    testHarness.open();
                    StreamSources.run(sourceOperator, testHarness.getCheckpointLock(), new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>(){
                        // Each run counts the maximum watermark at most once.
                        private @UnknownKeyFor @NonNull @Initialized boolean hasSeenMaxWatermark = false;

                        public void emitWatermark(@UnknownKeyFor @NonNull @Initialized Watermark watermark) {
                            if (!this.hasSeenMaxWatermark && watermark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                                numWatermarksReceived[0] = numWatermarksReceived[0] + 1;
                                this.hasSeenMaxWatermark = true;
                            }
                        }

                        public <X> void collect(@UnknownKeyFor @NonNull @Initialized OutputTag<X> outputTag, @UnknownKeyFor @NonNull @Initialized StreamRecord<X> streamRecord) {
                            // Count tagged records like regular ones. The raw cast is required
                            // because StreamRecord<X> is not the element type of this Output.
                            this.collect((StreamRecord)streamRecord);
                        }

                        public void emitLatencyMarker(@UnknownKeyFor @NonNull @Initialized LatencyMarker latencyMarker) {
                        }

                        public void collect(@UnknownKeyFor @NonNull @Initialized StreamRecord<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized Integer>>>> windowedValueStreamRecord) {
                            numElementsReceived[0] = numElementsReceived[0] + 1L;
                        }

                        public void close() {
                        }
                    });
                }
                finally {
                    // Stop the helper thread whether or not the run succeeded.
                    processingTimeUpdateThread.interrupt();
                    processingTimeUpdateThread.join();
                }
            }
            Assert.assertEquals((long)(numElementsPerShard * this.numSplits), (long)numElementsReceived[0]);
            Assert.assertEquals((long)this.numTasks, (long)numWatermarksReceived[0]);
        }

        /**
         * Checks that watermarks are emitted while the source runs on a separate thread.
         *
         * <p>Sequence: emission is halted and the source thread started; the test waits until no
         * local reader reports a watermark of 0 ms, sets processing time to 0 and resumes emission.
         * After at least {@code 500 / numSplits} elements were collected it moves processing time
         * to {@code Long.MAX_VALUE}, then waits for two watermarks. Any exception caught on the
         * source thread fails the test.
         */
        @Test(timeout=30000L)
        public void testWatermarkEmission() throws @UnknownKeyFor @NonNull @Initialized Exception {
            final int numElements = 500;
            PipelineOptions options = PipelineOptionsFactory.create();
            TestCountingSource source = new TestCountingSource(numElements);
            UnboundedSourceWrapper flinkWrapper = new UnboundedSourceWrapper("stepName", options, (UnboundedSource)source, this.numSplits);
            Assert.assertEquals((long)this.numSplits, (long)flinkWrapper.getSplitSources().size());
            StreamSource sourceOperator = new StreamSource((SourceFunction)flinkWrapper);
            AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)sourceOperator, this.numTasks, this.numTasks, 0);
            testHarness.getExecutionConfig().setLatencyTrackingInterval(0L);
            testHarness.getExecutionConfig().setAutoWatermarkInterval(1L);
            testHarness.setProcessingTime(Long.MIN_VALUE);
            testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
            ConcurrentLinkedQueue<Exception> caughtExceptions = new ConcurrentLinkedQueue<>();
            final CountDownLatch seenWatermarks = new CountDownLatch(2);
            int minElementsPerReader = numElements / this.numSplits;
            final CountDownLatch minElementsCountdown = new CountDownLatch(minElementsPerReader);
            source.haltEmission();
            testHarness.open();
            Thread sourceThread = new Thread(() -> {
                try {
                    StreamSources.run(sourceOperator, testHarness.getCheckpointLock(), new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>(){

                        public void emitWatermark(@UnknownKeyFor @NonNull @Initialized Watermark watermark) {
                            seenWatermarks.countDown();
                        }

                        public <X> void collect(@UnknownKeyFor @NonNull @Initialized OutputTag<X> outputTag, @UnknownKeyFor @NonNull @Initialized StreamRecord<X> streamRecord) {
                        }

                        public void emitLatencyMarker(@UnknownKeyFor @NonNull @Initialized LatencyMarker latencyMarker) {
                        }

                        public void collect(@UnknownKeyFor @NonNull @Initialized StreamRecord<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized Integer>>>> windowedValueStreamRecord) {
                            minElementsCountdown.countDown();
                        }

                        public void close() {
                        }
                    });
                }
                catch (Exception e) {
                    LOG.info("Caught exception:", (Throwable)e);
                    caughtExceptions.add(e);
                }
            });
            sourceThread.start();
            try {
                // Wait until no local reader reports a watermark of 0 ms any more. The wrapper is
                // a raw type here, so each reader has to be cast before getWatermark() is visible.
                while (flinkWrapper.getLocalReaders().stream().anyMatch(reader -> ((UnboundedSource.UnboundedReader)reader).getWatermark().getMillis() == 0L)) {
                    Thread.sleep(50L);
                }
                // Processing time is only changed while holding the checkpoint lock.
                synchronized (testHarness.getCheckpointLock()) {
                    testHarness.setProcessingTime(0L);
                }
                source.continueEmission();
                minElementsCountdown.await();
                synchronized (testHarness.getCheckpointLock()) {
                    testHarness.setProcessingTime(Long.MAX_VALUE);
                }
                seenWatermarks.await();
                if (!caughtExceptions.isEmpty()) {
                    Assert.fail((String)("Caught exception(s): " + Joiner.on((String)",").join(caughtExceptions)));
                }
            }
            finally {
                // Stop the source thread on every path, including assertion failures, so it does
                // not outlive the test.
                sourceOperator.cancel();
                sourceThread.join();
            }
        }

        /**
         * Checks that an {@link UnboundedSourceWrapper} can be restored from an operator snapshot.
         *
         * <p>The test runs the source until half of the expected elements were collected, takes a
         * snapshot, reports the checkpoint as completed, and then runs a freshly constructed wrapper
         * that was initialized from that snapshot until it has produced the same number of records.
         * It passes when both runs were cut short by {@link SuccessException}, the restored wrapper
         * owns the expected number of local splits, and the two runs together yielded
         * {@code numElements} distinct elements.
         *
         * @throws Exception if the test harness or the source operator fails
         */
        @Test
        public void testRestore() throws @UnknownKeyFor @NonNull @Initialized Exception {
            final int numElements = 20;
            // Each of the two runs is aborted once it has collected this many records.
            final int batchSize = numElements / 2;
            Object checkpointLock = new Object();
            PipelineOptions options = PipelineOptionsFactory.create();
            TestCountingSource source = new TestCountingSource(numElements);
            UnboundedSourceWrapper flinkWrapper = new UnboundedSourceWrapper("stepName", options, (UnboundedSource)source, this.numSplits);
            Assert.assertEquals((long)this.numSplits, (long)flinkWrapper.getSplitSources().size());
            StreamSource sourceOperator = new StreamSource((SourceFunction)flinkWrapper);
            AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)sourceOperator, this.numTasks, this.numTasks, 0);
            testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
            // Shared by both runs; being a set, it only grows for elements not seen before.
            final HashSet<KV<Integer, Integer>> emittedElements = new HashSet<>();

            // First run: read one batch, leaving the run loop via SuccessException.
            boolean readFirstBatchOfElements = false;
            try {
                testHarness.open();
                StreamSources.run(sourceOperator, checkpointLock, new TestStreamStatusMaintainer(), newBatchLimitedOutput(emittedElements, batchSize));
            }
            catch (SuccessException expected) {
                readFirstBatchOfElements = true;
            }
            Assert.assertTrue((String)"Did not successfully read first batch of elements.", (boolean)readFirstBatchOfElements);

            // Snapshot the operator, then report checkpoint 0 as completed. The test expects one
            // entry in the finalize tracker per local split source afterwards.
            OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
            ArrayList<Integer> finalizeList = new ArrayList<Integer>();
            TestCountingSource.setFinalizeTracker(finalizeList);
            testHarness.notifyOfCompletedCheckpoint(0L);
            Assert.assertEquals((long)flinkWrapper.getLocalSplitSources().size(), (long)finalizeList.size());

            // Build a brand-new source, wrapper and harness, and initialize them from the snapshot.
            TestCountingSource restoredSource = new TestCountingSource(numElements);
            UnboundedSourceWrapper restoredFlinkWrapper = new UnboundedSourceWrapper("stepName", options, (UnboundedSource)restoredSource, this.numSplits);
            Assert.assertEquals((long)this.numSplits, (long)restoredFlinkWrapper.getSplitSources().size());
            StreamSource restoredSourceOperator = new StreamSource((SourceFunction)restoredFlinkWrapper);
            AbstractStreamOperatorTestHarness restoredTestHarness = new AbstractStreamOperatorTestHarness((StreamOperator)restoredSourceOperator, this.numTasks, 1, 0);
            restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
            restoredTestHarness.initializeState(snapshot);

            // Second run on the restored operator: read another batch into the same set.
            boolean readSecondBatchOfElements = false;
            try {
                restoredTestHarness.open();
                StreamSources.run(restoredSourceOperator, checkpointLock, new TestStreamStatusMaintainer(), newBatchLimitedOutput(emittedElements, batchSize));
            }
            catch (SuccessException expected) {
                readSecondBatchOfElements = true;
            }
            Assert.assertEquals((long)Math.max(1, this.numSplits / this.numTasks), (long)restoredFlinkWrapper.getLocalSplitSources().size());
            Assert.assertTrue((String)"Did not successfully read second batch of elements.", (boolean)readSecondBatchOfElements);
            // Both batches together must add up to numElements distinct elements.
            Assert.assertEquals((long)numElements, (long)emittedElements.size());
        }

        /**
         * Creates an {@link Output} that stores the {@link KV} payload of every collected record in
         * {@code sink} and throws {@link SuccessException} as soon as {@code limit} records have been
         * collected by the returned instance. Watermarks and latency markers are ignored.
         *
         * @param sink set receiving the payload of each collected record
         * @param limit number of collected records after which {@link SuccessException} is thrown
         * @return a new collector with its own record counter
         */
        private static Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>> newBatchLimitedOutput(final HashSet<KV<Integer, Integer>> sink, final int limit) {
            return new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>(){
                // Records seen by this collector instance only.
                private int count = 0;

                @Override
                public void emitWatermark(Watermark watermark) {
                }

                @Override
                @SuppressWarnings({"unchecked", "rawtypes"})
                public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
                    // Tagged records are handled like untagged ones. The raw cast is required because
                    // StreamRecord<X> is not assignable to the parameter of the single-argument overload.
                    this.collect((StreamRecord)streamRecord);
                }

                @Override
                public void emitLatencyMarker(LatencyMarker latencyMarker) {
                }

                @Override
                public void collect(StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
                    sink.add(windowedValueStreamRecord.getValue().getValue().getValue());
                    ++this.count;
                    if (this.count >= limit) {
                        // Unchecked on purpose: this is how the test leaves the source's run loop.
                        throw new SuccessException();
                    }
                }

                @Override
                public void close() {
                }
            };
        }

        /**
         * Exercises snapshot and restore for a source whose {@code getCheckpointMarkCoder()} returns
         * {@code null}.
         *
         * <p>A wrapper around such a source is opened and snapshotted. A second wrapper, built around a
         * regular {@link TestCountingSource}, is then initialized from that snapshot and opened. The
         * test expects the restored wrapper to report zero local split sources.
         *
         * @throws Exception if any test harness operation fails
         */
        @Test
        public void testNullCheckpoint() throws @UnknownKeyFor @NonNull @Initialized Exception {
            final int numElements = 20;
            PipelineOptions pipelineOptions = PipelineOptionsFactory.create();

            // Source variant that hands out no checkpoint mark coder.
            TestCountingSource coderlessSource = new TestCountingSource(numElements){

                @Override
                public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized TestCountingSource.CounterMark> getCheckpointMarkCoder() {
                    return null;
                }
            };
            UnboundedSourceWrapper originalWrapper = new UnboundedSourceWrapper("stepName", pipelineOptions, (UnboundedSource)coderlessSource, this.numSplits);
            StreamSource originalOperator = new StreamSource((SourceFunction)originalWrapper);
            AbstractStreamOperatorTestHarness originalHarness = new AbstractStreamOperatorTestHarness((StreamOperator)originalOperator, this.numTasks, this.numTasks, 0);
            originalHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
            originalHarness.open();
            OperatorSubtaskState checkpoint = originalHarness.snapshot(0L, 0L);

            // Fresh wrapper/operator/harness that is fed the snapshot taken above.
            TestCountingSource plainSource = new TestCountingSource(numElements);
            UnboundedSourceWrapper restoredFlinkWrapper = new UnboundedSourceWrapper("stepName", pipelineOptions, (UnboundedSource)plainSource, this.numSplits);
            StreamSource restoredOperator = new StreamSource((SourceFunction)restoredFlinkWrapper);
            AbstractStreamOperatorTestHarness restoredHarness = new AbstractStreamOperatorTestHarness((StreamOperator)restoredOperator, this.numTasks, 1, 0);
            restoredHarness.setup();
            restoredHarness.initializeState(checkpoint);
            restoredHarness.open();

            int restoredLocalSplits = restoredFlinkWrapper.getLocalSplitSources().size();
            Assert.assertEquals(0L, (long)restoredLocalSplits);
        }

        /**
         * Unchecked marker exception without message or cause. In this class it is thrown by the
         * output collectors used in {@code testRestore} once enough records were collected, and
         * caught by that test to record that the batch was read.
         */
        private static class SuccessException extends RuntimeException {
            /** Creates the marker exception; only code inside the enclosing class can do so. */
            private SuccessException() {
                super();
            }
        }
    }
}

