package org.apache.flink.streaming.runtime.tasks;

import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.SerializedValue;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.class */
public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
    private static final OperatorID OPERATOR_ID = new OperatorID();
    private static final int NUM_RECORDS = 10;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest$TestingExternallyInducedSource.class */
    private static class TestingExternallyInducedSource extends MockSource {
        private static final long serialVersionUID = 3078454109555893721L;
        private final TestingExternallyInducedSourceReader reader;

        private TestingExternallyInducedSource(TestingExternallyInducedSourceReader testingExternallyInducedSourceReader) {
            super(Boundedness.CONTINUOUS_UNBOUNDED, 1);
            this.reader = testingExternallyInducedSourceReader;
        }

        public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
            return this.reader;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest$TestingExternallyInducedSourceReader.class */
    public static class TestingExternallyInducedSourceReader implements ExternallyInducedSourceReader<Integer, MockSourceSplit>, Serializable {
        private static final long CHECKPOINT_ID = 1234;
        private final int numEventsBeforeCheckpoint;
        private final int totalNumEvents;
        private int numEmittedEvents = 0;
        private boolean checkpointed = false;
        private int checkpointedAt = -1;
        private long checkpointedId;

        TestingExternallyInducedSourceReader(int i, int i2) {
            this.numEventsBeforeCheckpoint = i;
            this.totalNumEvents = i2;
        }

        public Optional<Long> shouldTriggerCheckpoint() {
            return (this.numEmittedEvents != this.numEventsBeforeCheckpoint || this.checkpointed) ? Optional.empty() : Optional.of(Long.valueOf(CHECKPOINT_ID));
        }

        public void start() {
        }

        public InputStatus pollNext(ReaderOutput<Integer> readerOutput) throws Exception {
            this.numEmittedEvents++;
            return this.numEmittedEvents == this.numEventsBeforeCheckpoint ? InputStatus.NOTHING_AVAILABLE : this.numEmittedEvents < this.totalNumEvents ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT;
        }

        public List<MockSourceSplit> snapshotState(long j) {
            this.checkpointed = true;
            this.checkpointedAt = this.numEmittedEvents;
            this.checkpointedId = j;
            return Collections.emptyList();
        }

        public CompletableFuture<Void> isAvailable() {
            return CompletableFuture.completedFuture(null);
        }

        public void addSplits(List<MockSourceSplit> list) {
        }

        public void notifyNoMoreSplits() {
        }

        public void close() throws Exception {
        }
    }

    @Test
    public void testMetrics() throws Exception {
        testMetrics(SourceOperatorStreamTask::new, new SourceOperatorFactory(new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), Matchers.lessThanOrEqualTo(Double.valueOf(1000000.0d)));
    }

    @Test
    public void testSnapshotAndRestore() throws Exception {
        executeAndWaitForCheckpoint(2L, executeAndWaitForCheckpoint(1L, null, IntStream.range(0, NUM_RECORDS)), IntStream.range(NUM_RECORDS, 20));
    }

    @Test
    public void testSnapshotAndAdvanceToEndOfEventTime() throws Exception {
        StreamTaskMailboxTestHarness<Integer> createTestHarness = createTestHarness(1L, null);
        Throwable th = null;
        try {
            try {
                getAndMaybeAssignSplit(createTestHarness);
                CheckpointOptions checkpointOptions = new CheckpointOptions(CheckpointType.SAVEPOINT_TERMINATE, CheckpointStorageLocationReference.getDefault());
                triggerCheckpointWaitForFinish(createTestHarness, 1L, checkpointOptions);
                LinkedList linkedList = new LinkedList();
                linkedList.add(Watermark.MAX_WATERMARK);
                linkedList.add(new CheckpointBarrier(1L, 1L, checkpointOptions));
                TestHarnessUtil.assertOutputEquals("Output was not correct.", linkedList, createTestHarness.getOutput());
                if (createTestHarness != null) {
                    if (0 == 0) {
                        createTestHarness.close();
                        return;
                    }
                    try {
                        createTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestHarness != null) {
                if (th != null) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception {
        StreamTaskMailboxTestHarness<Integer> createTestHarness = createTestHarness();
        Throwable th = null;
        try {
            try {
                createTestHarness.processAll();
                createTestHarness.finishProcessing();
                Assert.assertThat(createTestHarness.getOutput().toArray(), CoreMatchers.equalTo(Collections.singletonList(Watermark.MAX_WATERMARK).toArray()));
                if (createTestHarness != null) {
                    if (0 == 0) {
                        createTestHarness.close();
                        return;
                    }
                    try {
                        createTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestHarness != null) {
                if (th != null) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testNotEmittingMaxWatermarkAfterCancelling() throws Exception {
        StreamTaskMailboxTestHarness<Integer> createTestHarness = createTestHarness();
        Throwable th = null;
        try {
            createTestHarness.getStreamTask().cancel();
            createTestHarness.finishProcessing();
            Assert.assertThat(createTestHarness.getOutput(), Matchers.hasSize(0));
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testExternallyInducedSource() throws Exception {
        StreamTaskMailboxTestHarness<Integer> createTestHarness = createTestHarness(new TestingExternallyInducedSource(new TestingExternallyInducedSourceReader(NUM_RECORDS, 20)), 0L, null);
        Throwable th = null;
        try {
            try {
                TestingExternallyInducedSourceReader sourceReader = createTestHarness.getStreamTask().mainOperator.getSourceReader();
                createTestHarness.processAll();
                Assert.assertEquals(20L, sourceReader.numEmittedEvents);
                Assert.assertTrue(sourceReader.checkpointed);
                Assert.assertEquals(1234L, sourceReader.checkpointedId);
                Assert.assertEquals(10L, sourceReader.checkpointedAt);
                if (createTestHarness != null) {
                    if (0 == 0) {
                        createTestHarness.close();
                        return;
                    }
                    try {
                        createTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestHarness != null) {
                if (th != null) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th4;
        }
    }

    private TaskStateSnapshot executeAndWaitForCheckpoint(long j, TaskStateSnapshot taskStateSnapshot, IntStream intStream) throws Exception {
        StreamTaskMailboxTestHarness<Integer> createTestHarness = createTestHarness(j, taskStateSnapshot);
        Throwable th = null;
        try {
            try {
                addRecords(getAndMaybeAssignSplit(createTestHarness), NUM_RECORDS);
                createTestHarness.processAll();
                CheckpointOptions forCheckpointWithDefaultLocation = CheckpointOptions.forCheckpointWithDefaultLocation();
                triggerCheckpointWaitForFinish(createTestHarness, j, forCheckpointWithDefaultLocation);
                LinkedList linkedList = new LinkedList();
                intStream.forEach(i -> {
                    linkedList.offer(new StreamRecord(Integer.valueOf(i), Long.MIN_VALUE));
                });
                linkedList.add(new CheckpointBarrier(j, j, forCheckpointWithDefaultLocation));
                Assert.assertEquals(j, createTestHarness.taskStateManager.getReportedCheckpointId());
                TestHarnessUtil.assertOutputEquals("Output was not correct.", linkedList, createTestHarness.getOutput());
                TaskStateSnapshot lastJobManagerTaskStateSnapshot = createTestHarness.taskStateManager.getLastJobManagerTaskStateSnapshot();
                if (createTestHarness != null) {
                    if (0 != 0) {
                        try {
                            createTestHarness.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTestHarness.close();
                    }
                }
                return lastJobManagerTaskStateSnapshot;
            } finally {
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (th != null) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    private void triggerCheckpointWaitForFinish(StreamTaskMailboxTestHarness<Integer> streamTaskMailboxTestHarness, long j, CheckpointOptions checkpointOptions) throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        streamTaskMailboxTestHarness.taskStateManager.setWaitForReportLatch(oneShotLatch);
        Future triggerCheckpointAsync = streamTaskMailboxTestHarness.getStreamTask().triggerCheckpointAsync(new CheckpointMetaData(j, j), checkpointOptions);
        getSourceReaderFromTask(streamTaskMailboxTestHarness).markAvailable();
        triggerCheckpointAsync.getClass();
        processUntil(streamTaskMailboxTestHarness, triggerCheckpointAsync::isDone);
        Future notifyCheckpointCompleteAsync = streamTaskMailboxTestHarness.getStreamTask().notifyCheckpointCompleteAsync(j);
        notifyCheckpointCompleteAsync.getClass();
        processUntil(streamTaskMailboxTestHarness, notifyCheckpointCompleteAsync::isDone);
        oneShotLatch.await();
    }

    private void processUntil(StreamTaskMailboxTestHarness streamTaskMailboxTestHarness, Supplier<Boolean> supplier) throws Exception {
        do {
            streamTaskMailboxTestHarness.getStreamTask().runMailboxStep();
        } while (!supplier.get().booleanValue());
    }

    private StreamTaskMailboxTestHarness<Integer> createTestHarness() throws Exception {
        return createTestHarness(0L, null);
    }

    private StreamTaskMailboxTestHarness<Integer> createTestHarness(long j, TaskStateSnapshot taskStateSnapshot) throws Exception {
        return createTestHarness(new MockSource(Boundedness.BOUNDED, 1), j, taskStateSnapshot);
    }

    private StreamTaskMailboxTestHarness<Integer> createTestHarness(MockSource mockSource, long j, TaskStateSnapshot taskStateSnapshot) throws Exception {
        StreamOperatorFactory<?> sourceOperatorFactory = new SourceOperatorFactory<>(mockSource, WatermarkStrategy.noWatermarks());
        StreamTaskMailboxTestHarnessBuilder streamTaskMailboxTestHarnessBuilder = new StreamTaskMailboxTestHarnessBuilder(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
        if (taskStateSnapshot != null) {
            streamTaskMailboxTestHarnessBuilder.setTaskStateSnapshot(j, taskStateSnapshot);
        }
        return streamTaskMailboxTestHarnessBuilder.setupOutputForSingletonOperatorChain(sourceOperatorFactory, OPERATOR_ID).build();
    }

    private MockSourceSplit getAndMaybeAssignSplit(StreamTaskMailboxTestHarness<Integer> streamTaskMailboxTestHarness) throws Exception {
        List assignedSplits = getSourceReaderFromTask(streamTaskMailboxTestHarness).getAssignedSplits();
        if (assignedSplits.isEmpty()) {
            streamTaskMailboxTestHarness.getStreamTask().dispatchOperatorEvent(OPERATOR_ID, new SerializedValue(new AddSplitEvent(Collections.singletonList(new MockSourceSplit(0, 0)), new MockSourceSplitSerializer())));
            while (assignedSplits.isEmpty()) {
                streamTaskMailboxTestHarness.getStreamTask().runMailboxStep();
            }
            getSourceReaderFromTask(streamTaskMailboxTestHarness).markAvailable();
        }
        return (MockSourceSplit) assignedSplits.get(0);
    }

    private void addRecords(MockSourceSplit mockSourceSplit, int i) {
        int index = mockSourceSplit.index();
        for (int i2 = index; i2 < index + i; i2++) {
            mockSourceSplit.addRecord(i2);
        }
    }

    private MockSourceReader getSourceReaderFromTask(StreamTaskMailboxTestHarness<Integer> streamTaskMailboxTestHarness) {
        return streamTaskMailboxTestHarness.getStreamTask().mainOperator.getSourceReader();
    }
}
