package org.apache.flink.test.checkpointing;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.tasks.ExceptionallyDoneFuture;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.class */
public class NotifyCheckpointAbortedITCase extends TestLogger {
    /**
     * Id of the checkpoint that is made to fail on the sink side: {@code DeclineSink} blocks in
     * {@code snapshotState} for this id and {@code DeclineSinkFailingSnapshotStrategy} returns an
     * exceptionally completed future for it.
     */
    private static final long DECLINE_CHECKPOINT_ID = 2;
    /** Value used as the JUnit {@code timeout} of the test method (JUnit reads it as milliseconds). */
    private static final long TEST_TIMEOUT = 100000;
    /**
     * Operator name given to the declining sink; the test state backend matches on it to decide
     * which operator receives the failing operator state backend.
     */
    private static final String DECLINE_SINK_NAME = "DeclineSink";
    /** Mini cluster created in {@code setup()} and torn down in {@code shutdown()}. */
    private static MiniClusterWithClientResource cluster;
    /** Checkpoint directory, assigned in {@code setup()} from a fresh temporary folder. */
    private static Path checkpointPath;

    /** Test parameter; passed to {@code enableUnalignedCheckpoints} by the test method. */
    @Parameterized.Parameter
    public boolean unalignedCheckpointEnabled;

    /** Class-level temporary folder that supplies the checkpoint directory. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase$DeclineSink.class */
    /**
     * Sink operator used by the test to observe checkpoint-abort notifications.
     *
     * <p>It counts every {@code notifyCheckpointAborted} call, signals a latch for each one, and
     * holds back the snapshot of checkpoint {@code DECLINE_CHECKPOINT_ID} until the test triggers
     * {@code waitLatch}. All bookkeeping is static, so it is shared by every instance and must be
     * cleared with {@link #reset()} between test runs.
     */
    public static class DeclineSink extends StreamSink<Integer> {
        private static final long serialVersionUID = 1;

        /** Triggered whenever this operator is notified of an aborted checkpoint. */
        private static final OneShotLatch notifiedAbortedLatch = new OneShotLatch();

        /** Blocks the snapshot of the checkpoint to decline until the test releases it. */
        private static final OneShotLatch waitLatch = new OneShotLatch();

        /** Number of abort notifications received since the last {@link #reset()}. */
        private static final AtomicInteger notifiedAbortedTimes = new AtomicInteger(0);

        /** Creates the sink around an anonymous {@code SinkFunction} that declares no methods. */
        public DeclineSink() {
            super(
                    new SinkFunction<Integer>() {
                        private static final long serialVersionUID = 1;
                    });
        }

        /**
         * Snapshots the operator state, first waiting on {@code waitLatch} when the checkpoint is
         * the one designated to be declined.
         *
         * @param stateSnapshotContext context of the checkpoint being taken
         * @throws Exception if waiting is interrupted or the parent snapshot fails
         */
        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            final boolean isCheckpointToDecline =
                    NotifyCheckpointAbortedITCase.DECLINE_CHECKPOINT_ID
                            == stateSnapshotContext.getCheckpointId();
            if (isCheckpointToDecline) {
                // Stall here until the test has finished verifying the previous abort.
                waitLatch.await();
            }
            super.snapshotState(stateSnapshotContext);
        }

        /**
         * Records one abort notification and wakes up the waiting test thread.
         *
         * @param j id of the aborted checkpoint (not inspected)
         */
        public void notifyCheckpointAborted(long j) {
            // Count first so the counter is already updated when the latch releases the test.
            notifiedAbortedTimes.incrementAndGet();
            notifiedAbortedLatch.trigger();
        }

