package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.metrics.Counter;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.class */
public abstract class FlinkSourceReaderTestBase<OutputT> {

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase$RecordsValidatingOutput.class */
    public class RecordsValidatingOutput implements SourceTestCompat.ReaderOutputCompat<OutputT> {
        private int numCollectedRecords = 0;
        private final List<Source<KV<Integer, Integer>>> sources = new ArrayList();
        private final Map<String, FlinkSourceReaderTestBase<OutputT>.TestSourceOutput> sourceOutputs = new HashMap();

        public RecordsValidatingOutput(List<FlinkSourceSplit<KV<Integer, Integer>>> list) {
            list.forEach(flinkSourceSplit -> {
                this.sources.add(flinkSourceSplit.getBeamSplitSource());
            });
        }

        public void collect(OutputT outputt) {
            KV<Integer, Integer> kVPairs = FlinkSourceReaderTestBase.this.getKVPairs(outputt);
            this.sources.get(((Integer) kVPairs.getKey()).intValue()).validateNextValue(((Integer) kVPairs.getValue()).intValue());
            this.numCollectedRecords++;
        }

        public void collect(OutputT outputt, long j) {
            KV<Integer, Integer> kVPairs = FlinkSourceReaderTestBase.this.getKVPairs(outputt);
            TestSource testSource = this.sources.get(((Integer) kVPairs.getKey()).intValue());
            testSource.validateNextValue(((Integer) kVPairs.getValue()).intValue());
            testSource.validateNextTimestamp(j);
            this.numCollectedRecords++;
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markIdle() {
        }

        public void markActive() {
        }

        public SourceOutput<OutputT> createOutputForSplit(String str) {
            return this.sourceOutputs.computeIfAbsent(str, str2 -> {
                return new TestSourceOutput(this);
            });
        }

        public void releaseOutputForSplit(String str) {
        }

        public int numCollectedRecords() {
            return this.numCollectedRecords;
        }

        public boolean allRecordsConsumed() {
            Stream<Source<KV<Integer, Integer>>> stream = this.sources.stream();
            Class<TestSource> cls = TestSource.class;
            Objects.requireNonNull(TestSource.class);
            return stream.map((v1) -> {
                return r1.cast(v1);
            }).allMatch((v0) -> {
                return v0.isConsumptionCompleted();
            });
        }

        public boolean allTimestampReceived() {
            boolean z = true;
            Iterator<Source<KV<Integer, Integer>>> it = this.sources.iterator();
            while (it.hasNext()) {
                z = z && ((Source) it.next()).allTimestampsReceived();
            }
            return z;
        }

        public Map<String, FlinkSourceReaderTestBase<OutputT>.TestSourceOutput> createdSourceOutputs() {
            return this.sourceOutputs;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase$TestSourceOutput.class */
    protected class TestSourceOutput implements SourceTestCompat.SourceOutputCompat<OutputT> {
        private final ReaderOutput<OutputT> output;

        @Nullable
        private Watermark watermark;
        private boolean isIdle;

        private TestSourceOutput(FlinkSourceReaderTestBase<OutputT>.RecordsValidatingOutput recordsValidatingOutput) {
            this.output = recordsValidatingOutput;
            this.watermark = null;
            this.isIdle = false;
        }

        public void collect(OutputT outputt) {
            this.output.collect(outputt);
        }

        public void collect(OutputT outputt, long j) {
            this.output.collect(outputt, j);
        }

        public void emitWatermark(Watermark watermark) {
            this.watermark = watermark;
        }

        public void markIdle() {
            this.isIdle = true;
        }

        public void markActive() {
            this.isIdle = false;
        }

        @Nullable
        public Watermark watermark() {
            return this.watermark;
        }

        public boolean isIdle() {
            return this.isIdle;
        }
    }

    /** Polls splits created with split bound 5 and per-source argument 10, validating every record. */
    @Test(timeout = 30000)
    public void testPollBasic() throws Exception {
        final int splitBound = 5;
        // Forwarded to createBeamSource; presumably the number of records per split.
        final int perSourceArg = 10;
        testPoll(splitBound, perSourceArg);
    }

    /** Polls splits created with split bound 3 and per-source argument 0 (empty splits, per the test name). */
    @Test(timeout = 30000)
    public void testPollFromEmptySplit() throws Exception {
        final int splitBound = 3;
        // Forwarded to createBeamSource; presumably the number of records per split.
        final int perSourceArg = 0;
        testPoll(splitBound, perSourceArg);
    }

    /**
     * Same as the basic poll test, but supplies a function that maps each record to the
     * value of its key/value pair, widened to {@code long}.
     */
    @Test
    public void testPollWithTimestampExtractor() throws Exception {
        Function<OutputT, Long> valueAsLong =
                element -> getKVPairs(element).getValue().longValue();
        testPoll(5, 10, valueAsLong);
    }

    /**
     * Verifies that exceptions thrown by tasks submitted through
     * {@code FlinkSourceReaderBase#execute} surface on the next {@code pollNext} call: the first
     * thrown exception is found in the cause chain and the second one is attached to it as
     * suppressed.
     */
    @Test
    public void testExceptionInExecutorThread() throws Exception {
        try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader()) {
            reader.start();
            @SuppressWarnings("unchecked")
            ReaderOutput<OutputT> output = (ReaderOutput<OutputT>) Mockito.mock(ReaderOutput.class);
            // Poll once before any failing task is submitted; an exception here fails the test.
            reader.pollNext(output);

            RuntimeException expectedException = new RuntimeException();
            RuntimeException suppressedException = new RuntimeException();

            // execute() is not part of the SourceReader interface, hence the explicit cast.
            @SuppressWarnings("rawtypes")
            FlinkSourceReaderBase readerBase = (FlinkSourceReaderBase) reader;
            readerBase.execute(() -> {
                throw expectedException;
            });
            readerBase.execute(() -> {
                throw suppressedException;
            });

            // Wait for a third task to run. NOTE(review): this presumably guarantees the two
            // failing tasks ran first (i.e. the executor processes tasks in order) — confirm.
            CountDownLatch countDownLatch = new CountDownLatch(1);
            readerBase.execute(countDownLatch::countDown);
            countDownLatch.await();

            try {
                reader.pollNext(output);
                // AssertionError is not an Exception, so it is not swallowed by the catch below.
                Assert.fail("Should have thrown exception here.");
            } catch (Exception e) {
                // Walk the cause chain until the expected exception (or the root cause) is reached.
                Throwable cause = e;
                while (cause != expectedException && cause.getCause() != null) {
                    cause = cause.getCause();
                }
                Assert.assertEquals(expectedException, cause);
                Assert.assertEquals(1L, cause.getSuppressed().length);
                Assert.assertEquals(suppressedException, cause.getSuppressed()[0]);
            }
        }
    }

    /** Runs the poll test without a timestamp-extracting function. */
    private void testPoll(int splitBound, int perSourceArg) throws Exception {
        final Function<OutputT, Long> noExtractor = null;
        testPoll(splitBound, perSourceArg, noExtractor);
    }

    /**
     * Creates splits and a reader, polls until all records are validated, and then checks that
     * the Beam readers were closed once the Flink reader has been closed.
     *
     * @param splitBound exclusive upper bound of the split indices (splits start at index 0)
     * @param perSourceArg second argument forwarded to {@link #createBeamSource} for every split
     * @param timestampExtractor optional function passed to the reader; when non-null the test
     *     additionally asserts that all timestamps were received
     */
    private void testPoll(int splitBound, int perSourceArg, @Nullable Function<OutputT, Long> timestampExtractor) throws Exception {
        List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(splitBound, perSourceArg, 0);
        try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader(timestampExtractor)) {
            pollAndValidate(reader, splits, timestampExtractor != null);
        }
        // Must run after the reader is closed: it asserts that the Beam readers were closed.
        verifyBeamReaderClosed(splits);
    }

