package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.metrics.Gauge;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.class */
public class FlinkUnboundedSourceReaderTest extends FlinkSourceReaderTestBase<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> {

    /**
     * A {@link TestCountingSource} whose readers only make progress on a random subset of
     * {@code advance()} calls, so that callers regularly observe "no data available".
     */
    private static class DummySource extends TestCountingSource {

        public DummySource(int i) {
            super(i);
        }

        /**
         * Creates a fresh {@link DummySourceReader} and records it in {@code createdReaders()}.
         * Both arguments are ignored.
         */
        @Override
        public TestCountingSource.CountingSourceReader createReader(PipelineOptions pipelineOptions, TestCountingSource.CounterMark counterMark) {
            DummySourceReader reader = new DummySourceReader();
            createdReaders().add(reader);
            return reader;
        }

        /**
         * Reader that delegates to the parent {@code advance()} only when a random draw from
         * {@code [0, 3)} is zero; every other call reports that nothing was read.
         */
        public class DummySourceReader extends TestCountingSource.CountingSourceReader {
            private final Random random = new Random();

            public DummySourceReader() {
                super(0);
            }

            @Override
            public boolean advance() {
                // Short-circuit keeps the parent untouched unless the draw is zero.
                return this.random.nextInt(3) == 0 && super.advance();
            }
        }
    }

    /**
     * Polls ten records from a reader, snapshots its state, and then verifies that a second
     * reader fed with the snapshotted splits can continue polling against the same validating
     * output.
     *
     * <p>Both readers are managed by try-with-resources so they are closed even when polling or
     * validation fails.
     */
    @Test
    public void testSnapshotStateAndRestore() throws Exception {
        List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 10, 0);
        FlinkSourceReaderTestBase.RecordsValidatingOutput validatingOutput = new FlinkSourceReaderTestBase.RecordsValidatingOutput(this, splits);

        // Phase 1: consume part of the data and capture the reader state.
        List<FlinkSourceSplit<KV<Integer, Integer>>> snapshot;
        try (SourceReader<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader()) {
            pollAndValidate(reader, splits, validatingOutput, 10);
            snapshot = reader.snapshotState(0L);
        }

