package org.apache.flink.test.scheduling;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/test/scheduling/JMFailoverITCase.class */
public class JMFailoverITCase {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    private static final int DEFAULT_MAX_PARALLELISM = 4;
    private static final int SOURCE_PARALLELISM = 8;
    private static final int NUMBER_KEYS = 10000;
    private static final int NUMBER_OF_EACH_KEY = 4;
    private EmbeddedHaServicesWithLeadershipControl highAvailabilityServices;
    private String methodName;

    @TempDir
    Path temporaryFolder;
    protected MiniCluster flinkCluster;
    private final Duration previousWorkerRecoveryTimeout = Duration.ofSeconds(3);
    protected int numTaskManagers = 4;
    protected int numSlotsPerTaskManager = 4;
    protected Configuration flinkConfiguration = new Configuration();
    protected Supplier<HighAvailabilityServices> highAvailabilityServicesSupplier = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/scheduling/JMFailoverITCase$SourceTail.class */
    public static class SourceTail extends AbstractStreamOperator<Long> implements OneInputStreamOperator<Long, Long> {
        public static Map<Integer, Boolean> subtaskBlocked = new ConcurrentHashMap();
        public static Map<Integer, ResultPartitionID> resultPartitions = new ConcurrentHashMap();
        public static Map<Integer, Integer> attemptIds = new ConcurrentHashMap();

        public SourceTail() {
            setChainingStrategy(ChainingStrategy.ALWAYS);
        }

        public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Long>> output) {
            super.setup(streamTask, streamConfig, output);
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            attemptIds.compute(Integer.valueOf(indexOfThisSubtask), (num, num2) -> {
                return num2 == null ? 0 : Integer.valueOf(num2.intValue() + 1);
            });
            Environment environment = getContainingTask().getEnvironment();
            Preconditions.checkState(environment.getAllWriters().length == 1);
            resultPartitions.put(Integer.valueOf(indexOfThisSubtask), environment.getAllWriters()[0].getPartitionId());
            if (subtaskBlocked.containsKey(Integer.valueOf(indexOfThisSubtask)) && subtaskBlocked.get(Integer.valueOf(indexOfThisSubtask)).booleanValue()) {
                JMFailoverITCase.tryWaitUntilCondition(() -> {
                    return Boolean.valueOf(!subtaskBlocked.get(Integer.valueOf(indexOfThisSubtask)).booleanValue());
                });
            }
        }

