package org.apache.flink.test.checkpointing;

import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.class */
public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase {
    private static final int SMALL_SOURCE_NUM_RECORDS = 20;
    private static final int BIG_SOURCE_NUM_RECORDS = 100;
    private StreamExecutionEnvironment env;
    private SharedReference<List<Integer>> smallResult;
    private SharedReference<List<Integer>> bigResult;

    @TempDir
    private Path tmpDir;

    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase$CollectSink.class */
    public static class CollectSink extends RichSinkFunction<Integer> {
        private final SharedReference<List<Integer>> result;

        public CollectSink(SharedReference<List<Integer>> sharedReference) {
            this.result = sharedReference;
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
        }

        public void invoke(Integer num, SinkFunction.Context context) throws Exception {
            ((List) this.result.get()).add(num);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase$IntegerStreamSource.class */
    public static final class IntegerStreamSource extends RichSourceFunction<Integer> {
        private static final long serialVersionUID = 1;
        private static CountDownLatch latch;
        private static boolean failedBefore;
        private final int numRecords;
        private boolean block;
        private volatile boolean running = true;
        private int emittedCount = 0;
        private boolean needFailover;

        public IntegerStreamSource(int i, boolean z, boolean z2) {
            this.numRecords = i;
            this.block = z;
            this.needFailover = z2;
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (this.running && this.emittedCount < this.numRecords) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(Integer.valueOf(this.emittedCount));
                }
                this.emittedCount++;
            }
            if (this.block && latch != null) {
                latch.await();
            }
            if (!this.needFailover || failedBefore) {
                return;
            }
            failedBefore = true;
            throw new RuntimeException("forced failure");
        }

        public void cancel() {
            this.running = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase$PassThroughOperator.class */
    public static class PassThroughOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
        private long checkpointID;

        private PassThroughOperator() {
        }

        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            super.snapshotState(stateSnapshotContext);
            this.checkpointID = stateSnapshotContext.getCheckpointId();
        }

        public void processElement(StreamRecord<Integer> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        public void close() throws Exception {
            super.close();
            Assertions.assertThat(this.checkpointID).isGreaterThan(0L);
        }
    }

    @BeforeEach
    public void setUp() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.setParallelism(4);
        this.smallResult = this.sharedObjects.add(new CopyOnWriteArrayList());
        this.bigResult = this.sharedObjects.add(new CopyOnWriteArrayList());
        boolean unused = IntegerStreamSource.failedBefore = false;
        CountDownLatch unused2 = IntegerStreamSource.latch = new CountDownLatch(1);
    }

    @Test
    public void testImmediateCheckpointing() throws Exception {
        this.env.setRestartStrategy(RestartStrategies.noRestart());
        this.env.enableCheckpointing(Duration.ofNanos(Long.MAX_VALUE).toMillis());
        this.env.execute(getStreamGraph(this.env, false, false));
        Assertions.assertThat(((List) this.smallResult.get()).size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS);
        Assertions.assertThat(((List) this.bigResult.get()).size()).isEqualTo(100);
    }

    @Test
    public void testRestoreAfterSomeTasksFinished() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(4).build());
        Throwable th = null;
        try {
            try {
                miniCluster.start();
                this.env.setRestartStrategy(RestartStrategies.noRestart());
                this.env.enableCheckpointing(100L);
                JobGraph jobGraph = getStreamGraph(this.env, true, false).getJobGraph();
                miniCluster.submitJob(jobGraph).get();
                CommonTestUtils.waitForSubtasksToFinish(miniCluster, jobGraph.getJobID(), findVertexByName(jobGraph, "passA -> Sink: sinkA").getID(), false);
                String str = (String) miniCluster.triggerSavepoint(jobGraph.getJobID(), this.tmpDir.toFile().getAbsolutePath(), true, SavepointFormatType.CANONICAL).get();
                ((List) this.bigResult.get()).clear();
                this.env.enableCheckpointing(Duration.ofNanos(Long.MAX_VALUE).toMillis());
                JobGraph jobGraph2 = getStreamGraph(this.env, true, false).getJobGraph();
                jobGraph2.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str, false));
                miniCluster.submitJob(jobGraph2).get();
                IntegerStreamSource.latch.countDown();
                miniCluster.requestJobResult(jobGraph2.getJobID()).get();
                Assertions.assertThat(((List) this.smallResult.get()).size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS);
                Assertions.assertThat(((List) this.bigResult.get()).size()).isEqualTo(100);
                if (miniCluster != null) {
                    if (0 == 0) {
                        miniCluster.close();
                        return;
                    }
                    try {
                        miniCluster.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (miniCluster != null) {
                if (th != null) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testFailoverAfterSomeTasksFinished() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(4).setConfiguration(configuration).build());
        Throwable th = null;
        try {
            try {
                miniCluster.start();
                this.env.enableCheckpointing(100L);
                JobGraph jobGraph = getStreamGraph(this.env, true, true).getJobGraph();
                miniCluster.submitJob(jobGraph).get();
                CommonTestUtils.waitForSubtasksToFinish(miniCluster, jobGraph.getJobID(), findVertexByName(jobGraph, "passA -> Sink: sinkA").getID(), true);
                ((List) this.bigResult.get()).clear();
                IntegerStreamSource.latch.countDown();
                miniCluster.requestJobResult(jobGraph.getJobID()).get();
                Assertions.assertThat(((List) this.smallResult.get()).size()).isIn(new Object[]{Integer.valueOf(SMALL_SOURCE_NUM_RECORDS), 40});
                Assertions.assertThat(((List) this.bigResult.get()).size()).isEqualTo(100);
                if (miniCluster != null) {
                    if (0 == 0) {
                        miniCluster.close();
                        return;
                    }
                    try {
                        miniCluster.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (miniCluster != null) {
                if (th != null) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th4;
        }
    }

    private StreamGraph getStreamGraph(StreamExecutionEnvironment streamExecutionEnvironment, boolean z, boolean z2) {
        streamExecutionEnvironment.addSource(new IntegerStreamSource(SMALL_SOURCE_NUM_RECORDS, false, false)).transform("passA", Types.INT, new PassThroughOperator()).addSink(new CollectSink(this.smallResult)).name("sinkA");
        streamExecutionEnvironment.addSource(new IntegerStreamSource(100, z, z2)).transform("passB", Types.INT, new PassThroughOperator()).addSink(new CollectSink(this.bigResult)).name("sinkB");
        return streamExecutionEnvironment.getStreamGraph();
    }

    private JobVertex findVertexByName(JobGraph jobGraph, String str) {
        for (JobVertex jobVertex : jobGraph.getVerticesAsArray()) {
            if (jobVertex.getName().equals(str)) {
                return jobVertex;
            }
        }
        return null;
    }
}
