package org.apache.flink.test.checkpointing;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.class */
public class CheckpointRestoreWithUidHashITCase {

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    @Rule
    public final MiniClusterResource miniClusterResource = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(1).build());
    private SharedReference<CountDownLatch> startWaitingForCheckpointLatch;
    private SharedReference<List<Integer>> result;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase$CollectSink.class */
    public static class CollectSink implements SinkFunction<Integer> {
        private final SharedReference<List<Integer>> result;

        public CollectSink(SharedReference<List<Integer>> sharedReference) {
            this.result = sharedReference;
        }

        public void invoke(Integer num, SinkFunction.Context context) throws Exception {
            ((List) this.result.get()).add(num);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase$StatefulSource.class */
    public static class StatefulSource extends RichSourceFunction<Integer> implements CheckpointedFunction, CheckpointListener {
        private final StatefulSourceBehavior behavior;
        private final int maxNumber;
        private final SharedReference<CountDownLatch> startWaitingForCheckpointLatch;
        private ListState<Integer> nextNumberState;
        private int nextNumber;
        private volatile boolean isCanceled;
        private volatile boolean isWaiting;
        private volatile long firstCheckpointIdAfterWaiting;
        private volatile boolean checkpointCompletedAfterWaiting;

        public StatefulSource(StatefulSourceBehavior statefulSourceBehavior, int i, SharedReference<CountDownLatch> sharedReference) {
            this.behavior = statefulSourceBehavior;
            this.maxNumber = i;
            this.startWaitingForCheckpointLatch = sharedReference;
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.nextNumberState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("next", Integer.class));
            if (((Iterable) this.nextNumberState.get()).iterator().hasNext()) {
                this.nextNumber = ((Integer) ((Iterable) this.nextNumberState.get()).iterator().next()).intValue();
            }
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            emitRecordsTill(this.maxNumber / 3, sourceContext);
            if (!this.behavior.waitForCheckpointOnFirstRun || getRuntimeContext().getTaskInfo().getAttemptNumber() != 0) {
                emitRecordsTill(this.maxNumber, sourceContext);
                return;
            }
            this.isWaiting = true;
            ((CountDownLatch) this.startWaitingForCheckpointLatch.get()).countDown();
            while (!this.checkpointCompletedAfterWaiting) {
                Thread.sleep(200L);
            }
            if (this.behavior == StatefulSourceBehavior.FAIL_AFTER_CHECKPOINT_ON_FIRST_RUN) {
                throw new RuntimeException("Artificial Exception");
            }
            if (this.behavior == StatefulSourceBehavior.HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN) {
                while (!this.isCanceled) {
                    Thread.sleep(200L);
                }
            }
        }

        private void emitRecordsTill(int i, SourceFunction.SourceContext<Integer> sourceContext) {
            while (!this.isCanceled && this.nextNumber < i) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(Integer.valueOf(this.nextNumber));
                    this.nextNumber++;
                }
            }
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.nextNumberState.update(Collections.singletonList(Integer.valueOf(this.nextNumber)));
            if (!this.isWaiting || this.firstCheckpointIdAfterWaiting > 0) {
                return;
            }
            this.firstCheckpointIdAfterWaiting = functionSnapshotContext.getCheckpointId();
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (this.firstCheckpointIdAfterWaiting <= 0 || j < this.firstCheckpointIdAfterWaiting) {
                return;
            }
            this.checkpointCompletedAfterWaiting = true;
        }

        public void cancel() {
            this.isCanceled = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase$StatefulSourceBehavior.class */
    public enum StatefulSourceBehavior {
        PROCESS_ONLY(false),
        HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN(true),
        FAIL_AFTER_CHECKPOINT_ON_FIRST_RUN(true);

        boolean waitForCheckpointOnFirstRun;

        StatefulSourceBehavior(boolean z) {
            this.waitForCheckpointOnFirstRun = z;
        }
    }

    @Before
    public void setup() {
        this.startWaitingForCheckpointLatch = this.sharedObjects.add(new CountDownLatch(1));
        this.result = this.sharedObjects.add(new ArrayList());
    }

    @Test
    public void testRestoreFromSavepointBySetUidHash() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        JobGraph createJobGraph = createJobGraph(executionEnvironment, StatefulSourceBehavior.HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN, 100, "test-uid", null, null);
        JobID jobID = ((JobSubmissionResult) this.miniClusterResource.getMiniCluster().submitJob(createJobGraph).get()).getJobID();
        CommonTestUtils.waitForAllTaskRunning(this.miniClusterResource.getMiniCluster(), jobID, false);
        ((CountDownLatch) this.startWaitingForCheckpointLatch.get()).await();
        String str = (String) this.miniClusterResource.getMiniCluster().triggerSavepoint(jobID, TMP_FOLDER.newFolder().getAbsolutePath(), true, SavepointFormatType.CANONICAL).get();
        List operatorIDs = ((JobVertex) createJobGraph.getVerticesSortedTopologicallyFromSources().get(0)).getOperatorIDs();
        this.miniClusterResource.getMiniCluster().executeJobBlocking(createJobGraph(executionEnvironment, StatefulSourceBehavior.PROCESS_ONLY, 100, null, ((OperatorIDPair) operatorIDs.get(operatorIDs.size() - 1)).getGeneratedOperatorID().toHexString(), str));
        MatcherAssert.assertThat(this.result.get(), Matchers.contains(IntStream.range(0, 100).boxed().toArray()));
    }

    @Test
    public void testRestoreCheckpointAfterFailoverWithUidHashSet() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 500L));
        executionEnvironment.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        this.miniClusterResource.getMiniCluster().executeJobBlocking(createJobGraph(executionEnvironment, StatefulSourceBehavior.FAIL_AFTER_CHECKPOINT_ON_FIRST_RUN, 100, null, new OperatorID().toHexString(), null));
        MatcherAssert.assertThat(this.result.get(), Matchers.contains(IntStream.range(0, 100).boxed().toArray()));
    }

    private JobGraph createJobGraph(StreamExecutionEnvironment streamExecutionEnvironment, StatefulSourceBehavior statefulSourceBehavior, int i, @Nullable String str, @Nullable String str2, @Nullable String str3) {
        SingleOutputStreamOperator parallelism = streamExecutionEnvironment.addSource(new StatefulSource(statefulSourceBehavior, i, this.startWaitingForCheckpointLatch)).setParallelism(1);
        if (str != null) {
            parallelism = parallelism.uid(str);
        }
        if (str2 != null) {
            parallelism = parallelism.setUidHash(str2);
        }
        parallelism.addSink(new CollectSink(this.result)).setParallelism(1);
        JobGraph jobGraph = streamExecutionEnvironment.getStreamGraph().getJobGraph();
        if (str3 != null) {
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str3, false));
        }
        return jobGraph;
    }
}
