package org.apache.flink.test.checkpointing.utils;

import java.io.File;
import java.io.Serializable;
import java.net.URL;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.class */
public abstract class SnapshotMigrationTestBase extends TestLogger {

    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(getConfiguration()).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM).build());
    protected static final int DEFAULT_PARALLELISM = 4;

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotMigrationTestBase.class);

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase$ExecutionMode.class */
    public enum ExecutionMode {
        CREATE_SNAPSHOT,
        VERIFY_SNAPSHOT
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase$SnapshotSpec.class */
    public static class SnapshotSpec implements Serializable {
        private final FlinkVersion flinkVersion;
        private final String stateBackendType;
        private final SnapshotType snapshotType;

        public SnapshotSpec(FlinkVersion flinkVersion, String str, SnapshotType snapshotType) {
            this.flinkVersion = flinkVersion;
            this.stateBackendType = str;
            this.snapshotType = snapshotType;
        }

        public FlinkVersion getFlinkVersion() {
            return this.flinkVersion;
        }

        public String getStateBackendType() {
            return this.stateBackendType;
        }

        public SnapshotType getSnapshotType() {
            return this.snapshotType;
        }

        public static Collection<SnapshotSpec> withVersions(String str, SnapshotType snapshotType, Collection<FlinkVersion> collection) {
            LinkedList linkedList = new LinkedList();
            Iterator<FlinkVersion> it = collection.iterator();
            while (it.hasNext()) {
                linkedList.add(new SnapshotSpec(it.next(), str, snapshotType));
            }
            return linkedList;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("flink" + this.flinkVersion);
            String str = this.stateBackendType;
            boolean z = -1;
            switch (str.hashCode()) {
                case 697541006:
                    if (str.equals("hashmap")) {
                        z = 2;
                        break;
                    }
                    break;
                case 1368770220:
                    if (str.equals("rocksdb")) {
                        z = false;
                        break;
                    }
                    break;
                case 1712403792:
                    if (str.equals("jobmanager")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    sb.append("-rocksdb");
                    break;
                case true:
                    break;
                case true:
                    sb.append("-hashmap");
                    break;
                default:
                    throw new UnsupportedOperationException("State backend type not supported.");
            }
            switch (this.snapshotType) {
                case SAVEPOINT_CANONICAL:
                    sb.append("-savepoint");
                    break;
                case SAVEPOINT_NATIVE:
                    sb.append("-savepoint-native");
                    break;
                case CHECKPOINT:
                    sb.append("-checkpoint");
                    break;
                default:
                    throw new UnsupportedOperationException("Snapshot type not supported.");
            }
            return sb.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase$SnapshotType.class */
    public enum SnapshotType {
        SAVEPOINT_CANONICAL,
        SAVEPOINT_NATIVE,
        CHECKPOINT
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String getResourceFilename(String str) {
        URL resource = SnapshotMigrationTestBase.class.getClassLoader().getResource(str);
        if (resource == null) {
            throw new NullPointerException("Missing snapshot resource.");
        }
        return resource.getFile();
    }

    private Configuration getConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("local.number-taskmanager", 1);
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
        UUID randomUUID = UUID.randomUUID();
        File absoluteFile = TEMP_FOLDER.newFolder("checkpoints_" + randomUUID).getAbsoluteFile();
        File absoluteFile2 = TEMP_FOLDER.newFolder("savepoints_" + randomUUID).getAbsoluteFile();
        if (!absoluteFile.exists() || !absoluteFile2.exists()) {
            throw new Exception("Test setup failed: failed to create (temporary) directories.");
        }
        LOG.info("Created temporary checkpoint directory: " + absoluteFile + ".");
        LOG.info("Created savepoint directory: " + absoluteFile2 + ".");
        configuration.setString(StateBackendOptions.STATE_BACKEND, "memory");
        configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, absoluteFile.toURI().toString());
        configuration.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
        configuration.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, absoluteFile2.toURI().toString());
        configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 300L);
        return configuration;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Failed to find 'out' block for switch in B:25:0x00e9. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:29:0x0175  */
    /* JADX WARN: Removed duplicated region for block: B:32:0x0185  */
    @java.lang.SafeVarargs
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public final void executeAndSnapshot(org.apache.flink.streaming.api.environment.StreamExecutionEnvironment r7, java.lang.String r8, org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.SnapshotType r9, org.apache.flink.api.java.tuple.Tuple2<java.lang.String, java.lang.Integer>... r10) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 403
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.executeAndSnapshot(org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, java.lang.String, org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase$SnapshotType, org.apache.flink.api.java.tuple.Tuple2[]):void");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @SafeVarargs
    public final void restoreAndExecute(StreamExecutionEnvironment streamExecutionEnvironment, String str, Tuple2<String, Integer>... tuple2Arr) throws Exception {
        Deadline fromNow = Deadline.fromNow(Duration.ofMinutes(5L));
        ClusterClient clusterClient = this.miniClusterResource.getClusterClient();
        JobGraph jobGraph = streamExecutionEnvironment.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str));
        JobID jobID = (JobID) clusterClient.submitJob(jobGraph).get();
        boolean z = false;
        while (true) {
            if (!fromNow.hasTimeLeft()) {
                break;
            }
            try {
                JobStatus jobStatus = (JobStatus) clusterClient.getJobStatus(jobID).get(5L, TimeUnit.SECONDS);
                if (jobStatus == JobStatus.FAILED) {
                    LOG.warn("Job reached status failed", ((SerializedThrowable) ((JobResult) clusterClient.requestJobResult(jobID).get()).getSerializedThrowable().get()).deserializeError(ClassLoader.getSystemClassLoader()));
                }
                Assert.assertNotEquals(JobStatus.FAILED, jobStatus);
            } catch (Exception e) {
                Assert.fail("Could not connect to job: " + e);
            }
            Thread.sleep(100L);
            Map map = (Map) clusterClient.getAccumulators(jobID).get();
            boolean z2 = true;
            int length = tuple2Arr.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                Tuple2<String, Integer> tuple2 = tuple2Arr[i];
                Object obj = map.get(tuple2.f0);
                if (obj == null) {
                    z2 = false;
                    break;
                } else {
                    if (!obj.equals(tuple2.f1)) {
                        z2 = false;
                        break;
                    }
                    i++;
                }
            }
            if (z2) {
                z = true;
                break;
            }
        }
        if (z) {
            return;
        }
        Assert.fail("Did not see the expected accumulator results within time limit.");
    }
}