        /** Restores the shared latches and the notification counter to their initial state. */
        static void reset() {
            notifiedAbortedTimes.set(0);
            waitLatch.reset();
            notifiedAbortedLatch.reset();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase$DeclineSinkFailingOperatorStateBackend.class */
    /**
     * Operator state backend that starts with four empty maps and delegates snapshotting to the
     * snapshot strategy handed in by the caller.
     */
    private static class DeclineSinkFailingOperatorStateBackend extends DefaultOperatorStateBackend {
        /**
         * @param executionConfig execution config forwarded to the parent backend
         * @param closeableRegistry registry forwarded to the parent backend
         * @param abstractSnapshotStrategy strategy used by the parent backend to take snapshots
         */
        public DeclineSinkFailingOperatorStateBackend(ExecutionConfig executionConfig, CloseableRegistry closeableRegistry, AbstractSnapshotStrategy<OperatorStateHandle> abstractSnapshotStrategy) {
            // Diamond operator instead of raw HashMap: the element types are inferred from the
            // parent constructor's parameters, so no unchecked conversion is needed.
            super(
                    executionConfig,
                    closeableRegistry,
                    new HashMap<>(),
                    new HashMap<>(),
                    new HashMap<>(),
                    new HashMap<>(),
                    abstractSnapshotStrategy);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase$DeclineSinkFailingSnapshotStrategy.class */
    /**
     * Snapshot strategy whose result fails with an {@code ExpectedTestException} for checkpoint
     * {@code DECLINE_CHECKPOINT_ID} and is an empty, already completed result for any other id.
     */
    private static class DeclineSinkFailingSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {
        protected DeclineSinkFailingSnapshotStrategy() {
            super("StuckAsyncSnapshotStrategy");
        }

        /**
         * Produces the snapshot future for a checkpoint.
         *
         * @param j checkpoint id; the only argument that influences the result
         * @param j2 second long argument of the overridden method (unused)
         * @param checkpointStreamFactory unused
         * @param checkpointOptions unused
         * @return an exceptionally completed future for the checkpoint to decline, otherwise a
         *     completed future holding an empty snapshot result
         */
        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
            if (j == NotifyCheckpointAbortedITCase.DECLINE_CHECKPOINT_ID) {
                return ExceptionallyDoneFuture.of(new ExpectedTestException());
            }
            return DoneFuture.of(SnapshotResult.empty());
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase$DeclineSinkFailingStateBackend.class */
    /**
     * File-system state backend that hands the operator named {@code DECLINE_SINK_NAME} an operator
     * state backend whose snapshot fails for the checkpoint to decline; every other operator gets
     * a backend built by {@code DefaultOperatorStateBackendBuilder}.
     */
    private static class DeclineSinkFailingStateBackend extends FsStateBackend {
        private static final long serialVersionUID = 1;

        /**
         * @param path checkpoint directory passed to the parent backend
         */
        public DeclineSinkFailingStateBackend(Path path) {
            super(path);
        }

        /**
         * Returns a fresh instance of this test backend on the shared checkpoint path, ignoring
         * both arguments.
         *
         * <p>The decompiler had emitted this method as {@code m744configure} (its note read
         * "renamed from: configure"). Under that name it cannot override the inherited
         * {@code configure} and has no caller in this file, so the original name is restored.
         * NOTE(review): presumably the runtime calls {@code configure} on the user-supplied
         * backend, in which case the renamed method would have let the job fall back to a plain
         * parent backend - confirm against the Flink version in use.
         *
         * @param readableConfig ignored
         * @param classLoader ignored
         * @return a new backend writing to {@code checkpointPath}
         */
        public DeclineSinkFailingStateBackend configure(ReadableConfig readableConfig, ClassLoader classLoader) {
            return new DeclineSinkFailingStateBackend(NotifyCheckpointAbortedITCase.checkpointPath);
        }

        /**
         * Creates the operator state backend for one operator.
         *
         * @param environment task environment supplying the execution config and class loader
         * @param str operator identifier; checked for the declining sink's name
         * @param collection state handles to restore from, used only for non-sink operators
         * @param closeableRegistry registry forwarded to whichever backend is created
         * @return the failing backend for the declining sink, a default-built backend otherwise
         * @throws BackendBuildingException if building the default backend fails
         */
        public OperatorStateBackend createOperatorStateBackend(Environment environment, String str, @Nonnull Collection<OperatorStateHandle> collection, CloseableRegistry closeableRegistry) throws BackendBuildingException {
            if (str.contains(NotifyCheckpointAbortedITCase.DECLINE_SINK_NAME)) {
                return new DeclineSinkFailingOperatorStateBackend(
                        environment.getExecutionConfig(),
                        closeableRegistry,
                        new DeclineSinkFailingSnapshotStrategy());
            }
            return new DefaultOperatorStateBackendBuilder(
                            environment.getUserCodeClassLoader().asClassLoader(),
                            environment.getExecutionConfig(),
                            false,
                            collection,
                            closeableRegistry)
                    .build();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase$NormalMap.class */
    /**
     * Map operator wrapping {@code NormalMapFunction} that counts checkpoint-abort notifications
     * and signals a latch for each one. The bookkeeping is static and is cleared with
     * {@link #reset()}.
     */
    public static class NormalMap extends StreamMap<Tuple2<Integer, Integer>, Integer> {
        private static final long serialVersionUID = 1;

        /** Triggered whenever this operator is notified of an aborted checkpoint. */
        private static final OneShotLatch notifiedAbortedLatch = new OneShotLatch();

        /** Number of abort notifications received since the last {@link #reset()}. */
        private static final AtomicInteger notifiedAbortedTimes = new AtomicInteger(0);

        public NormalMap() {
            super(new NormalMapFunction());
        }

        /**
         * Records one abort notification and wakes up the waiting test thread.
         *
         * @param j id of the aborted checkpoint (not inspected)
         */
        public void notifyCheckpointAborted(long j) {
            // Count first so the counter is already updated when the latch releases the test.
            notifiedAbortedTimes.incrementAndGet();
            notifiedAbortedLatch.trigger();
        }

        /** Restores the shared latch and the notification counter to their initial state. */
        static void reset() {
            notifiedAbortedTimes.set(0);
            notifiedAbortedLatch.reset();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase$NormalMapFunction.class */
    /**
     * Map function that stores the second tuple field in keyed value state and emits it
     * unchanged. It contributes nothing of its own to a snapshot.
     */
    private static class NormalMapFunction implements MapFunction<Tuple2<Integer, Integer>, Integer>, CheckpointedFunction {
        private static final long serialVersionUID = 1;

        /** Keyed state holding the most recent value; assigned in {@code initializeState}. */
        private ValueState<Integer> valueState;

        private NormalMapFunction() {
        }

        /**
         * Writes {@code tuple2.f1} to the value state and returns it.
         *
         * @param tuple2 incoming record
         * @return the record's second field
         * @throws Exception if updating the state fails
         */
        public Integer map(Tuple2<Integer, Integer> tuple2) throws Exception {
            this.valueState.update(tuple2.f1);
            // f1 is already typed as Integer by the tuple's type arguments; no cast required.
            return tuple2.f1;
        }

        /** Intentionally empty: this function has nothing extra to snapshot. */
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
        }

        /**
         * Obtains the keyed value state named {@code "value"}.
         *
         * @param functionInitializationContext context giving access to the keyed state store
         * @throws Exception if the state cannot be obtained
         */
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            // Diamond instead of a raw descriptor keeps the assignment free of unchecked conversions.
            this.valueState =
                    functionInitializationContext
                            .getKeyedStateStore()
                            .getState(new ValueStateDescriptor<>("value", Integer.class));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase$NormalSource.class */
    /**
     * Source that emits a tuple of two random integers, then sleeps, in a loop until it is
     * cancelled.
     */
    private static class NormalSource implements SourceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;

        /** Loop flag; cleared by {@link #cancel()}. */
        protected volatile boolean running = true;

        NormalSource() {
        }

        /**
         * Emits records under the checkpoint lock until {@code running} becomes false.
         *
         * @param sourceContext context used to obtain the checkpoint lock and emit records
         * @throws Exception if the sleep between records is interrupted
         */
        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
            while (this.running) {
                synchronized (sourceContext.getCheckpointLock()) {
                    final ThreadLocalRandom random = ThreadLocalRandom.current();
                    final int first = random.nextInt();
                    final int second = random.nextInt();
                    sourceContext.collect(Tuple2.of(first, second));
                }
                // Pause outside the lock before emitting the next record.
                Thread.sleep(10L);
            }
        }

        /** Asks the emit loop to stop after its current iteration. */
        public void cancel() {
            this.running = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase$TestingCompletedCheckpointStore.class */
    /**
     * Completed-checkpoint store that rejects every {@code addCheckpoint} call made before the test
     * has triggered {@code abortCheckpointLatch}, and behaves like its parent afterwards.
     */
    private static class TestingCompletedCheckpointStore extends StandaloneCompletedCheckpointStore {
        /** Triggered when an {@code addCheckpoint} call arrives before the abort latch was triggered. */
        private static final OneShotLatch addCheckpointLatch = new OneShotLatch();

        /** Triggered by the test to let the pending {@code addCheckpoint} call fail. */
        private static final OneShotLatch abortCheckpointLatch = new OneShotLatch();

        TestingCompletedCheckpointStore() {
            super(1);
        }

        /**
         * Adds the checkpoint once the abort latch has been triggered; before that, signals the
         * test, waits for the abort latch and then fails.
         *
         * @throws ExpectedTestException when the call started before the abort latch was triggered
         * @throws Exception if waiting is interrupted or the parent store fails
         */
        public void addCheckpoint(CompletedCheckpoint completedCheckpoint, CheckpointsCleaner checkpointsCleaner, Runnable runnable) throws Exception {
            if (!abortCheckpointLatch.isTriggered()) {
                // Tell the test a checkpoint is about to be stored, then fail once it allows it.
                addCheckpointLatch.trigger();
                abortCheckpointLatch.await();
                throw new ExpectedTestException();
            }
            super.addCheckpoint(completedCheckpoint, checkpointsCleaner, runnable);
        }

        /** Restores both latches to their untriggered state. */
        static void reset() {
            abortCheckpointLatch.reset();
            addCheckpointLatch.reset();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase$TestingHAFactory.class */
    /**
     * High-availability services factory that plugs {@code TestingCompletedCheckpointStore} into the
     * checkpoint recovery factory. {@code setup()} registers it by class name.
     */
    public static class TestingHAFactory implements HighAvailabilityServicesFactory {
        /**
         * Builds HA services backed by a new testing checkpoint store and a new standalone
         * checkpoint id counter, shared by all jobs.
         *
         * @param configuration not used
         * @param executor executor forwarded to the created services
         * @return the testing HA services
         */
        public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) {
            final TestingCompletedCheckpointStore checkpointStore = new TestingCompletedCheckpointStore();
            final StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
            final CheckpointRecoveryFactory recoveryFactory =
                    PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(checkpointStore, idCounter);
            return new TestingHaServices(recoveryFactory, executor);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase$TestingHaServices.class */
    /** Embedded HA services that return a caller-supplied checkpoint recovery factory. */
    private static class TestingHaServices extends EmbeddedHaServices {
        /** Factory returned by {@link #getCheckpointRecoveryFactory()}. */
        private final CheckpointRecoveryFactory checkpointRecoveryFactory;

        /**
         * @param checkpointRecoveryFactory factory to return from the getter
         * @param executor executor forwarded to the parent services
         */
        TestingHaServices(CheckpointRecoveryFactory checkpointRecoveryFactory, Executor executor) {
            super(executor);
            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
        }

        /** @return the factory supplied at construction time */
        public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
            return checkpointRecoveryFactory;
        }
    }

    /**
     * Supplies the values of {@code unalignedCheckpointEnabled}: the test runs once with
     * {@code true} and once with {@code false}.
     */
    @Parameterized.Parameters(name = "unalignedCheckpointEnabled ={0}")
    public static Collection<Boolean> parameter() {
        final Collection<Boolean> unalignedFlags = Arrays.asList(true, false);
        return unalignedFlags;
    }

    /**
     * Starts a mini cluster with one task manager and one slot, configured with local recovery
     * and the testing HA factory, then clears the static state of the test operators and the
     * testing checkpoint store.
     *
     * @throws Exception if the temporary folder cannot be created or the cluster fails to start
     */
    @Before
    public void setup() throws Exception {
        final Configuration clusterConfig = new Configuration();
        clusterConfig.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
        // The HA mode is given as a class name so the cluster instantiates the testing factory.
        clusterConfig.setString(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName());

        checkpointPath = new Path(TEMPORARY_FOLDER.newFolder().toURI());

        final MiniClusterResourceConfiguration resourceConfig =
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(clusterConfig)
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(1)
                        .build();
        cluster = new MiniClusterWithClientResource(resourceConfig);
        cluster.before();

        // Latches and counters are static, so they must be cleared for every parameterized run.
        NormalMap.reset();
        DeclineSink.reset();
        TestingCompletedCheckpointStore.reset();
    }

    /** Stops the mini cluster if one was created and clears the reference to it. */
    @After
    public void shutdown() {
        if (cluster == null) {
            return;
        }
        cluster.after();
        cluster = null;
    }

    /**
     * Runs a source -> keyed map -> sink job and checks that both the map and the sink operator
     * are notified of two aborted checkpoints:
     *
     * <ol>
     *   <li>the first abort is caused by {@code TestingCompletedCheckpointStore} throwing from
     *       {@code addCheckpoint};
     *   <li>the second by the sink's operator state backend failing the snapshot of checkpoint
     *       {@code DECLINE_CHECKPOINT_ID}, which the sink holds back until {@code waitLatch} is
     *       triggered.
     * </ol>
     *
     * @throws Exception if job submission, waiting on a latch, or cancellation fails
     */
    @Test(timeout = TEST_TIMEOUT)
    public void testNotifyCheckpointAborted() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(200L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().enableUnalignedCheckpoints(this.unalignedCheckpointEnabled);
        executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
        executionEnvironment.disableOperatorChaining();
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStateBackend(new DeclineSinkFailingStateBackend(checkpointPath));

        // Key by the first tuple field; the field is already an Integer, so no cast is needed.
        executionEnvironment
                .addSource(new NormalSource())
                .name("NormalSource")
                .keyBy(tuple2 -> tuple2.f0)
                .transform("NormalMap", TypeInformation.of(Integer.class), new NormalMap())
                .transform(DECLINE_SINK_NAME, TypeInformation.of(Object.class), new DeclineSink());

        // Wildcard instead of the raw type: the client's type argument is irrelevant here.
        ClusterClient<?> clusterClient = cluster.getClusterClient();
        JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
        JobID jobID = jobGraph.getJobID();
        clusterClient.submitJob(jobGraph).get();

        // Phase 1: wait until a checkpoint reaches the store, then let the store reject it.
        TestingCompletedCheckpointStore.addCheckpointLatch.await();
        this.log.info("The checkpoint to abort is ready to add to checkpoint store.");
        TestingCompletedCheckpointStore.abortCheckpointLatch.trigger();
        this.log.info("Verifying whether all operators have been notified of checkpoint-1 aborted.");
        verifyAllOperatorsNotifyAborted();
        this.log.info("Verified that all operators have been notified of checkpoint-1 aborted.");
        // Re-arm the latches before the second abort so the next await observes a new notification.
        resetAllOperatorsNotifyAbortedLatches();
        verifyAllOperatorsNotifyAbortedTimes(1);

        // Phase 2: release the sink's blocked snapshot so the declined checkpoint can fail.
        DeclineSink.waitLatch.trigger();
        this.log.info("Verifying whether all operators have been notified of checkpoint-2 aborted.");
        verifyAllOperatorsNotifyAborted();
        this.log.info("Verified that all operators have been notified of checkpoint-2 aborted.");
        verifyAllOperatorsNotifyAbortedTimes(2);

        clusterClient.cancel(jobID).get();
        this.log.info("Test is verified successfully as expected.");
    }

    /**
     * Blocks until both the map and the sink operator have signalled an abort notification.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void verifyAllOperatorsNotifyAborted() throws InterruptedException {
        final OneShotLatch mapNotified = NormalMap.notifiedAbortedLatch;
        final OneShotLatch sinkNotified = DeclineSink.notifiedAbortedLatch;
        mapNotified.await();
        sinkNotified.await();
    }

    /** Re-arms the abort-notification latches of the map and the sink operator. */
    private void resetAllOperatorsNotifyAbortedLatches() {
        final OneShotLatch mapNotified = NormalMap.notifiedAbortedLatch;
        final OneShotLatch sinkNotified = DeclineSink.notifiedAbortedLatch;
        mapNotified.reset();
        sinkNotified.reset();
    }

    /**
     * Asserts that the map and the sink operator have each counted exactly {@code i} abort
     * notifications.
     *
     * @param i expected number of notifications per operator
     */
    private void verifyAllOperatorsNotifyAbortedTimes(int i) {
        final int mapNotifications = NormalMap.notifiedAbortedTimes.get();
        Assert.assertEquals(i, mapNotifications);
        final int sinkNotifications = DeclineSink.notifiedAbortedTimes.get();
        Assert.assertEquals(i, sinkNotifications);
    }

    /*
     * Decompiler rendering of the synthetic lambda-deserialization hook of this class.
     *
     * It recognises a single implementation method,
     * "lambda$testNotifyCheckpointAborted$3558be8e$1", checks that the serialized form describes
     * a KeySelector.getKey lambda implemented in this class, and returns a lambda that extracts
     * field f0 of the tuple. Any other input ends in IllegalArgumentException.
     *
     * NOTE(review): the body is not valid Java source as written (an int literal assigned to a
     * boolean, a switch over a boolean, a lambda returned as Object). It looks like the
     * compiler-generated method that accompanies the serializable key-selector lambda in
     * testNotifyCheckpointAborted; the compiler presumably regenerates it, so this copy is a
     * candidate for removal when the source is cleaned up - confirm before deleting.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiler artifact: "z" acts as the index of the matched lambda (-1 = no match, false = 0).
        boolean z = -1;
        // First switch: map the implementation method name to a lambda index via hash, then equals.
        switch (implMethodName.hashCode()) {
            case -121683145:
                if (implMethodName.equals("lambda$testNotifyCheckpointAborted$3558be8e$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: validate the remaining serialized metadata for the matched lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple2 -> {
                        return (Integer) tuple2.f0;
                    };
                }
                break;
        }
        // Reached when the name is unknown or the metadata does not match.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