        // Phase 2: a brand-new reader resumes from the snapshot and polls the remainder.
        try (SourceReader<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, FlinkSourceSplit<KV<Integer, Integer>>> restoredReader = createReader()) {
            pollAndValidate(restoredReader, snapshot, validatingOutput, Integer.MAX_VALUE);
        }
    }

    /**
     * Stress test: every future returned by {@code isAvailable()} after a
     * {@code NOTHING_AVAILABLE} poll must eventually complete while a second thread keeps
     * triggering the scheduled tasks of the manually driven executor.
     *
     * <p>The polling thread counts completed futures locally, and the executor thread simply
     * runs for as long as the polling thread is alive, so no unsynchronized collection is shared
     * between the two threads. Any exception raised while polling is rethrown from the test
     * instead of being silently dropped (which previously surfaced only as a timeout).
     */
    @Test(timeout = 30000)
    public void testIsAvailableAlwaysWakenUp() throws Exception {
        final int requiredFutures = 1000000;
        AtomicReference<Exception> failure = new AtomicReference<>();

        List<FlinkSourceSplit<KV<Integer, Integer>>> splits = new ArrayList<>();
        splits.add(new FlinkSourceSplit<>(0, new DummySource(Integer.MAX_VALUE)));
        FlinkSourceReaderTestBase.RecordsValidatingOutput validatingOutput = new FlinkSourceReaderTestBase.RecordsValidatingOutput(this, splits);
        ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();

        try (SourceReader<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader(executor, Long.MAX_VALUE)) {
            reader.start();
            reader.addSplits(splits);

            Thread pollingThread = new Thread(() -> {
                int completedFutures = 0;
                try {
                    while (completedFutures < requiredFutures) {
                        if (reader.pollNext(validatingOutput) == InputStatus.NOTHING_AVAILABLE) {
                            // Blocks until the reader signals availability again.
                            CompletableFuture<Void> available = reader.isAvailable();
                            available.get();
                            completedFutures++;
                        }
                    }
                } catch (Exception e) {
                    failure.set(e);
                }
            }, "MainThread");

            Thread executorThread = new Thread(() -> {
                // Stops as soon as the polling thread finishes, whether it succeeded or failed.
                while (pollingThread.isAlive()) {
                    executor.triggerScheduledTasks();
                }
            }, "ExecutorThread");

            pollingThread.start();
            executorThread.start();
            executorThread.join();
            // Ensure the reader is no longer in use before try-with-resources closes it.
            pollingThread.join();
        }

        Exception pollingFailure = failure.get();
        if (pollingFailure != null) {
            throw pollingFailure;
        }
    }

    /**
     * Verifies that a pending availability future is completed when the set of splits changes:
     * first when additional splits are added, then when the reader is told that no more splits
     * will arrive. Both splits wrap a {@code DummySource(0)}, and the first poll is asserted to
     * return {@code NOTHING_AVAILABLE}.
     */
    @Test
    public void testIsAvailableOnSplitChangeWhenNoDataAvailableForAliveReaders() throws Exception {
        List<FlinkSourceSplit<KV<Integer, Integer>>> firstSplits = new ArrayList<>();
        List<FlinkSourceSplit<KV<Integer, Integer>>> secondSplits = new ArrayList<>();
        firstSplits.add(new FlinkSourceSplit<>(0, new DummySource(0)));
        secondSplits.add(new FlinkSourceSplit<>(1, new DummySource(0)));
        FlinkSourceReaderTestBase.RecordsValidatingOutput validatingOutput = new FlinkSourceReaderTestBase.RecordsValidatingOutput(this, firstSplits);

        try (SourceReader<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader(new ManuallyTriggeredScheduledExecutorService(), Long.MAX_VALUE)) {
            reader.start();
            reader.addSplits(firstSplits);
            Assert.assertEquals("The reader should have nothing available", InputStatus.NOTHING_AVAILABLE, reader.pollNext(validatingOutput));

            // Adding splits must wake up whoever is waiting on the first future.
            CompletableFuture<Void> future1 = reader.isAvailable();
            Assert.assertFalse("Future1 should be uncompleted without live split.", future1.isDone());
            reader.addSplits(secondSplits);
            Assert.assertTrue("Future1 should be completed upon addition of new splits.", future1.isDone());

            // The "no more splits" notification must do the same for the second future.
            CompletableFuture<Void> future2 = reader.isAvailable();
            Assert.assertFalse("Future2 should be uncompleted without live split.", future2.isDone());
            reader.notifyNoMoreSplits();
            Assert.assertTrue("Future2 should be completed upon NoMoreSplitsNotification.", future2.isDone());
        }
    }

    /**
     * Checks watermark emission per split: no watermark is present after the initial five polls,
     * and the watermarks only move after the manually driven executor has run its scheduled
     * tasks and the reader has been polled again.
     *
     * <p>The reader is managed by try-with-resources so it is closed exactly once, including
     * when an assertion fails.
     */
    @Test
    public void testWatermark() throws Exception {
        ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        try (SourceReader<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader(executor, -1L)) {
            List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 10, 0);
            FlinkSourceReaderTestBase.RecordsValidatingOutput validatingOutput = new FlinkSourceReaderTestBase.RecordsValidatingOutput(this, splits);
            reader.start();
            reader.addSplits(splits);
            for (int i = 0; i < 5; i++) {
                reader.pollNext(validatingOutput);
            }

            Map createdSourceOutputs = validatingOutput.createdSourceOutputs();
            Assert.assertEquals("There should be 2 source outputs created.", 2L, createdSourceOutputs.size());
            // Nothing has triggered the scheduled tasks yet, so no watermark has been emitted.
            Assert.assertNull(((FlinkSourceReaderTestBase.TestSourceOutput) createdSourceOutputs.get("0")).watermark());
            Assert.assertNull(((FlinkSourceReaderTestBase.TestSourceOutput) createdSourceOutputs.get("1")).watermark());

            // First trigger followed by a poll: watermarks appear.
            executor.triggerScheduledTasks();
            reader.pollNext(validatingOutput);
            Assert.assertEquals(3L, ((FlinkSourceReaderTestBase.TestSourceOutput) createdSourceOutputs.get("0")).watermark().getTimestamp());
            Assert.assertEquals(2L, ((FlinkSourceReaderTestBase.TestSourceOutput) createdSourceOutputs.get("1")).watermark().getTimestamp());

            // Polling without another trigger must leave the watermarks unchanged.
            reader.pollNext(validatingOutput);
            Assert.assertEquals(3L, ((FlinkSourceReaderTestBase.TestSourceOutput) createdSourceOutputs.get("0")).watermark().getTimestamp());
            Assert.assertEquals(2L, ((FlinkSourceReaderTestBase.TestSourceOutput) createdSourceOutputs.get("1")).watermark().getTimestamp());

            // Second trigger followed by a poll: watermarks advance.
            executor.triggerScheduledTasks();
            reader.pollNext(validatingOutput);
            Assert.assertEquals(4L, ((FlinkSourceReaderTestBase.TestSourceOutput) createdSourceOutputs.get("0")).watermark().getTimestamp());
            Assert.assertEquals(3L, ((FlinkSourceReaderTestBase.TestSourceOutput) createdSourceOutputs.get("1")).watermark().getTimestamp());
        }
    }

    /**
     * Verifies that the reader registers a {@code pendingBytes} gauge on the supplied metric
     * group and that, after adding two splits and polling once, the gauge reports 14.
     *
     * <p>The reader is managed by try-with-resources so it is closed exactly once, including
     * when an assertion fails.
     */
    @Test
    public void testPendingBytesMetric() throws Exception {
        ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        SourceTestCompat.TestMetricGroup metricGroup = new SourceTestCompat.TestMetricGroup();
        // The null timestamp extractor is cast explicitly to keep overload resolution unambiguous.
        try (FlinkUnboundedSourceReader<KV<Integer, Integer>> reader = createReader(executor, 0L, (Function<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, Long>) null, metricGroup)) {
            reader.start();
            List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 10, 0);
            reader.addSplits(splits);
            reader.pollNext(new FlinkSourceReaderTestBase.RecordsValidatingOutput(this, splits));

            Gauge<?> pendingBytes = metricGroup.registeredGauge.get("pendingBytes");
            Assert.assertNotNull(pendingBytes);
            Assert.assertEquals(14L, ((Long) pendingBytes.getValue()).longValue());
        }
    }

    /**
     * Unwraps the key/value payload from an emitted element: first the
     * {@link ValueWithRecordId} held by the {@link WindowedValue}, then the {@link KV} inside it.
     */
    @Override
    public KV<Integer, Integer> getKVPairs(WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> windowedValue) {
        ValueWithRecordId<KV<Integer, Integer>> withRecordId = windowedValue.getValue();
        return withRecordId.getValue();
    }

    /**
     * Builds the {@link FlinkUnboundedSourceReader} under test.
     *
     * <p>Pipeline options start from the defaults, with the idle-shutdown timeout set to
     * {@code j} and the auto watermark interval set to 10. When no executor is supplied the
     * reader is created through the constructor that does not take one.
     *
     * @param scheduledExecutorService executor handed to the reader, or {@code null}
     * @param j value for {@code shutdownSourcesAfterIdleMs}
     * @param function passed through to the reader unchanged (may be {@code null})
     * @param testMetricGroup metric group used to create the source reader context
     */
    @Override
    public FlinkUnboundedSourceReader<KV<Integer, Integer>> createReader(ScheduledExecutorService scheduledExecutorService, long j, Function<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, Long> function, SourceTestCompat.TestMetricGroup testMetricGroup) {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setShutdownSourcesAfterIdleMs(Long.valueOf(j));
        options.setAutoWatermarkInterval(10L);
        SourceReaderContext readerContext = createSourceReaderContext(testMetricGroup);

        if (scheduledExecutorService == null) {
            return new FlinkUnboundedSourceReader<>("FlinkUnboundedReader", readerContext, options, function);
        }
        return new FlinkUnboundedSourceReader<>("FlinkUnboundedReader", readerContext, options, scheduledExecutorService, function);
    }

    /**
     * Creates the Beam source backing one test split: a {@link TestCountingSource} constructed
     * with {@code i2}, using {@code i} as its shard number and fixed to a single split.
     */
    @Override
    protected Source<KV<Integer, Integer>> createBeamSource(int i, int i2) {
        return new TestCountingSource(i2)
                .withShardNumber(i)
                .withFixedNumSplits(1);
    }

    /**
     * Closes {@code autoCloseable} with try-with-resources semantics.
     *
     * <p>When {@code th} is {@code null} the resource is closed directly and any failure from
     * {@code close()} propagates to the caller. When {@code th} is non-null it is treated as the
     * primary failure: a failure from {@code close()} is attached to it as a suppressed
     * exception instead of being thrown.
     *
     * @param th the primary exception already in flight, or {@code null} if there is none
     * @param autoCloseable the resource to close; must not be {@code null}
     * @throws Exception if {@code close()} fails and there is no primary exception
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) throws Exception {
        if (th == null) {
            // AutoCloseable.close() is declared to throw Exception, hence the throws clause.
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
