package org.apache.flink.test.checkpointing;

import java.io.File;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.class */
public class ZooKeeperHighAvailabilityITCase extends TestLogger {
    // Sizing of the local test cluster: one JobManager, one TaskManager, one slot per TaskManager.
    private static final int NUM_JMS = 1;
    private static final int NUM_TMS = 1;
    private static final int NUM_SLOTS_PER_TM = 1;
    // High-availability storage directory; created in setup(). The test moves
    // "completedCheckpoint*" files out of this directory and later back into it.
    private static File haStorageDir;
    // ZooKeeper test server started in setup() and stopped/closed in tearDown().
    private static TestingServer zkServer;
    // Cluster resource started manually via before() in setup() and released via after() in tearDown().
    private static MiniClusterResource miniClusterResource;
    // Budget for the job-status polling deadline created in the test (10000 seconds).
    // NOTE(review): the test method itself carries a JUnit timeout of 120000 ms, which is far shorter.
    private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000);

    // Provides the HA storage directory, the state backend directory and the scratch
    // directory that temporarily holds the moved checkpoint files.
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    // Triggered by CheckpointBlockingFunction.snapshotState once a checkpoint id > 5 is seen;
    // awaited by the test thread. Re-created at the start of the test.
    private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    // Triggered by the test thread; awaited inside CheckpointBlockingFunction.snapshotState
    // before it may fail on purpose. Re-created at the start of the test.
    private static OneShotLatch failInCheckpointLatch = new OneShotLatch();

    /* loaded from: input_file:org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase$CheckpointBlockingFunction.class */
    private static class CheckpointBlockingFunction extends RichMapFunction<String, String> implements CheckpointedFunction {
        static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1);
        static AtomicInteger illegalRestores = new AtomicInteger(0);
        static AtomicInteger successfulRestores = new AtomicInteger(0);
        static AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false);
        static AtomicBoolean failedAlready = new AtomicBoolean(false);
        private final ValueStateDescriptor<String> stateDescriptor;

        private CheckpointBlockingFunction() {
            this.stateDescriptor = new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
        }

        public String map(String str) throws Exception {
            getRuntimeContext().getState(this.stateDescriptor).update("42");
            return str;
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            if (functionSnapshotContext.getCheckpointId() > 5) {
                ZooKeeperHighAvailabilityITCase.waitForCheckpointLatch.trigger();
                ZooKeeperHighAvailabilityITCase.failInCheckpointLatch.await();
                if (!failedAlready.getAndSet(true)) {
                    throw new RuntimeException("Failing on purpose.");
                }
            }
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) {
            if (!functionInitializationContext.isRestored()) {
                if (allowedInitializeCallsWithoutRestore.decrementAndGet() < 0) {
                    illegalRestores.getAndIncrement();
                    throw new RuntimeException("We are not allowed any more restores.");
                }
            } else if (!afterMessWithZooKeeper.get()) {
                illegalRestores.getAndIncrement();
            } else if (successfulRestores.getAndIncrement() > 0) {
                illegalRestores.getAndIncrement();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase$UnboundedSource.class */
    private static class UnboundedSource implements SourceFunction<String> {
        private volatile boolean running;

        private UnboundedSource() {
            this.running = true;
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            while (this.running && !CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
                sourceContext.collect("hello");
                Thread.sleep(50L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    /**
     * Starts the ZooKeeper test server and a mini cluster configured for ZooKeeper high
     * availability, with its HA storage located in a fresh temporary folder.
     *
     * @throws Exception if the ZooKeeper server, the temporary folder or the cluster cannot be set up
     */
    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer();

        Configuration configuration = new Configuration();
        // Use the class-level sizing constants instead of repeating the literal values.
        configuration.setInteger("local.number-jobmanager", NUM_JMS);
        configuration.setInteger("local.number-taskmanager", NUM_TMS);
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);

        haStorageDir = TEMPORARY_FOLDER.newFolder();
        configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
        // A random cluster id per run.
        configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
        configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

        miniClusterResource = new MiniClusterResource(
                new MiniClusterResource.MiniClusterResourceConfiguration(configuration, NUM_TMS, NUM_SLOTS_PER_TM),
                true);
        // The resource is not registered as a JUnit rule, so its lifecycle is driven manually.
        miniClusterResource.before();
    }

    /**
     * Releases the mini cluster and then stops and closes the ZooKeeper test server.
     *
     * <p>Each step is guarded so that a failure in an earlier step (or a partially failed
     * {@code setup()}, which leaves fields {@code null}) does not prevent the remaining
     * resources from being released.
     *
     * @throws Exception if releasing any of the resources fails
     */
    @AfterClass
    public static void tearDown() throws Exception {
        try {
            if (miniClusterResource != null) {
                miniClusterResource.after();
            }
        } finally {
            if (zkServer != null) {
                try {
                    zkServer.stop();
                } finally {
                    zkServer.close();
                }
            }
        }
    }

    /**
     * Runs a checkpointing job, moves the {@code completedCheckpoint*} files out of the HA storage
     * directory while a checkpoint is blocked, lets the job fail, and expects to observe the job
     * in {@code RESTARTING} and then {@code FAILING}. Afterwards the files are moved back and the
     * job is expected to reach {@code FINISHED} without any illegal restore having been counted.
     *
     * @throws Exception on any setup, submission or polling failure
     */
    @Test(timeout = 120000)
    public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
        // Reset the static state shared with the job's functions so the test is re-runnable.
        CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
        CheckpointBlockingFunction.successfulRestores.set(0);
        CheckpointBlockingFunction.illegalRestores.set(0);
        CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
        CheckpointBlockingFunction.failedAlready.set(false);
        waitForCheckpointLatch = new OneShotLatch();
        failInCheckpointLatch = new OneShotLatch();

        ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
        env.enableCheckpointing(10L);
        env.setStateBackend(new FsStateBackend(TEMPORARY_FOLDER.newFolder().toURI()));
        env.addSource(new UnboundedSource())
                .keyBy(value -> value)
                .map(new CheckpointBlockingFunction());

        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());

        clusterClient.setDetached(true);
        clusterClient.submitJob(jobGraph, ZooKeeperHighAvailabilityITCase.class.getClassLoader());

        // Wait until the map function is blocked inside a checkpoint with id > 5.
        waitForCheckpointLatch.await();

        // Move the completed-checkpoint files out of the HA storage directory.
        File checkpointBackupDir = TEMPORARY_FOLDER.newFolder();
        int movedFiles = moveCompletedCheckpoints(haStorageDir, checkpointBackupDir);
        Assert.assertTrue(movedFiles > 0);

        // Release the blocked checkpoint; the function then throws on purpose.
        failInCheckpointLatch.trigger();

        // Expect at least one cycle in which the job tries to restart and then fails.
        awaitJobStatus(clusterClient, jobID, JobStatus.RESTARTING, Time.milliseconds(1L), deadline);
        awaitJobStatus(clusterClient, jobID, JobStatus.FAILING, Time.milliseconds(1L), deadline);

        // From here on restores are legal, and the source stops emitting once it sees this flag.
        CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);

        // Put the completed-checkpoint files back where they came from.
        moveCompletedCheckpoints(checkpointBackupDir, haStorageDir);

        awaitJobStatus(clusterClient, jobID, JobStatus.FINISHED, Time.milliseconds(50L), deadline);

        Assert.assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), Is.is(0));
    }

    /** Moves every file whose name starts with "completedCheckpoint" and returns how many were moved. */
    private static int moveCompletedCheckpoints(File sourceDir, File targetDir) {
        File[] candidates = sourceDir.listFiles();
        Assert.assertNotNull(candidates);

        int moved = 0;
        for (File candidate : candidates) {
            if (candidate.getName().startsWith("completedCheckpoint")) {
                Assert.assertTrue(candidate.renameTo(new File(targetDir, candidate.getName())));
                moved++;
            }
        }
        return moved;
    }

    /** Polls the job status with the given delay until it equals {@code expected} or the deadline is hit. */
    private static void awaitJobStatus(
            ClusterClient<?> clusterClient,
            JobID jobID,
            JobStatus expected,
            Time retryDelay,
            Deadline deadline) throws Exception {
        JobStatus observed = FutureUtils.retrySuccesfulWithDelay(
                () -> clusterClient.getJobStatus(jobID),
                retryDelay,
                deadline,
                status -> status == expected,
                TestingUtils.defaultScheduledExecutor()).get();
        Assert.assertEquals(expected, observed);
    }

    // NOTE(review): this method is marked synthetic and looks like decompiler output for the
    // lambda-deserialization hook of the serializable key-selector lambda used in the test.
    // As written it is not valid Java source: `z` is declared boolean but initialised with -1,
    // and a boolean is used as a switch selector with `case false`. Presumably the compiler
    // regenerates this hook on its own, so the method can likely be deleted — confirm before removing.
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: map the implementation method name (by hash, then equals) to a selector.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -859461103:
                if (implMethodName.equals("lambda$testRestoreBehaviourWithFaultyStateHandles$3558be8e$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the serialized lambda's shape and re-create the identity lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str -> {
                        return str;
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
