package org.apache.flink.streaming.runtime.tasks;

import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.SourceReaderOperator;
import org.apache.flink.streaming.runtime.io.InputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceReaderStreamTaskTest.class */
public class SourceReaderStreamTaskTest {
    private static final int TIMEOUT = 30000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceReaderStreamTaskTest$TestingIntegerSourceReaderOperator.class */
    public static class TestingIntegerSourceReaderOperator extends SourceReaderOperator<Integer> {
        private static final long serialVersionUID = 1;
        private final int numRecords;
        private int lastRecord;
        private int counter;
        private ListState<Integer> counterState;

        TestingIntegerSourceReaderOperator(int i) {
            this.numRecords = i;
        }

        public InputStatus emitNext(PushingAsyncDataInput.DataOutput<Integer> dataOutput) throws Exception {
            int i = this.counter;
            this.counter = i + 1;
            dataOutput.emitRecord(new StreamRecord(Integer.valueOf(i)));
            return hasEmittedEverything() ? InputStatus.NOTHING_AVAILABLE : InputStatus.MORE_AVAILABLE;
        }

        private boolean hasEmittedEverything() {
            return this.counter >= this.lastRecord;
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            super.initializeState(stateInitializationContext);
            this.counterState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("counter", IntSerializer.INSTANCE));
            Iterator it = ((Iterable) this.counterState.get()).iterator();
            if (it.hasNext()) {
                this.counter = ((Integer) it.next()).intValue();
            }
            this.lastRecord = this.counter + this.numRecords;
            Preconditions.checkState(!it.hasNext());
        }

        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            super.snapshotState(stateSnapshotContext);
            this.counterState.clear();
            this.counterState.add(Integer.valueOf(this.counter));
        }

        public CompletableFuture<?> getAvailableFuture() {
            return hasEmittedEverything() ? new CompletableFuture<>() : AVAILABLE;
        }

        public void close() {
        }
    }

    @Test
    public void testSnapshotAndRestore() throws Exception {
        executeAndWaitForCheckpoint(10, 2L, IntStream.range(10, 20), Optional.of(executeAndWaitForCheckpoint(10, 1L, IntStream.range(0, 10), Optional.empty())));
    }

    private TaskStateSnapshot executeAndWaitForCheckpoint(int i, long j, IntStream intStream, Optional<TaskStateSnapshot> optional) throws Exception {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        intStream.forEach(i2 -> {
            linkedBlockingQueue.add(new StreamRecord(Integer.valueOf(i2)));
        });
        CheckpointOptions forCheckpointWithDefaultLocation = CheckpointOptions.forCheckpointWithDefaultLocation();
        linkedBlockingQueue.add(new CheckpointBarrier(j, j, forCheckpointWithDefaultLocation));
        StreamTaskTestHarness<Integer> createTestHarness = createTestHarness(i);
        if (optional.isPresent()) {
            createTestHarness.setTaskStateSnapshot(j, optional.get());
        }
        TestTaskStateManager testTaskStateManager = createTestHarness.taskStateManager;
        OneShotLatch oneShotLatch = new OneShotLatch();
        testTaskStateManager.setWaitForReportLatch(oneShotLatch);
        createTestHarness.invoke();
        createTestHarness.waitForTaskRunning();
        StreamTask<Integer, ?> mo102getTask = createTestHarness.mo102getTask();
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(j, j);
        while (createTestHarness.mo102getTask().inputProcessor.getAvailableFuture().isDone()) {
            Thread.sleep(1L);
        }
        mo102getTask.triggerCheckpointAsync(checkpointMetaData, forCheckpointWithDefaultLocation, false).get();
        oneShotLatch.await();
        Assert.assertEquals(j, testTaskStateManager.getReportedCheckpointId());
        createTestHarness.mo102getTask().cancel();
        try {
            createTestHarness.waitForTaskCompletion(30000L);
        } catch (Exception e) {
            if (!ExceptionUtils.findThrowable(e, CancelTaskException.class).isPresent()) {
                throw e;
            }
        }
        TestHarnessUtil.assertOutputEquals("Output was not correct.", linkedBlockingQueue, createTestHarness.getOutput());
        return testTaskStateManager.getLastJobManagerTaskStateSnapshot();
    }

    private static StreamTaskTestHarness<Integer> createTestHarness(int i) {
        StreamTaskTestHarness<Integer> streamTaskTestHarness = new StreamTaskTestHarness<>(SourceReaderStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
        StreamConfig streamConfig = streamTaskTestHarness.getStreamConfig();
        streamTaskTestHarness.setupOutputForSingletonOperatorChain();
        streamConfig.setStreamOperator(new TestingIntegerSourceReaderOperator(i));
        streamConfig.setOperatorID(new OperatorID(42L, 44L));
        return streamTaskTestHarness;
    }
}