    /**
     * Verifies that a pending availability future completes when the reader is told that no more
     * splits will arrive, and that later calls to {@code isAvailable()} return a completed future.
     */
    @Test
    public void testIsAvailableOnNoMoreSplitsNotification() throws Exception {
        try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader()) {
            reader.start();

            CompletableFuture<Void> future1 = reader.isAvailable();
            Assert.assertFalse("No split assigned yet, should not be available.", future1.isDone());

            reader.notifyNoMoreSplits();
            Assert.assertTrue("Future1 should be completed upon no more splits notification", future1.isDone());
            Assert.assertTrue("Completed future should be returned so pollNext can be invoked to get updated INPUT_STATUS", reader.isAvailable().isDone());
        }
    }

    /**
     * Verifies availability signalling for a reader created with a manually triggered executor
     * and an idle timeout argument of {@code 1L}: after the no-more-splits notification the next
     * availability future stays pending until the scheduled tasks are triggered, and is
     * completed from then on.
     */
    @Test
    public void testIsAvailableWithIdleTimeout() throws Exception {
        ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader(executor, 1L)) {
            reader.start();

            CompletableFuture<Void> future1 = reader.isAvailable();
            Assert.assertFalse("Future1 should be uncompleted without live split.", future1.isDone());

            reader.notifyNoMoreSplits();
            Assert.assertTrue("Future1 should be completed upon no more splits notification.", future1.isDone());

            CompletableFuture<Void> future2 = reader.isAvailable();
            Assert.assertFalse("Future2 should be uncompleted when waiting for idle timeout", future2.isDone());

            // Run the tasks the reader scheduled on the manual executor.
            executor.triggerScheduledTasks();
            Assert.assertTrue("Future2 should be completed after idle timeout.", future2.isDone());
            Assert.assertTrue("The future should always be completed after idle timeout.", reader.isAvailable().isDone());
        }
    }

    /**
     * Verifies that, for a reader created with the default arguments, the availability future
     * completes on the no-more-splits notification and stays completed afterwards.
     */
    @Test
    public void testIsAvailableWithoutIdleTimeout() throws Exception {
        try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader()) {
            reader.start();

            CompletableFuture<Void> future1 = reader.isAvailable();
            Assert.assertFalse("Future1 should be uncompleted without live split.", future1.isDone());

            reader.notifyNoMoreSplits();
            Assert.assertTrue("Future1 should be completed upon no more splits notification.", future1.isDone());
            Assert.assertTrue("The future should be completed without idle timeout.", reader.isAvailable().isDone());
        }
    }

    /**
     * Polls two splits created with per-source argument 10 and asserts that the metric group's
     * {@code numRecordsInCounter} reads 20 after the reader has been closed.
     *
     * <p>Note that, despite the method name, the assertion is on the records-in counter.
     */
    @Test
    public void testNumBytesInMetrics() throws Exception {
        List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 10, 0);
        SourceTestCompat.TestMetricGroup metricGroup = new SourceTestCompat.TestMetricGroup();
        try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader(null, -1L, null, metricGroup)) {
            pollAndValidate(reader, splits, false);
        }
        Assert.assertEquals(20L, metricGroup.numRecordsInCounter.getCount());
    }

    /**
     * Verifies that, after a single poll, a counter named
     * {@code "testNameSpace.advanceCounter"} is registered in the metric group and has a
     * positive count.
     */
    @Test
    public void testMetricsContainer() throws Exception {
        ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        SourceTestCompat.TestMetricGroup metricGroup = new SourceTestCompat.TestMetricGroup();
        try (SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> reader = createReader(executor, 0L, null, metricGroup)) {
            reader.start();

            List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 10, 0);
            reader.addSplits(splits);
            reader.pollNext(new RecordsValidatingOutput(splits));

            Counter advanceCounter = metricGroup.registeredCounter.get("testNameSpace.advanceCounter");
            Assert.assertNotNull(advanceCounter);
            Assert.assertTrue("The reader should have advanced.", advanceCounter.getCount() > 0);
        }
    }

    /**
     * Extracts the integer key/value pair carried by a record emitted by the reader under test.
     *
     * <p>{@code RecordsValidatingOutput} uses the key as the index of the originating source in
     * its source list and hands the value to that source for validation.
     *
     * @param outputt a record emitted by the reader
     * @return the key/value pair represented by the record
     */
    protected abstract KV<Integer, Integer> getKVPairs(OutputT outputt);

    /**
     * Creates the reader under test. All other {@code createReader} overloads in this class
     * delegate here.
     *
     * @param scheduledExecutorService executor handed to the reader; the overloads in this class
     *     pass {@code null} when the caller does not supply one
     * @param j numeric setting for the reader; tests in this class pass {@code -1L} by default,
     *     {@code 1L} in the idle-timeout test and {@code 0L} in the metrics-container test
     *     (presumably the idle timeout — confirm against implementations)
     * @param function optional function from a record to a {@code Long}; used by the
     *     timestamp-extractor test, {@code null} otherwise
     * @param testMetricGroup metric group the tests inspect after running the reader
     * @return a new reader instance
     */
    protected abstract SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> createReader(ScheduledExecutorService scheduledExecutorService, long j, @Nullable Function<OutputT, Long> function, SourceTestCompat.TestMetricGroup testMetricGroup);

    /**
     * Creates the Beam source backing one test split. Called by {@link #createSplits} once per
     * split.
     *
     * @param i the split index the source is created for
     * @param i2 the second argument given to {@code createSplits}, passed through unchanged
     *     (presumably the number of records the source produces — confirm against implementations)
     * @return the source to wrap in a {@code FlinkSourceSplit}
     */
    protected abstract Source<KV<Integer, Integer>> createBeamSource(int i, int i2);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a reader with no executor, {@code -1L} as the numeric setting, no record-to-long
     * function and a fresh metric group.
     */
    public SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> createReader() {
        final SourceTestCompat.TestMetricGroup freshMetricGroup = new SourceTestCompat.TestMetricGroup();
        return createReader(null, -1L, null, freshMetricGroup);
    }

    /**
     * Creates a reader with the given record-to-long function, no executor, {@code -1L} as the
     * numeric setting and a fresh metric group.
     */
    protected SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> createReader(Function<OutputT, Long> function) {
        final SourceTestCompat.TestMetricGroup freshMetricGroup = new SourceTestCompat.TestMetricGroup();
        return createReader(null, -1L, function, freshMetricGroup);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a reader with the given executor and numeric setting, no record-to-long function
     * and a fresh metric group.
     */
    public SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> createReader(ScheduledExecutorService scheduledExecutorService, long j) {
        final SourceTestCompat.TestMetricGroup freshMetricGroup = new SourceTestCompat.TestMetricGroup();
        return createReader(scheduledExecutorService, j, null, freshMetricGroup);
    }

    /**
     * Creates a reader with the given executor, numeric setting and record-to-long function,
     * using a fresh metric group.
     */
    protected SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> createReader(ScheduledExecutorService scheduledExecutorService, long j, Function<OutputT, Long> function) {
        final SourceTestCompat.TestMetricGroup freshMetricGroup = new SourceTestCompat.TestMetricGroup();
        return createReader(scheduledExecutorService, j, function, freshMetricGroup);
    }

    /**
     * Polls the reader with a fresh {@link RecordsValidatingOutput} and no practical record limit
     * ({@code Integer.MAX_VALUE}), optionally asserting afterwards that all timestamps arrived.
     *
     * @param sourceReader the reader to poll
     * @param list the splits to assign to the reader
     * @param z whether to assert that every source received all of its timestamps
     */
    protected void pollAndValidate(SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> sourceReader, List<FlinkSourceSplit<KV<Integer, Integer>>> list, boolean z) throws Exception {
        final FlinkSourceReaderTestBase<OutputT>.RecordsValidatingOutput validatingOutput =
                new RecordsValidatingOutput(list);
        pollAndValidate(sourceReader, list, validatingOutput, Integer.MAX_VALUE);
        if (!z) {
            return;
        }
        Assert.assertTrue(validatingOutput.allTimestampReceived());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Assigns the splits, signals that no more splits will come, and then polls repeatedly into
     * the given output. The reader is always polled at least once; polling stops as soon as the
     * output reports all records consumed or the collected record count reaches the limit.
     *
     * @param sourceReader the reader to poll
     * @param list the splits to assign to the reader
     * @param recordsValidatingOutput the output that validates and counts the records
     * @param i the number of collected records at which polling stops
     */
    public void pollAndValidate(SourceReader<OutputT, FlinkSourceSplit<KV<Integer, Integer>>> sourceReader, List<FlinkSourceSplit<KV<Integer, Integer>>> list, FlinkSourceReaderTestBase<OutputT>.RecordsValidatingOutput recordsValidatingOutput, int i) throws Exception {
        sourceReader.addSplits(list);
        sourceReader.notifyNoMoreSplits();
        while (true) {
            sourceReader.pollNext(recordsValidatingOutput);
            // Consumption is checked first; the record limit is only consulted if it is false.
            boolean finished = recordsValidatingOutput.allRecordsConsumed()
                    || recordsValidatingOutput.numCollectedRecords() >= i;
            if (finished) {
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates one split per index in {@code [i3, i)}, each wrapping a source obtained from
     * {@link #createBeamSource}.
     *
     * @param i exclusive upper bound of the split indices
     * @param i2 second argument forwarded to {@code createBeamSource} for every split
     * @param i3 first split index to create
     * @return a new mutable list of splits; empty when {@code i3 >= i}
     */
    public List<FlinkSourceSplit<KV<Integer, Integer>>> createSplits(int i, int i2, int i3) {
        List<FlinkSourceSplit<KV<Integer, Integer>>> splits = new ArrayList<>();
        int splitIndex = i3;
        while (splitIndex < i) {
            Source<KV<Integer, Integer>> beamSource = createBeamSource(splitIndex, i2);
            splits.add(new FlinkSourceSplit<>(splitIndex, beamSource));
            splitIndex++;
        }
        return splits;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates {@code i} splits with indices {@code 0..i-1}, each wrapping a new
     * {@code EmptyUnboundedSource}.
     *
     * @param i the number of splits to create
     * @return the list of created splits
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> List<FlinkSourceSplit<T>> createEmptySplits(int i) {
        Stream<FlinkSourceSplit> emptySplits =
                IntStream.range(0, i)
                        .mapToObj(index -> new FlinkSourceSplit(index, new EmptyUnboundedSource()));
        // Raw cast kept on purpose: the element type parameter is chosen by the caller.
        return (List) emptySplits.collect(Collectors.toList());
    }

    /**
     * Asserts, for every split, that its Beam source created exactly one reader and that this
     * reader has been closed.
     *
     * @param list the splits whose sources are checked; each source is expected to be a
     *     {@link TestSource}
     */
    protected void verifyBeamReaderClosed(List<FlinkSourceSplit<KV<Integer, Integer>>> list) {
        for (FlinkSourceSplit<KV<Integer, Integer>> split : list) {
            // The split exposes the Beam Source type; the test-only accessors live on TestSource.
            TestSource source = (TestSource) split.getBeamSplitSource();
            Assert.assertEquals("Should have only one beam BoundedReader created", 1L, source.createdReaders().size());
            Assert.assertTrue("The beam BoundedReader should have been closed", source.createdReaders().get(0).isClosed());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a Mockito mock of {@link SourceReaderContext} whose {@code metricGroup()} returns
     * the given metric group; all other methods keep Mockito's default behaviour.
     *
     * @param testMetricGroup the metric group the mocked context should expose
     * @return the mocked context
     */
    public static SourceReaderContext createSourceReaderContext(SourceTestCompat.TestMetricGroup testMetricGroup) {
        final SourceReaderContext mockedContext = Mockito.mock(SourceReaderContext.class);
        Mockito.when(mockedContext.metricGroup()).thenReturn(testMetricGroup);
        return mockedContext;
    }

    /**
     * Closes a resource the way a try-with-resources statement would.
     *
     * <p>When there is no primary failure, any exception from {@code close()} propagates to the
     * caller. When a primary failure exists, an exception from {@code close()} is attached to it
     * as suppressed instead of being thrown, so the primary failure is not masked.
     *
     * @param th the failure already in flight, or {@code null} if the guarded block completed
     *     normally
     * @param autoCloseable the resource to close; must not be {@code null}
     * @throws Exception if closing fails and there is no primary failure to attach it to
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) throws Exception {
        if (th == null) {
            // AutoCloseable.close() declares Exception, so it has to be declared here as well.
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable closeFailure) {
            th.addSuppressed(closeFailure);
        }
    }
}
