package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.stream.LongStream;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.streaming.StreamSources;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Enclosed.class)
/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.class */
public class UnboundedSourceWrapperTest {
    /** Logger used by the nested test classes to report exceptions raised on helper threads. */
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapperTest.class);

    @RunWith(JUnit4.class)
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest$BasicTest.class */
    public static class BasicTest {
        @Test
        public void testSerialization() throws Exception {
            InstantiationUtil.serializeObject(new UnboundedSourceWrapper("stepName", PipelineOptionsFactory.create(), new TestCountingSource(20), 1));
        }

        @Test(timeout = 10000)
        public void testSourceWithNoReaderDoesNotShutdown() throws Exception {
            testSourceDoesNotShutdown(false);
        }

        @Test(timeout = 10000)
        public void testSourceWithReadersDoesNotShutdown() throws Exception {
            testSourceDoesNotShutdown(true);
        }

        private static void testSourceDoesNotShutdown(boolean z) throws Exception {
            FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
            defaults.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE);
            UnboundedSourceWrapper unboundedSourceWrapper = new UnboundedSourceWrapper("noReader", defaults, new TestCountingSource(20).withoutSplitting(), 2);
            StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
            if (z) {
                Mockito.when(Integer.valueOf(streamingRuntimeContext.getIndexOfThisSubtask())).thenReturn(0);
            } else {
                Mockito.when(Integer.valueOf(streamingRuntimeContext.getIndexOfThisSubtask())).thenReturn(1);
            }
            Mockito.when(Integer.valueOf(streamingRuntimeContext.getNumberOfParallelSubtasks())).thenReturn(2);
            Mockito.when(streamingRuntimeContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
            ProcessingTimeService processingTimeService = (ProcessingTimeService) Mockito.mock(ProcessingTimeService.class);
            Mockito.when(Long.valueOf(processingTimeService.getCurrentProcessingTime())).thenReturn(Long.MAX_VALUE);
            Mockito.when(streamingRuntimeContext.getProcessingTimeService()).thenReturn(processingTimeService);
            unboundedSourceWrapper.setRuntimeContext(streamingRuntimeContext);
            unboundedSourceWrapper.open(new Configuration());
            SourceFunction.SourceContext sourceContext = (SourceFunction.SourceContext) Mockito.mock(SourceFunction.SourceContext.class);
            Object obj = new Object();
            Mockito.when(sourceContext.getCheckpointLock()).thenReturn(obj);
            unboundedSourceWrapper.setSourceContext(sourceContext);
            unboundedSourceWrapper.open(new Configuration());
            MatcherAssert.assertThat(Boolean.valueOf(unboundedSourceWrapper.getLocalReaders().isEmpty()), Is.is(Boolean.valueOf(!z)));
            Thread thread = new Thread(() -> {
                try {
                    unboundedSourceWrapper.run(sourceContext);
                } catch (Exception e) {
                    UnboundedSourceWrapperTest.LOG.error("Error while running UnboundedSourceWrapper", e);
                }
            });
            try {
                thread.start();
                if (!z) {
                    while (true) {
                        StackTraceElement[] stackTrace = thread.getStackTrace();
                        if (stackTrace.length >= 2 && "sleep".equals(stackTrace[0].getMethodName()) && "finalizeSource".equals(stackTrace[1].getMethodName())) {
                            break;
                        } else {
                            Thread.sleep(10L);
                        }
                    }
                }
                MatcherAssert.assertThat(Boolean.valueOf(unboundedSourceWrapper.isRunning()), Is.is(true));
                synchronized (obj) {
                    unboundedSourceWrapper.onProcessingTime(42L);
                }
                MatcherAssert.assertThat(Boolean.valueOf(unboundedSourceWrapper.isRunning()), Is.is(true));
                MatcherAssert.assertThat(Boolean.valueOf(thread.isAlive()), Is.is(true));
                unboundedSourceWrapper.cancel();
                thread.interrupt();
                thread.join(1000L);
                MatcherAssert.assertThat(Boolean.valueOf(thread.isAlive()), Is.is(false));
            } catch (Throwable th) {
                thread.interrupt();
                thread.join(1000L);
                throw th;
            }
        }

        @Test
        public void testSequentialReadingFromBoundedSource() throws Exception {
            UnboundedSourceWrapper unboundedSourceWrapper = new UnboundedSourceWrapper("sequentialRead", FlinkPipelineOptions.defaults(), new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(CountingSource.upTo(1000L)), 4);
            StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
            Mockito.when(Integer.valueOf(streamingRuntimeContext.getIndexOfThisSubtask())).thenReturn(0);
            Mockito.when(Integer.valueOf(streamingRuntimeContext.getNumberOfParallelSubtasks())).thenReturn(2);
            Mockito.when(streamingRuntimeContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
            TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
            testProcessingTimeService.setCurrentTime(0L);
            Mockito.when(streamingRuntimeContext.getProcessingTimeService()).thenReturn(testProcessingTimeService);
            Mockito.when(streamingRuntimeContext.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup());
            unboundedSourceWrapper.setRuntimeContext(streamingRuntimeContext);
            unboundedSourceWrapper.open(new Configuration());
            MatcherAssert.assertThat(Integer.valueOf(unboundedSourceWrapper.getLocalReaders().size()), Is.is(2));
            final ArrayList arrayList = new ArrayList();
            unboundedSourceWrapper.run(new SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<Long>>>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapperTest.BasicTest.1
                private final Object checkpointLock = new Object();

                public void collect(WindowedValue<ValueWithRecordId<Long>> windowedValue) {
                    arrayList.add((Long) ((ValueWithRecordId) windowedValue.getValue()).getValue());
                }

                public void collectWithTimestamp(WindowedValue<ValueWithRecordId<Long>> windowedValue, long j) {
                    throw new IllegalStateException("Should not collect with timestamp");
                }

                public void emitWatermark(Watermark watermark) {
                }

                public void markAsTemporarilyIdle() {
                }

                public Object getCheckpointLock() {
                    return this.checkpointLock;
                }

                public void close() {
                }
            });
            MatcherAssert.assertThat(Integer.valueOf(arrayList.size()), Is.is(500));
            MatcherAssert.assertThat(arrayList, Matchers.contains(LongStream.concat(LongStream.range(0L, 250L), LongStream.range(500L, 750L)).boxed().toArray()));
        }

        @Test
        public void testAccumulatorRegistrationOnOperatorClose() throws Exception {
            UnboundedSourceWrapper unboundedSourceWrapper = new UnboundedSourceWrapper("noReader", FlinkPipelineOptions.defaults(), new TestCountingSource(20).withoutSplitting(), 2);
            StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
            Mockito.when(Integer.valueOf(streamingRuntimeContext.getNumberOfParallelSubtasks())).thenReturn(1);
            Mockito.when(streamingRuntimeContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
            Mockito.when(Integer.valueOf(streamingRuntimeContext.getIndexOfThisSubtask())).thenReturn(0);
            unboundedSourceWrapper.setRuntimeContext(streamingRuntimeContext);
            unboundedSourceWrapper.open(new Configuration());
            FlinkMetricContainer flinkMetricContainer = (FlinkMetricContainer) Mockito.spy((FlinkMetricContainer) Whitebox.getInternalState(unboundedSourceWrapper, "metricContainer"));
            Whitebox.setInternalState(unboundedSourceWrapper, "metricContainer", flinkMetricContainer);
            unboundedSourceWrapper.close();
            ((FlinkMetricContainer) Mockito.verify(flinkMetricContainer)).registerMetricsForPipelineResult();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest$IdlingUnboundedSource.class */
    public static class IdlingUnboundedSource<T extends Serializable> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
        private final ConcurrentHashMap<String, Integer> numIdles = new ConcurrentHashMap<>();
        private final String uuid = UUID.randomUUID().toString();
        private final List<T> data;
        private final Coder<T> outputCoder;

        public IdlingUnboundedSource(List<T> list, Coder<T> coder) {
            this.data = list;
            this.outputCoder = coder;
        }

        public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> split(int i, PipelineOptions pipelineOptions) {
            return Collections.singletonList(this);
        }

        public UnboundedSource.UnboundedReader<T> createReader(PipelineOptions pipelineOptions, UnboundedSource.CheckpointMark checkpointMark) {
            return (UnboundedSource.UnboundedReader<T>) new UnboundedSource.UnboundedReader<T>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapperTest.IdlingUnboundedSource.1
                private int currentIdx = -1;
                private boolean lastAdvanced = false;

                public boolean start() {
                    return advance();
                }

                public boolean advance() {
                    if (this.lastAdvanced) {
                        IdlingUnboundedSource.this.numIdles.merge(IdlingUnboundedSource.this.uuid, 1, (v0, v1) -> {
                            return Integer.sum(v0, v1);
                        });
                        this.lastAdvanced = false;
                        return false;
                    }
                    if (this.currentIdx >= IdlingUnboundedSource.this.data.size() - 1) {
                        return false;
                    }
                    this.currentIdx++;
                    this.lastAdvanced = true;
                    return true;
                }

                public Instant getWatermark() {
                    return this.currentIdx >= IdlingUnboundedSource.this.data.size() - 1 ? BoundedWindow.TIMESTAMP_MAX_VALUE : new Instant(this.currentIdx);
                }

                public UnboundedSource.CheckpointMark getCheckpointMark() {
                    return UnboundedSource.CheckpointMark.NOOP_CHECKPOINT_MARK;
                }

                /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
                public UnboundedSource<T, ?> m27getCurrentSource() {
                    return IdlingUnboundedSource.this;
                }

                /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
                public T m28getCurrent() throws NoSuchElementException {
                    if (this.currentIdx < 0 || this.currentIdx >= IdlingUnboundedSource.this.data.size()) {
                        throw new NoSuchElementException();
                    }
                    return (T) IdlingUnboundedSource.this.data.get(this.currentIdx);
                }

                public Instant getCurrentTimestamp() throws NoSuchElementException {
                    if (this.currentIdx < 0 || this.currentIdx >= IdlingUnboundedSource.this.data.size()) {
                        throw new NoSuchElementException();
                    }
                    return new Instant(this.currentIdx);
                }

                public void close() {
                }
            };
        }

        public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
            return null;
        }

        public Coder<T> getOutputCoder() {
            return this.outputCoder;
        }

        int getNumIdles() {
            return this.numIdles.getOrDefault(this.uuid, 0).intValue();
        }
    }

    @RunWith(JUnit4.class)
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest$IntegrationTests.class */
    public static class IntegrationTests {
        @Test(timeout = 30000)
        public void testPollingOfIdleReaders() throws Exception {
            IdlingUnboundedSource idlingUnboundedSource = new IdlingUnboundedSource(Arrays.asList("first", "second", "third"), StringUtf8Coder.of());
            FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
            defaults.setShutdownSourcesAfterIdleMs(0L);
            defaults.setParallelism(4);
            StreamSource streamSource = new StreamSource(new UnboundedSourceWrapper("sequentialRead", defaults, idlingUnboundedSource, 4));
            AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(streamSource, 4, 4, 0);
            abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
            abstractStreamOperatorTestHarness.getExecutionConfig().setAutoWatermarkInterval(10L);
            abstractStreamOperatorTestHarness.open();
            final ArrayList arrayList = new ArrayList();
            Thread startProcessingTimeUpdateThread = UnboundedSourceWrapperTest.startProcessingTimeUpdateThread(abstractStreamOperatorTestHarness);
            StreamSources.run(streamSource, abstractStreamOperatorTestHarness.getCheckpointLock(), new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<String>>>>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapperTest.IntegrationTests.1
                public void emitWatermark(Watermark watermark) {
                }

                public void emitLatencyMarker(LatencyMarker latencyMarker) {
                }

                public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
                    throw new IllegalStateException();
                }

                public void collect(StreamRecord<WindowedValue<ValueWithRecordId<String>>> streamRecord) {
                    arrayList.add((String) ((ValueWithRecordId) ((WindowedValue) streamRecord.getValue()).getValue()).getValue());
                }

                public void close() {
                }
            });
            MatcherAssert.assertThat(Integer.valueOf(idlingUnboundedSource.getNumIdles()), Is.is(3));
            MatcherAssert.assertThat(arrayList, Matchers.contains(new String[]{"first", "second", "third"}));
            startProcessingTimeUpdateThread.interrupt();
            startProcessingTimeUpdateThread.join();
        }
    }

    @RunWith(Parameterized.class)
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest$ParameterizedUnboundedSourceWrapperTest.class */
    public static class ParameterizedUnboundedSourceWrapperTest {
        /**
         * First test parameter ({@code {0}} in the parameter name). Passed to the operator test
         * harnesses as the int arguments before the subtask index (presumably max parallelism and
         * parallelism; confirm against the Flink harness API).
         */
        private final int numTasks;
        /**
         * Second test parameter ({@code {1}} in the parameter name). The tests assert that the
         * wrapper's {@code getSplitSources().size()} equals this value.
         */
        private final int numSplits;

        /**
         * Marker exception thrown by the collecting outputs in {@code testRestore} to abort
         * {@code StreamSources.run} once enough elements have been seen; the test catches it and
         * treats it as success.
         */
        public static class SuccessException extends RuntimeException {
            // Private: only code inside the enclosing test class creates instances.
            private SuccessException() {
            }
        }

        /**
         * Receives one parameter pair from {@code data()}.
         *
         * @param i value stored as {@code numTasks}
         * @param i2 value stored as {@code numSplits}
         */
        public ParameterizedUnboundedSourceWrapperTest(int i, int i2) {
            this.numTasks = i;
            this.numSplits = i2;
        }

        /**
         * Supplies every {@code (numTasks, numSplits)} pair with both values drawn from 1, 2 and 4,
         * ordered by task count first and split count second.
         *
         * @return nine two-element parameter arrays
         */
        @Parameterized.Parameters(name = "numTasks = {0}; numSplits={1}")
        public static Collection<Object[]> data() {
            final int[] sizes = {1, 2, 4};
            Object[][] combinations = new Object[sizes.length * sizes.length][];
            int next = 0;
            for (int tasks : sizes) {
                for (int splits : sizes) {
                    combinations[next++] = new Object[] {tasks, splits};
                }
            }
            return Arrays.asList(combinations);
        }

        /**
         * Runs one wrapper per subtask index over the same fixed-split counting source and checks
         * the totals: {@code 20 * numSplits} collected records, and one subtask-final watermark
         * (at or beyond {@code BoundedWindow.TIMESTAMP_MAX_VALUE}) per subtask.
         *
         * @throws Exception if a harness or source run fails
         */
        @Test(timeout = 30000)
        public void testValueEmission() throws Exception {
            final int elementsPerSplit = 20;
            FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
            // One-element arrays serve as counters the anonymous Output below can mutate.
            final long[] collectedRecords = {0};
            final int[] subtasksWithMaxWatermark = {0};
            TestCountingSource source = new TestCountingSource(elementsPerSplit).withFixedNumSplits(this.numSplits);

            for (int subtaskIndex = 0; subtaskIndex < this.numTasks; subtaskIndex++) {
                UnboundedSourceWrapper wrapper = new UnboundedSourceWrapper("stepName", options, source, this.numTasks);
                Assert.assertEquals(this.numSplits, wrapper.getSplitSources().size());
                StreamSource streamSource = new StreamSource(wrapper);
                AbstractStreamOperatorTestHarness harness =
                        new AbstractStreamOperatorTestHarness(streamSource, this.numTasks, this.numTasks, subtaskIndex);
                harness.getExecutionConfig().setAutoWatermarkInterval(10L);
                harness.setProcessingTime(System.currentTimeMillis());
                harness.setTimeCharacteristic(TimeCharacteristic.EventTime);
                Thread processingTimeUpdater = UnboundedSourceWrapperTest.startProcessingTimeUpdateThread(harness);
                try {
                    harness.open();
                    StreamSources.run(streamSource, harness.getCheckpointLock(), new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
                        private boolean hasSeenMaxWatermark = false;

                        public void emitWatermark(Watermark watermark) {
                            // Count the final watermark at most once for this subtask.
                            if (!this.hasSeenMaxWatermark && watermark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                                subtasksWithMaxWatermark[0]++;
                                this.hasSeenMaxWatermark = true;
                            }
                        }

                        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
                            collect((StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>) streamRecord);
                        }

                        public void emitLatencyMarker(LatencyMarker latencyMarker) {
                        }

                        public void collect(StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> streamRecord) {
                            collectedRecords[0]++;
                        }

                        public void close() {
                        }
                    });
                } finally {
                    // Stop the updater thread whether or not the run succeeded.
                    processingTimeUpdater.interrupt();
                    processingTimeUpdater.join();
                }
            }

            Assert.assertEquals(elementsPerSplit * this.numSplits, collectedRecords[0]);
            Assert.assertEquals(this.numTasks, subtasksWithMaxWatermark[0]);
        }

        /**
         * Runs the wrapper on a background thread and waits until the output has received at
         * least {@code 500 / numSplits} records and at least two watermarks, then cancels the
         * source.
         *
         * <p>The main thread steers the run through the source's halt/continue switches and by
         * moving the harness's processing time while holding the checkpoint lock.
         *
         * @throws Exception if the harness fails or the test thread is interrupted
         */
        @Test(timeout = 30000)
        public void testWatermarkEmission() throws Exception {
            PipelineOptions create = PipelineOptionsFactory.create();
            TestCountingSource testCountingSource = new TestCountingSource(500);
            UnboundedSourceWrapper unboundedSourceWrapper = new UnboundedSourceWrapper("stepName", create, testCountingSource, this.numSplits);
            Assert.assertEquals(this.numSplits, unboundedSourceWrapper.getSplitSources().size());
            StreamSource streamSource = new StreamSource(unboundedSourceWrapper);
            AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(streamSource, this.numTasks, this.numTasks, 0);
            abstractStreamOperatorTestHarness.getExecutionConfig().setLatencyTrackingInterval(0L);
            abstractStreamOperatorTestHarness.getExecutionConfig().setAutoWatermarkInterval(1L);
            // Start at the smallest possible processing time; it is moved forward explicitly below.
            abstractStreamOperatorTestHarness.setProcessingTime(Long.MIN_VALUE);
            abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
            // Exceptions thrown on the runner thread, reported by the main thread at the end.
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            // Released after two emitWatermark calls.
            CountDownLatch countDownLatch = new CountDownLatch(2);
            // Released after 500 / numSplits collected records.
            CountDownLatch countDownLatch2 = new CountDownLatch(500 / this.numSplits);
            // Hold back element emission until the main thread has prepared the harness.
            testCountingSource.haltEmission();
            abstractStreamOperatorTestHarness.open();
            Thread thread = new Thread(() -> {
                try {
                    StreamSources.run(streamSource, abstractStreamOperatorTestHarness.getCheckpointLock(), new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
                        public void emitWatermark(Watermark watermark) {
                            countDownLatch.countDown();
                        }

                        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
                        }

                        public void emitLatencyMarker(LatencyMarker latencyMarker) {
                        }

                        public void collect(StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> streamRecord) {
                            countDownLatch2.countDown();
                        }

                        public void close() {
                        }
                    });
                } catch (Exception e) {
                    UnboundedSourceWrapperTest.LOG.info("Caught exception:", e);
                    concurrentLinkedQueue.add(e);
                }
            });
            thread.start();
            // Poll until no local reader reports a watermark of exactly 0 ms.
            while (unboundedSourceWrapper.getLocalReaders().stream().anyMatch(unboundedReader -> {
                return unboundedReader.getWatermark().getMillis() == 0;
            })) {
                Thread.sleep(50L);
            }
            // Processing time is only changed while holding the same lock the source run uses.
            synchronized (abstractStreamOperatorTestHarness.getCheckpointLock()) {
                abstractStreamOperatorTestHarness.setProcessingTime(0L);
            }
            testCountingSource.continueEmission();
            // Block until enough records have been collected.
            countDownLatch2.await();
            // NOTE(review): presumably this jump fires the pending watermark timers; confirm
            // against UnboundedSourceWrapper.onProcessingTime.
            synchronized (abstractStreamOperatorTestHarness.getCheckpointLock()) {
                abstractStreamOperatorTestHarness.setProcessingTime(Long.MAX_VALUE);
            }
            // Block until two watermarks have been emitted.
            countDownLatch.await();
            // NOTE(review): when this fails, the cancel()/join() below are skipped and the runner
            // thread is left running.
            if (!concurrentLinkedQueue.isEmpty()) {
                Assert.fail("Caught exception(s): " + Joiner.on(",").join(concurrentLinkedQueue));
            }
            streamSource.cancel();
            thread.join();
        }

        /**
         * Reads ten elements, snapshots the operator, restores the snapshot into a second wrapper
         * and reads ten more; together the two runs must yield 20 distinct elements.
         *
         * <p>It also checks that completing the checkpoint adds one entry per local split source
         * to the list registered through {@code TestCountingSource.setFinalizeTracker}, and that
         * the restored wrapper owns {@code max(1, numSplits / numTasks)} local split sources.
         *
         * @throws Exception if a harness operation or source run fails
         */
        @Test
        public void testRestore() throws Exception {
            Object checkpointLock = new Object();
            PipelineOptions options = PipelineOptionsFactory.create();
            UnboundedSourceWrapper firstWrapper = new UnboundedSourceWrapper("stepName", options, new TestCountingSource(20), this.numSplits);
            Assert.assertEquals(this.numSplits, firstWrapper.getSplitSources().size());
            StreamSource firstSource = new StreamSource(firstWrapper);
            AbstractStreamOperatorTestHarness firstHarness = new AbstractStreamOperatorTestHarness(firstSource, this.numTasks, this.numTasks, 0);
            firstHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);

            // Distinct elements seen across both runs.
            final HashSet<KV<Integer, Integer>> emittedElements = new HashSet<>();

            boolean readFirstBatch = false;
            try {
                firstHarness.open();
                StreamSources.run(firstSource, checkpointLock, new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
                    private int count = 0;

                    public void emitWatermark(Watermark watermark) {
                    }

                    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
                        collect((StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>) streamRecord);
                    }

                    public void emitLatencyMarker(LatencyMarker latencyMarker) {
                    }

                    public void collect(StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> streamRecord) {
                        emittedElements.add(streamRecord.getValue().getValue().getValue());
                        this.count++;
                        if (this.count >= 10) {
                            // Abort the otherwise unbounded run after ten elements.
                            throw new SuccessException();
                        }
                    }

                    public void close() {
                    }
                });
            } catch (SuccessException expected) {
                readFirstBatch = true;
            }
            Assert.assertTrue("Did not successfully read first batch of elements.", readFirstBatch);

            OperatorSubtaskState snapshot = firstHarness.snapshot(0L, 0L);

            // Keep a reference to the tracker list so its size can be checked after the
            // checkpoint completes.
            List<Integer> finalizeTracker = new ArrayList<>();
            TestCountingSource.setFinalizeTracker(finalizeTracker);
            firstHarness.notifyOfCompletedCheckpoint(0L);
            Assert.assertEquals(firstWrapper.getLocalSplitSources().size(), finalizeTracker.size());

            UnboundedSourceWrapper restoredWrapper = new UnboundedSourceWrapper("stepName", options, new TestCountingSource(20), this.numSplits);
            Assert.assertEquals(this.numSplits, restoredWrapper.getSplitSources().size());
            StreamSource restoredSource = new StreamSource(restoredWrapper);
            AbstractStreamOperatorTestHarness restoredHarness = new AbstractStreamOperatorTestHarness(restoredSource, this.numTasks, 1, 0);
            restoredHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
            restoredHarness.initializeState(snapshot);

            boolean readSecondBatch = false;
            try {
                restoredHarness.open();
                StreamSources.run(restoredSource, checkpointLock, new TestStreamStatusMaintainer(), new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
                    private int count = 0;

                    public void emitWatermark(Watermark watermark) {
                    }

                    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
                        collect((StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>) streamRecord);
                    }

                    public void emitLatencyMarker(LatencyMarker latencyMarker) {
                    }

                    public void collect(StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> streamRecord) {
                        emittedElements.add(streamRecord.getValue().getValue().getValue());
                        this.count++;
                        if (this.count >= 10) {
                            throw new SuccessException();
                        }
                    }

                    public void close() {
                    }
                });
            } catch (SuccessException expected) {
                readSecondBatch = true;
            }

            Assert.assertEquals(Math.max(1, this.numSplits / this.numTasks), restoredWrapper.getLocalSplitSources().size());
            Assert.assertTrue("Did not successfully read second batch of elements.", readSecondBatch);
            // assertEquals reports the actual size on failure.
            Assert.assertEquals(20, emittedElements.size());
        }

        /**
         * Verifies restore behavior when the source's checkpoint-mark coder is {@code null}.
         *
         * <p>A wrapper around a {@link TestCountingSource} whose {@code getCheckpointMarkCoder()}
         * returns {@code null} is opened and snapshotted. A second wrapper is then initialized
         * from that snapshot, and the test expects it to end up with no local split sources.
         *
         * @throws Exception if any test-harness operation fails
         */
        @Test
        public void testNullCheckpoint() throws Exception {
            PipelineOptions create = PipelineOptionsFactory.create();

            // Source that reports no checkpoint-mark coder.
            TestCountingSource nullCoderSource = new TestCountingSource(20) {
                @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource
                public Coder<TestCountingSource.CounterMark> getCheckpointMarkCoder() {
                    return null;
                }
            };
            UnboundedSourceWrapper sourceWrapper = new UnboundedSourceWrapper("stepName", create, nullCoderSource, this.numSplits);
            // Harness arguments: operator, max parallelism, parallelism, subtask index.
            AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(new StreamSource(sourceWrapper), this.numTasks, this.numTasks, 0);
            abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
            abstractStreamOperatorTestHarness.open();
            OperatorSubtaskState snapshot = abstractStreamOperatorTestHarness.snapshot(0L, 0L);

            // Keep a handle on the restored wrapper so the final assertion can inspect it; the
            // decompiled code referred to it through an undeclared register name.
            UnboundedSourceWrapper restoredSourceWrapper = new UnboundedSourceWrapper("stepName", create, new TestCountingSource(20), this.numSplits);
            // Parallelism 1 / subtask 0 for the restoring harness.
            AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness2 = new AbstractStreamOperatorTestHarness(new StreamSource(restoredSourceWrapper), this.numTasks, 1, 0);
            abstractStreamOperatorTestHarness2.setup();
            abstractStreamOperatorTestHarness2.initializeState(snapshot);
            abstractStreamOperatorTestHarness2.open();

            // Restoring from the null-coder snapshot is expected to leave the wrapper without splits.
            Assert.assertEquals(0L, restoredSourceWrapper.getLocalSplitSources().size());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest$TestStreamStatusMaintainer.class */
    /**
     * Minimal {@link StreamStatusMaintainer} for tests: it simply remembers the most recently
     * toggled {@link StreamStatus}, starting out as {@code StreamStatus.ACTIVE}.
     */
    private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
        /** Last status recorded through {@link #toggleStreamStatus(StreamStatus)}. */
        StreamStatus currentStreamStatus = StreamStatus.ACTIVE;

        private TestStreamStatusMaintainer() {
        }

        /**
         * Records {@code streamStatus} as the current status when it differs from the stored one.
         *
         * @param streamStatus the status to switch to
         */
        public void toggleStreamStatus(StreamStatus streamStatus) {
            boolean changed = !this.currentStreamStatus.equals(streamStatus);
            if (changed) {
                this.currentStreamStatus = streamStatus;
            }
        }

        /**
         * Returns the status currently stored by this maintainer.
         *
         * @return the stored stream status
         */
        public StreamStatus getStreamStatus() {
            return this.currentStreamStatus;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts a background thread that keeps pushing the wall-clock time into the given harness.
     *
     * <p>Roughly every 10 ms the thread takes the harness's checkpoint lock and calls
     * {@code setProcessingTime(System.currentTimeMillis())}. The thread ends quietly when it is
     * interrupted, and ends after logging an error on any other exception.
     *
     * @param abstractStreamOperatorTestHarness harness whose processing time is advanced
     * @return the already-started thread; callers stop it by interrupting it
     */
    public static Thread startProcessingTimeUpdateThread(final AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness) {
        Runnable advanceLoop = () -> {
            try {
                for (;;) {
                    // Update the time under the checkpoint lock, then pause outside of it.
                    synchronized (abstractStreamOperatorTestHarness.getCheckpointLock()) {
                        long now = System.currentTimeMillis();
                        abstractStreamOperatorTestHarness.setProcessingTime(now);
                    }
                    Thread.sleep(10L);
                }
            } catch (InterruptedException interrupted) {
                // Interruption is the normal shutdown signal; just let the thread finish.
            } catch (Exception failure) {
                UnboundedSourceWrapperTest.LOG.error("Unexpected error advancing processing time", failure);
            }
        };
        Thread updater = new Thread(advanceLoop);
        updater.start();
        return updater;
    }
}
