package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.class */
public class FlinkBoundedSourceReaderTest extends FlinkSourceReaderTestBase<WindowedValue<KV<Integer, Integer>>> {
    @Test
    public void testPollWithIdleTimeout() throws Exception {
        ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService = new ManuallyTriggeredScheduledExecutorService();
        ReaderOutput readerOutput = (ReaderOutput) Mockito.mock(ReaderOutput.class);
        FlinkBoundedSourceReader createReader = createReader(manuallyTriggeredScheduledExecutorService, 1L);
        try {
            createReader.notifyNoMoreSplits();
            Assert.assertEquals(InputStatus.NOTHING_AVAILABLE, createReader.pollNext(readerOutput));
            manuallyTriggeredScheduledExecutorService.triggerScheduledTasks();
            Assert.assertEquals(InputStatus.END_OF_INPUT, createReader.pollNext(readerOutput));
            if (createReader != null) {
                $closeResource(null, createReader);
            }
        } catch (Throwable th) {
            if (createReader != null) {
                $closeResource(null, createReader);
            }
            throw th;
        }
    }

    @Test
    public void testPollWithoutIdleTimeout() throws Exception {
        ReaderOutput readerOutput = (ReaderOutput) Mockito.mock(ReaderOutput.class);
        SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>> createReader = createReader();
        Throwable th = null;
        try {
            try {
                createReader.notifyNoMoreSplits();
                Assert.assertEquals(InputStatus.END_OF_INPUT, createReader.pollNext(readerOutput));
                if (createReader != null) {
                    $closeResource(null, createReader);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createReader != null) {
                $closeResource(th, createReader);
            }
            throw th3;
        }
    }

    @Test
    public void testIsAvailableOnSplitsAssignment() throws Exception {
        SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>> createReader = createReader();
        try {
            createReader.start();
            CompletableFuture isAvailable = createReader.isAvailable();
            Assert.assertFalse("No split assigned yet, should not be available.", isAvailable.isDone());
            createReader.addSplits(createSplits(1, 1, 0));
            Assert.assertTrue("Adding a split should complete future1", isAvailable.isDone());
            Assert.assertTrue("Data should be available with a live split.", createReader.isAvailable().isDone());
            if (createReader != null) {
                $closeResource(null, createReader);
            }
        } catch (Throwable th) {
            if (createReader != null) {
                $closeResource(null, createReader);
            }
            throw th;
        }
    }

    @Test
    public void testSnapshotStateAndRestore() throws Exception {
        List<FlinkSourceSplit<KV<Integer, Integer>>> createSplits = createSplits(2, 10, 0);
        FlinkSourceReaderTestBase.RecordsValidatingOutput recordsValidatingOutput = new FlinkSourceReaderTestBase.RecordsValidatingOutput(this, createSplits);
        SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>> createReader = createReader();
        Throwable th = null;
        try {
            try {
                pollAndValidate(createReader, createSplits, recordsValidatingOutput, 5);
                List<FlinkSourceSplit<KV<Integer, Integer>>> snapshotState = createReader.snapshotState(0L);
                if (createReader != null) {
                    $closeResource(null, createReader);
                }
                FlinkSourceReaderTestBase.RecordsValidatingOutput recordsValidatingOutput2 = new FlinkSourceReaderTestBase.RecordsValidatingOutput(this, createSplits);
                createReader = createReader();
                Throwable th2 = null;
                try {
                    try {
                        pollAndValidate(createReader, snapshotState, recordsValidatingOutput2, Integer.MAX_VALUE);
                        if (createReader != null) {
                            $closeResource(null, createReader);
                        }
                    } catch (Throwable th3) {
                        th2 = th3;
                        throw th3;
                    }
                } finally {
                }
            } catch (Throwable th4) {
                th = th4;
                throw th4;
            }
        } finally {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase
    public KV<Integer, Integer> getKVPairs(WindowedValue<KV<Integer, Integer>> windowedValue) {
        return (KV) windowedValue.getValue();
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase
    protected Source<KV<Integer, Integer>> createBeamSource(int i, int i2) {
        return new TestBoundedCountingSource(i, i2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase
    public FlinkBoundedSourceReader<KV<Integer, Integer>> createReader(ScheduledExecutorService scheduledExecutorService, long j, @Nullable Function<WindowedValue<KV<Integer, Integer>>, Long> function, SourceTestCompat.TestMetricGroup testMetricGroup) {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setShutdownSourcesAfterIdleMs(Long.valueOf(j));
        SourceReaderContext createSourceReaderContext = createSourceReaderContext(testMetricGroup);
        return scheduledExecutorService != null ? new FlinkBoundedSourceReader<>(createSourceReaderContext, defaults, scheduledExecutorService, function) : new FlinkBoundedSourceReader<>(createSourceReaderContext, defaults, function);
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
