package org.apache.flink.test.checkpointing;

import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SerializableSupplier;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/checkpointing/CheckpointStoreITCase.class */
public class CheckpointStoreITCase extends TestLogger {
    /**
     * Cluster configuration whose high-availability mode is set to the class name of {@link
     * TestingHAFactory}.
     */
    private static final Configuration CONFIGURATION =
            new Configuration()
                    .set(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName());

    /** Mini cluster shared by the tests of this class, built from {@link #CONFIGURATION}. */
    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(CONFIGURATION)
                            .build());

    /**
     * Identity mapper that throws on the very first record it is asked to map and forwards every
     * later record unchanged.
     *
     * <p>Progress is tracked in static flags shared by all instances; the enclosing test reads
     * them and clears them through {@link #reset()}.
     */
    public static class FailingMapper implements MapFunction<Integer, Integer> {
        /** Set once the single induced failure has been thrown. */
        private static volatile boolean failed = false;

        /** Set once a record has been mapped successfully after the induced failure. */
        private static volatile boolean failedAndProcessed = false;

        private FailingMapper() {}

        /** Clears both flags so that the next run fails once again. */
        public static void reset() {
            failed = false;
            failedAndProcessed = false;
        }

        /**
         * Fails on the first call, passes the input through on every later call.
         *
         * @param num the record to map
         * @return {@code num} unchanged
         * @throws RuntimeException on the first call after a reset
         */
        public Integer map(Integer num) throws Exception {
            if (!failed) {
                // First record since the last reset: remember that we failed, then blow up.
                failed = true;
                throw new RuntimeException();
            }
            failedAndProcessed = true;
            return num;
        }
    }

    /**
     * Checkpoint store that retains nothing and whose {@link #recover()} fails exactly once, on
     * its second invocation.
     *
     * <p>The call history is kept in static flags shared by all instances; the enclosing test
     * reads {@code recovered} and clears everything through {@link #reset()}.
     */
    public static class FailingStore implements CompletedCheckpointStore {
        /** Set by the first {@link #recover()} call. */
        private static volatile boolean started = false;

        /** Set by the second {@link #recover()} call, right before it throws. */
        private static volatile boolean failed = false;

        /** Set by the first {@link #recover()} call that follows the induced failure. */
        private static volatile boolean recovered = false;

        private FailingStore() {}

        /** Clears the whole call history. */
        public static void reset() {
            recovered = false;
            failed = false;
            started = false;
        }

        /**
         * Advances the call history by one step: the first call only marks the store as started,
         * the second call throws, the third call marks the store as recovered, and any further
         * call is a no-op.
         *
         * @throws RuntimeException on the second call after a reset
         */
        public void recover() throws Exception {
            if (!started) {
                started = true;
                return;
            }
            if (!failed) {
                failed = true;
                throw new RuntimeException();
            }
            if (!recovered) {
                recovered = true;
            }
        }

        /** Does nothing; checkpoints are never retained. */
        public void addCheckpoint(CompletedCheckpoint completedCheckpoint, CheckpointsCleaner checkpointsCleaner, Runnable runnable) {}

        /** Does nothing; there is no state to release. */
        public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws Exception {}

        /** @return always an empty list */
        public List<CompletedCheckpoint> getAllCheckpoints() {
            return Collections.emptyList();
        }

        /** @return always {@code 0} */
        public int getNumberOfRetainedCheckpoints() {
            return 0;
        }

        /** @return always {@code 1} */
        public int getMaxNumberOfRetainedCheckpoints() {
            return 1;
        }

        /** @return always {@code false} */
        public boolean requiresExternalizedCheckpoints() {
            return false;
        }
    }

    /**
     * High-availability services factory that hands out embedded HA services whose checkpoint
     * recovery factory is backed by a {@link FailingStore}.
     */
    public static class TestingHAFactory implements HighAvailabilityServicesFactory {
        /**
         * Creates the HA services used by the test cluster.
         *
         * @param configuration not used
         * @param executor not used; the services are created with a direct executor instead
         * @return embedded HA services with an overridden checkpoint recovery factory
         */
        public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) {
            return new EmbeddedHaServices(Executors.directExecutor()) {
                /** Builds a new recovery factory, and with it a new {@link FailingStore}, on every call. */
                public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
                    return new TestingCheckpointRecoveryFactory(
                            new FailingStore(),
                            // Diamond instead of the raw type avoids an unchecked conversion.
                            new TestingCheckpointIDCounter(new CompletableFuture<>()));
                }
            };
        }
    }

    /** Clears the static failure bookkeeping of the mapper and the store before each test. */
    @Before
    public void init() {
        FailingMapper.reset();
        FailingStore.reset();
    }

    /**
     * Runs a job that fails once while processing ({@link FailingMapper}) and once while
     * recovering the checkpoint store ({@link FailingStore}), and checks that it still reaches the
     * point where the store has recovered and a record has been processed.
     *
     * <p>The test is skipped when the configured scheduler type is {@code Adaptive}.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testRestartOnRecoveryFailure() throws Exception {
        Assume.assumeFalse(
                "Adaptive scheduler doesn't retry after failures on recovery",
                ClusterOptions.getSchedulerType(CONFIGURATION) == JobManagerOptions.SchedulerType.Adaptive);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10L);
        // Two restart attempts: one for the mapper failure, one for the failed store recovery.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0L));
        env.addSource(emitUntil(() -> FailingStore.recovered && FailingMapper.failedAndProcessed))
                .map(new FailingMapper())
                .addSink(new DiscardingSink<>());
        env.execute();

        Preconditions.checkState(FailingStore.recovered && FailingMapper.failedAndProcessed);
    }

    /**
     * Creates a source that emits {@code 0} roughly every 100 ms until it is cancelled or the
     * given condition becomes true.
     *
     * @param serializableSupplier stop condition, evaluated before each emission
     * @return the source function
     */
    private SourceFunction<Integer> emitUntil(final SerializableSupplier<Boolean> serializableSupplier) {
        return new SourceFunction<Integer>() {
            /** Cleared by {@link #cancel()}; volatile because it is written from another thread. */
            private volatile boolean running = true;

            public void run(SourceFunction.SourceContext<Integer> sourceContext) {
                while (this.running && !serializableSupplier.get()) {
                    // Hold the checkpoint lock only for the emission itself ...
                    synchronized (sourceContext.getCheckpointLock()) {
                        sourceContext.collect(0);
                    }
                    // ... and pause outside of it, so the lock is not held for the whole delay.
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to callers before propagating the failure.
                        Thread.currentThread().interrupt();
                        ExceptionUtils.rethrow(e);
                    }
                }
            }

            public void cancel() {
                this.running = false;
            }
        };
    }

    /**
     * Decompiler reconstruction of the hook that recreates a serialized lambda of this class.
     *
     * <p>Exactly one lambda is recognised: the {@link SerializableSupplier} stop condition created
     * in {@link #testRestartOnRecoveryFailure()}. Every attribute of the serialized form must match
     * before a replacement is returned.
     *
     * <p>NOTE(review): the compiler normally generates this method on its own for classes that
     * contain serializable lambdas, so a hand-written copy is presumably redundant and may clash
     * with the generated one when building from source. Confirm and consider deleting it.
     *
     * @param serializedLambda serialized description of the lambda to recreate
     * @return a supplier that is true once the store has recovered and the mapper has processed a
     *     record after failing
     * @throws IllegalArgumentException if the description does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        boolean isStopCondition =
                "lambda$testRestartOnRecoveryFailure$46f190a3$1".equals(serializedLambda.getImplMethodName())
                        // 6 is the value of MethodHandleInfo.REF_invokeStatic.
                        && serializedLambda.getImplMethodKind() == 6
                        && "org/apache/flink/util/function/SerializableSupplier".equals(serializedLambda.getFunctionalInterfaceClass())
                        && "get".equals(serializedLambda.getFunctionalInterfaceMethodName())
                        && "()Ljava/lang/Object;".equals(serializedLambda.getFunctionalInterfaceMethodSignature())
                        && "org/apache/flink/test/checkpointing/CheckpointStoreITCase".equals(serializedLambda.getImplClass())
                        && "()Ljava/lang/Boolean;".equals(serializedLambda.getImplMethodSignature());
        if (!isStopCondition) {
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
        // A lambda needs a target type, so bind it to the functional interface before returning.
        SerializableSupplier<Boolean> stopCondition =
                () -> FailingStore.recovered && FailingMapper.failedAndProcessed;
        return stopCondition;
    }
}