        public void processElement(StreamRecord<Long> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        public static void clear() {
            subtaskBlocked.clear();
            attemptIds.clear();
            resultPartitions.clear();
        }

        public static void blockSubTasks(Integer... numArr) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(numArr), true, subtaskBlocked);
        }

        public static void unblockSubTasks(Integer... numArr) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(numArr), false, subtaskBlocked);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/scheduling/JMFailoverITCase$StubMapFunction.class */
    public static class StubMapFunction extends AbstractStreamOperator<Tuple2<Integer, Integer>> implements OneInputStreamOperator<Long, Tuple2<Integer, Integer>> {
        public static Map<Integer, Boolean> subtaskBlocked = new ConcurrentHashMap();
        public static Map<Integer, Integer> attemptIds = new ConcurrentHashMap();

        private StubMapFunction() {
        }

        public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Tuple2<Integer, Integer>>> output) {
            super.setup(streamTask, streamConfig, output);
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            attemptIds.compute(Integer.valueOf(indexOfThisSubtask), (num, num2) -> {
                return num2 == null ? 0 : Integer.valueOf(num2.intValue() + 1);
            });
            if (subtaskBlocked.containsKey(Integer.valueOf(indexOfThisSubtask)) && subtaskBlocked.get(Integer.valueOf(indexOfThisSubtask)).booleanValue()) {
                JMFailoverITCase.tryWaitUntilCondition(() -> {
                    return Boolean.valueOf(!subtaskBlocked.get(Integer.valueOf(indexOfThisSubtask)).booleanValue());
                });
            }
        }

        public void processElement(StreamRecord<Long> streamRecord) throws Exception {
            this.output.collect(new StreamRecord(new Tuple2(Integer.valueOf(((Long) streamRecord.getValue()).intValue() % JMFailoverITCase.NUMBER_KEYS), 1)));
        }

        public static void clear() {
            subtaskBlocked.clear();
            attemptIds.clear();
        }

        public static void blockSubTasks(Integer... numArr) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(numArr), true, subtaskBlocked);
        }

        public static void unblockSubTasks(Integer... numArr) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(numArr), false, subtaskBlocked);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/scheduling/JMFailoverITCase$StubMapFunction2.class */
    public static class StubMapFunction2 extends AbstractStreamOperator<Tuple2<Integer, Integer>> implements OneInputStreamOperator<String, Tuple2<Integer, Integer>> {
        public static Map<Integer, Boolean> subtaskBlocked = new ConcurrentHashMap();
        public static Map<Integer, Integer> attemptIds = new ConcurrentHashMap();

        private StubMapFunction2() {
        }

        public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Tuple2<Integer, Integer>>> output) {
            super.setup(streamTask, streamConfig, output);
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            attemptIds.compute(Integer.valueOf(indexOfThisSubtask), (num, num2) -> {
                return num2 == null ? 0 : Integer.valueOf(num2.intValue() + 1);
            });
            if (subtaskBlocked.containsKey(Integer.valueOf(indexOfThisSubtask)) && subtaskBlocked.get(Integer.valueOf(indexOfThisSubtask)).booleanValue()) {
                JMFailoverITCase.tryWaitUntilCondition(() -> {
                    return Boolean.valueOf(!subtaskBlocked.get(Integer.valueOf(indexOfThisSubtask)).booleanValue());
                });
            }
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            this.output.collect(new StreamRecord(new Tuple2(Integer.valueOf(Integer.parseInt((String) streamRecord.getValue()) % JMFailoverITCase.NUMBER_KEYS), 1)));
        }

        public static void clear() {
            subtaskBlocked.clear();
            attemptIds.clear();
        }

        public static void blockSubTasks(Integer... numArr) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(numArr), true, subtaskBlocked);
        }

        public static void unblockSubTasks(Integer... numArr) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(numArr), false, subtaskBlocked);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/scheduling/JMFailoverITCase$StubRecordSink.class */
    public static class StubRecordSink extends AbstractStreamOperator<Void> implements OneInputStreamOperator<Tuple2<Integer, Integer>, Void> {
        public static Map<Integer, Boolean> subtaskBlocked = new ConcurrentHashMap();
        public static Map<Integer, Integer> attemptIds = new ConcurrentHashMap();
        public static Map<Integer, Integer> countResults = new ConcurrentHashMap();

        private StubRecordSink() {
        }

        public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Void>> output) {
            super.setup(streamTask, streamConfig, output);
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            attemptIds.compute(Integer.valueOf(indexOfThisSubtask), (num, num2) -> {
                return num2 == null ? 0 : Integer.valueOf(num2.intValue() + 1);
            });
            if (subtaskBlocked.containsKey(Integer.valueOf(indexOfThisSubtask)) && subtaskBlocked.get(Integer.valueOf(indexOfThisSubtask)).booleanValue()) {
                JMFailoverITCase.tryWaitUntilCondition(() -> {
                    return Boolean.valueOf(!subtaskBlocked.get(Integer.valueOf(indexOfThisSubtask)).booleanValue());
                });
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void processElement(StreamRecord<Tuple2<Integer, Integer>> streamRecord) throws Exception {
            Tuple2 tuple2 = (Tuple2) streamRecord.getValue();
            countResults.put(tuple2.f0, tuple2.f1);
        }

        public static void clear() {
            subtaskBlocked.clear();
            attemptIds.clear();
            countResults.clear();
        }

        public static void blockSubTasks(Integer... numArr) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(numArr), true, subtaskBlocked);
        }

        public static void unblockSubTasks(Integer... numArr) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(numArr), false, subtaskBlocked);
        }
    }

    JMFailoverITCase() {
    }

    @BeforeEach
    void before(TestInfo testInfo) throws Exception {
        this.flinkConfiguration = new Configuration();
        SourceTail.clear();
        StubMapFunction.clear();
        StubRecordSink.clear();
        testInfo.getTestMethod().ifPresent(method -> {
            this.methodName = method.getName();
        });
    }

    @AfterEach
    void after() {
        Throwable th = null;
        try {
            if (this.flinkCluster != null) {
                this.flinkCluster.close();
            }
        } catch (Throwable th2) {
            th = th2;
        }
        if (th != null) {
            ExceptionUtils.rethrow(th);
        }
    }

    public void setup() throws Exception {
        SourceTail.clear();
        StubMapFunction.clear();
        StubRecordSink.clear();
    }

    @Test
    void testRecoverFromJMFailover() throws Exception {
        JobGraph prepareEnvAndGetJobGraph = prepareEnvAndGetJobGraph();
        StubRecordSink.blockSubTasks(0, 1, 2, 3);
        JobID jobID = ((JobSubmissionResult) this.flinkCluster.submitJob(prepareEnvAndGetJobGraph).get()).getJobID();
        tryWaitUntilCondition(() -> {
            return Boolean.valueOf(StubRecordSink.attemptIds.size() > 0);
        });
        triggerJMFailover(jobID);
        StubRecordSink.unblockSubTasks(0, 1, 2, 3);
        Assertions.assertThat(((JobResult) this.flinkCluster.requestJobResult(jobID).get()).getSerializedThrowable()).isEmpty();
        checkCountResults();
    }

    @Test
    void testSourceNotAllFinished() throws Exception {
        JobGraph prepareEnvAndGetJobGraph = prepareEnvAndGetJobGraph();
        SourceTail.blockSubTasks(0);
        JobID jobID = ((JobSubmissionResult) this.flinkCluster.submitJob(prepareEnvAndGetJobGraph).get()).getJobID();
        tryWaitUntilCondition(() -> {
            return Boolean.valueOf(SourceTail.attemptIds.size() == SOURCE_PARALLELISM);
        });
        JobVertex jobVertex = (JobVertex) prepareEnvAndGetJobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        while (Arrays.stream(((AccessExecutionGraph) this.flinkCluster.getExecutionGraph(jobID).get()).getJobVertex(jobVertex.getID()).getTaskVertices()).filter(accessExecutionVertex -> {
            return accessExecutionVertex.getExecutionState() == ExecutionState.FINISHED;
        }).count() != 7) {
            Thread.sleep(100L);
        }
        triggerJMFailover(jobID);
        SourceTail.unblockSubTasks(0);
        Assertions.assertThat(((JobResult) this.flinkCluster.requestJobResult(jobID).get()).getSerializedThrowable()).isEmpty();
        checkCountResults();
    }

    @Test
    void testTaskExecutorNotRegisterOnTime() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT, Duration.ZERO);
        JobGraph prepareEnvAndGetJobGraph = prepareEnvAndGetJobGraph(configuration);
        StubRecordSink.blockSubTasks(0, 1, 2, 3);
        JobID jobID = ((JobSubmissionResult) this.flinkCluster.submitJob(prepareEnvAndGetJobGraph).get()).getJobID();
        tryWaitUntilCondition(() -> {
            return Boolean.valueOf(StubRecordSink.attemptIds.size() > 0);
        });
        triggerJMFailover(jobID);
        StubRecordSink.unblockSubTasks(0, 1, 2, 3);
        Assertions.assertThat(((JobResult) this.flinkCluster.requestJobResult(jobID).get()).getSerializedThrowable()).isEmpty();
        checkCountResults();
    }

    @Test
    void testPartitionNotFoundTwice() throws Exception {
        JobGraph prepareEnvAndGetJobGraph = prepareEnvAndGetJobGraph();
        StubMapFunction.blockSubTasks(0, 1);
        JobID jobID = ((JobSubmissionResult) this.flinkCluster.submitJob(prepareEnvAndGetJobGraph).get()).getJobID();
        tryWaitUntilCondition(() -> {
            return Boolean.valueOf(StubMapFunction.attemptIds.size() > 0);
        });
        triggerJMFailover(jobID);
        releaseResultPartitionOfSource();
        StubMapFunction.unblockSubTasks(0);
        tryWaitUntilCondition(() -> {
            return Boolean.valueOf(StubMapFunction.attemptIds.get(0).intValue() == 1);
        });
        releaseResultPartitionOfSource();
        StubMapFunction.unblockSubTasks(1);
        Assertions.assertThat(((JobResult) this.flinkCluster.requestJobResult(jobID).get()).getSerializedThrowable()).isEmpty();
        checkCountResults();
    }

    @Test
    void testPartitionNotFoundAndOperatorCoordinatorNotSupportBatchSnapshot() throws Exception {
        JobGraph prepareEnvAndGetJobGraph = prepareEnvAndGetJobGraph(false);
        StubMapFunction2.blockSubTasks(0, 1, 2, 3);
        JobID jobID = ((JobSubmissionResult) this.flinkCluster.submitJob(prepareEnvAndGetJobGraph).get()).getJobID();
        tryWaitUntilCondition(() -> {
            return Boolean.valueOf(StubMapFunction2.attemptIds.size() > 0);
        });
        triggerJMFailover(jobID);
        releaseResultPartitionOfSource();
        StubMapFunction2.unblockSubTasks(0, 1, 2, 3);
        Assertions.assertThat(((JobResult) this.flinkCluster.requestJobResult(jobID).get()).getSerializedThrowable()).isEmpty();
        checkCountResults();
    }

    @Test
    void testPartitionNotFoundAndOperatorCoordinatorSupportBatchSnapshot() throws Exception {
        JobGraph prepareEnvAndGetJobGraph = prepareEnvAndGetJobGraph();
        StubMapFunction.blockSubTasks(0);
        JobID jobID = ((JobSubmissionResult) this.flinkCluster.submitJob(prepareEnvAndGetJobGraph).get()).getJobID();
        tryWaitUntilCondition(() -> {
            return Boolean.valueOf(StubMapFunction.attemptIds.size() > 0);
        });
        triggerJMFailover(jobID);
        releaseResultPartitionOfSource();
        StubMapFunction.unblockSubTasks(0);
        Assertions.assertThat(((JobResult) this.flinkCluster.requestJobResult(jobID).get()).getSerializedThrowable()).isEmpty();
        checkCountResults();
    }

    private JobGraph prepareEnvAndGetJobGraph() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT, this.previousWorkerRecoveryTimeout);
        return prepareEnvAndGetJobGraph(configuration, true);
    }

    private JobGraph prepareEnvAndGetJobGraph(Configuration configuration) throws Exception {
        return prepareEnvAndGetJobGraph(configuration, true);
    }

    private JobGraph prepareEnvAndGetJobGraph(boolean z) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT, this.previousWorkerRecoveryTimeout);
        return prepareEnvAndGetJobGraph(configuration, z);
    }

    private JobGraph prepareEnvAndGetJobGraph(Configuration configuration, boolean z) throws Exception {
        this.flinkCluster = TestingMiniCluster.newBuilder(getMiniClusterConfiguration(configuration)).setHighAvailabilityServicesSupplier(this.highAvailabilityServicesSupplier).build();
        this.flinkCluster.start();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(-1);
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        return z ? createJobGraph(executionEnvironment, this.methodName) : createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(executionEnvironment, this.methodName);
    }

    private TestingMiniClusterConfiguration getMiniClusterConfiguration(Configuration configuration) throws IOException {
        NetUtils.Port availablePort = NetUtils.getAvailablePort();
        this.flinkConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        this.flinkConfiguration.set(JobManagerOptions.PORT, Integer.valueOf(availablePort.getPort()));
        this.flinkConfiguration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, Duration.ofMillis(5000L));
        this.flinkConfiguration.set(RestOptions.BIND_PORT, "0");
        this.flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
        this.flinkConfiguration.set(TaskManagerOptions.NETWORK_MEMORY_FRACTION, Float.valueOf(0.4f));
        this.flinkConfiguration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
        this.flinkConfiguration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 4);
        this.flinkConfiguration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK, MemorySize.parse("256K"));
        this.flinkConfiguration.set(BatchExecutionOptions.JOB_RECOVERY_ENABLED, true);
        this.flinkConfiguration.set(BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE, Duration.ZERO);
        this.flinkConfiguration.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
        this.flinkConfiguration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        this.flinkConfiguration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 10);
        this.flinkConfiguration.set(HighAvailabilityOptions.HA_STORAGE_PATH, this.temporaryFolder.toString());
        this.highAvailabilityServices = new EmbeddedHaServicesWithLeadershipControl(EXECUTOR_EXTENSION.getExecutor());
        this.highAvailabilityServicesSupplier = () -> {
            return this.highAvailabilityServices;
        };
        this.flinkConfiguration.set(CoreOptions.TMP_DIRS, this.temporaryFolder.toString());
        this.flinkConfiguration.addAll(configuration);
        return TestingMiniClusterConfiguration.newBuilder().setConfiguration(this.flinkConfiguration).setNumTaskManagers(this.numTaskManagers).setNumSlotsPerTaskManager(this.numSlotsPerTaskManager).build();
    }

    private void triggerJMFailover(JobID jobID) throws Exception {
        this.highAvailabilityServices.revokeJobMasterLeadership(jobID).get();
        this.highAvailabilityServices.grantJobMasterLeadership(jobID);
    }

    private static void checkCountResults() {
        Map<Integer, Integer> map = StubRecordSink.countResults;
        Assertions.assertThat(map.size()).isEqualTo(NUMBER_KEYS);
        Assertions.assertThat(map).isEqualTo((Map) IntStream.range(0, NUMBER_KEYS).boxed().collect(Collectors.toMap(Function.identity(), num -> {
            return 4;
        })));
    }

    private void releaseResultPartitionOfSource() {
        deleteOldestFileInShuffleNettyDirectory(new File((String) this.flinkConfiguration.get(CoreOptions.TMP_DIRS)));
    }

    private JobGraph createJobGraph(StreamExecutionEnvironment streamExecutionEnvironment, String str) {
        streamExecutionEnvironment.fromSequence(0L, 39999L).setParallelism(SOURCE_PARALLELISM).slotSharingGroup("group1").transform("SourceTail", TypeInformation.of(Long.class), new SourceTail()).setParallelism(SOURCE_PARALLELISM).slotSharingGroup("group1").transform("Map", new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}), new StubMapFunction()).slotSharingGroup("group2").keyBy(tuple2 -> {
            return (Integer) tuple2.f0;
        }).sum(1).slotSharingGroup("group3").transform("Sink", TypeInformation.of(Void.class), new StubRecordSink()).slotSharingGroup("group4");
        StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph();
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
        streamGraph.setJobType(JobType.BATCH);
        streamGraph.setJobName(str);
        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
    }

    private JobGraph createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(StreamExecutionEnvironment streamExecutionEnvironment, String str) throws Exception {
        TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
        File file = new File(this.temporaryFolder.getParent().toFile(), "data.tmp-" + UUID.randomUUID());
        prepareTestData(file);
        streamExecutionEnvironment.fromSource(FileSource.forRecordStreamFormat(new TextLineInputFormat(), new org.apache.flink.core.fs.Path[]{new org.apache.flink.core.fs.Path(file.getPath())}).build(), WatermarkStrategy.noWatermarks(), "source").setParallelism(SOURCE_PARALLELISM).slotSharingGroup("group1").transform("Map", tupleTypeInfo, new StubMapFunction2()).slotSharingGroup("group2").keyBy(tuple2 -> {
            return (Integer) tuple2.f0;
        }).sum(1).slotSharingGroup("group3").transform("Sink", TypeInformation.of(Void.class), new StubRecordSink()).slotSharingGroup("group4");
        StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph();
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
        streamGraph.setJobType(JobType.BATCH);
        streamGraph.setJobName(str);
        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void setSubtaskBlocked(List<Integer> list, boolean z, Map<Integer, Boolean> map) {
        list.forEach(num -> {
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void tryWaitUntilCondition(SupplierWithException<Boolean, Exception> supplierWithException) {
        try {
            CommonTestUtils.waitUntilCondition(supplierWithException);
        } catch (Exception e) {
        }
    }

    private File prepareTestData(File file) throws IOException {
        FileWriter fileWriter = new FileWriter(file);
        Throwable th = null;
        for (int i = 0; i < 40000; i++) {
            try {
                try {
                    fileWriter.write(i + "\n");
                } finally {
                }
            } catch (Throwable th2) {
                if (fileWriter != null) {
                    if (th != null) {
                        try {
                            fileWriter.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        fileWriter.close();
                    }
                }
                throw th2;
            }
        }
        if (fileWriter != null) {
            if (0 != 0) {
                try {
                    fileWriter.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                fileWriter.close();
            }
        }
        return file;
    }

    private void deleteOldestFileInShuffleNettyDirectory(File file) {
        File[] listFiles;
        if (file == null || !file.exists() || !file.isDirectory() || (listFiles = file.listFiles(file2 -> {
            return file2.isDirectory() && file2.getName().startsWith("flink-netty-shuffle");
        })) == null) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        for (File file3 : listFiles) {
            Stream filter = Arrays.stream(file3.listFiles()).filter(file4 -> {
                return file4.getName().endsWith(".shuffle.data");
            });
            arrayList.getClass();
            filter.forEach((v1) -> {
                r1.add(v1);
            });
        }
        if (arrayList.isEmpty()) {
            return;
        }
        arrayList.sort(Comparator.comparing(this::getFileCreationTime));
        ((File) arrayList.get(0)).delete();
    }

    private long getFileCreationTime(File file) {
        try {
            return Files.readAttributes(file.toPath(), BasicFileAttributes.class, new LinkOption[0]).creationTime().toMillis();
        } catch (NoSuchFileException e) {
            return Long.MAX_VALUE;
        } catch (IOException e2) {
            throw new RuntimeException(e2);
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -591763014:
                if (implMethodName.equals("lambda$createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator$26e40229$1")) {
                    z = false;
                    break;
                }
                break;
            case 1642228547:
                if (implMethodName.equals("lambda$createJobGraph$26e40229$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/scheduling/JMFailoverITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple2 -> {
                        return (Integer) tuple2.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/scheduling/JMFailoverITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple22 -> {
                        return (Integer) tuple22.f0;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
