/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.scheduling;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

class JMFailoverITCase {
    private final Duration previousWorkerRecoveryTimeout = Duration.ofSeconds(3L);
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    private static final int DEFAULT_MAX_PARALLELISM = 4;
    private static final int SOURCE_PARALLELISM = 8;
    private static final int NUMBER_KEYS = 10000;
    private static final int NUMBER_OF_EACH_KEY = 4;
    private EmbeddedHaServicesWithLeadershipControl highAvailabilityServices;
    private String methodName;
    @TempDir
    java.nio.file.Path temporaryFolder;
    protected int numTaskManagers = 4;
    protected int numSlotsPerTaskManager = 4;
    protected Configuration flinkConfiguration = new Configuration();
    protected MiniCluster flinkCluster;
    protected Supplier<HighAvailabilityServices> highAvailabilityServicesSupplier = null;

    JMFailoverITCase() {
    }

    @BeforeEach
    void before(TestInfo testInfo) throws Exception {
        this.flinkConfiguration = new Configuration();
        SourceTail.clear();
        StubMapFunction.clear();
        StubRecordSink.clear();
        testInfo.getTestMethod().ifPresent(method -> {
            this.methodName = method.getName();
        });
    }

    @AfterEach
    void after() {
        Throwable exception = null;
        try {
            if (this.flinkCluster != null) {
                this.flinkCluster.close();
            }
        }
        catch (Throwable throwable) {
            exception = throwable;
        }
        if (exception != null) {
            ExceptionUtils.rethrow((Throwable)exception);
        }
    }

    public void setup() throws Exception {
        SourceTail.clear();
        StubMapFunction.clear();
        StubRecordSink.clear();
    }

    @Test
    void testRecoverFromJMFailover() throws Exception {
        JobGraph jobGraph = this.prepareEnvAndGetJobGraph();
        StubRecordSink.blockSubTasks(0, 1, 2, 3);
        JobID jobId = ((JobSubmissionResult)this.flinkCluster.submitJob(jobGraph).get()).getJobID();
        JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> StubRecordSink.attemptIds.size() > 0));
        this.triggerJMFailover(jobId);
        StubRecordSink.unblockSubTasks(0, 1, 2, 3);
        JobResult jobResult = (JobResult)this.flinkCluster.requestJobResult(jobId).get();
        Assertions.assertThat((Optional)jobResult.getSerializedThrowable()).isEmpty();
        JMFailoverITCase.checkCountResults();
    }

    @Test
    void testSourceNotAllFinished() throws Exception {
        AccessExecutionGraph executionGraph;
        long finishedTasks;
        JobGraph jobGraph = this.prepareEnvAndGetJobGraph();
        SourceTail.blockSubTasks(0);
        JobID jobId = ((JobSubmissionResult)this.flinkCluster.submitJob(jobGraph).get()).getJobID();
        JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> SourceTail.attemptIds.size() == 8));
        JobVertex source = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        while ((finishedTasks = Arrays.stream((executionGraph = (AccessExecutionGraph)this.flinkCluster.getExecutionGraph(jobId).get()).getJobVertex(source.getID()).getTaskVertices()).filter(task -> task.getExecutionState() == ExecutionState.FINISHED).count()) != 7L) {
            Thread.sleep(100L);
        }
        this.triggerJMFailover(jobId);
        SourceTail.unblockSubTasks(0);
        JobResult jobResult = (JobResult)this.flinkCluster.requestJobResult(jobId).get();
        Assertions.assertThat((Optional)jobResult.getSerializedThrowable()).isEmpty();
        JMFailoverITCase.checkCountResults();
    }

    @Test
    void testTaskExecutorNotRegisterOnTime() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT, (Object)Duration.ZERO);
        JobGraph jobGraph = this.prepareEnvAndGetJobGraph(configuration);
        StubRecordSink.blockSubTasks(0, 1, 2, 3);
        JobID jobId = ((JobSubmissionResult)this.flinkCluster.submitJob(jobGraph).get()).getJobID();
        JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> StubRecordSink.attemptIds.size() > 0));
        this.triggerJMFailover(jobId);
        StubRecordSink.unblockSubTasks(0, 1, 2, 3);
        JobResult jobResult = (JobResult)this.flinkCluster.requestJobResult(jobId).get();
        Assertions.assertThat((Optional)jobResult.getSerializedThrowable()).isEmpty();
        JMFailoverITCase.checkCountResults();
    }

    @Test
    void testPartitionNotFoundTwice() throws Exception {
        JobGraph jobGraph = this.prepareEnvAndGetJobGraph();
        StubMapFunction.blockSubTasks(0, 1);
        JobID jobId = ((JobSubmissionResult)this.flinkCluster.submitJob(jobGraph).get()).getJobID();
        JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> StubMapFunction.attemptIds.size() > 0));
        this.triggerJMFailover(jobId);
        this.releaseResultPartitionOfSource();
        StubMapFunction.unblockSubTasks(0);
        JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> StubMapFunction.attemptIds.get(0) == 1));
        this.releaseResultPartitionOfSource();
        StubMapFunction.unblockSubTasks(1);
        JobResult jobResult = (JobResult)this.flinkCluster.requestJobResult(jobId).get();
        Assertions.assertThat((Optional)jobResult.getSerializedThrowable()).isEmpty();
        JMFailoverITCase.checkCountResults();
    }

    @Test
    void testPartitionNotFoundAndOperatorCoordinatorNotSupportBatchSnapshot() throws Exception {
        JobGraph jobGraph = this.prepareEnvAndGetJobGraph(false);
        StubMapFunction2.blockSubTasks(0, 1, 2, 3);
        JobID jobId = ((JobSubmissionResult)this.flinkCluster.submitJob(jobGraph).get()).getJobID();
        JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> StubMapFunction2.attemptIds.size() > 0));
        this.triggerJMFailover(jobId);
        this.releaseResultPartitionOfSource();
        StubMapFunction2.unblockSubTasks(0, 1, 2, 3);
        JobResult jobResult = (JobResult)this.flinkCluster.requestJobResult(jobId).get();
        Assertions.assertThat((Optional)jobResult.getSerializedThrowable()).isEmpty();
        JMFailoverITCase.checkCountResults();
    }

    @Test
    void testPartitionNotFoundAndOperatorCoordinatorSupportBatchSnapshot() throws Exception {
        JobGraph jobGraph = this.prepareEnvAndGetJobGraph();
        StubMapFunction.blockSubTasks(0);
        JobID jobId = ((JobSubmissionResult)this.flinkCluster.submitJob(jobGraph).get()).getJobID();
        JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> StubMapFunction.attemptIds.size() > 0));
        this.triggerJMFailover(jobId);
        this.releaseResultPartitionOfSource();
        StubMapFunction.unblockSubTasks(0);
        JobResult jobResult = (JobResult)this.flinkCluster.requestJobResult(jobId).get();
        Assertions.assertThat((Optional)jobResult.getSerializedThrowable()).isEmpty();
        JMFailoverITCase.checkCountResults();
    }

    private JobGraph prepareEnvAndGetJobGraph() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT, (Object)this.previousWorkerRecoveryTimeout);
        return this.prepareEnvAndGetJobGraph(configuration, true);
    }

    private JobGraph prepareEnvAndGetJobGraph(Configuration config) throws Exception {
        return this.prepareEnvAndGetJobGraph(config, true);
    }

    private JobGraph prepareEnvAndGetJobGraph(boolean operatorCoordinatorsSupportsBatchSnapshot) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT, (Object)this.previousWorkerRecoveryTimeout);
        return this.prepareEnvAndGetJobGraph(configuration, operatorCoordinatorsSupportsBatchSnapshot);
    }

    private JobGraph prepareEnvAndGetJobGraph(Configuration config, boolean operatorCoordinatorsSupportsBatchSnapshot) throws Exception {
        this.flinkCluster = TestingMiniCluster.newBuilder((TestingMiniClusterConfiguration)this.getMiniClusterConfiguration(config)).setHighAvailabilityServicesSupplier(this.highAvailabilityServicesSupplier).build();
        this.flinkCluster.start();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(-1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        return operatorCoordinatorsSupportsBatchSnapshot ? this.createJobGraph(env, this.methodName) : this.createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(env, this.methodName);
    }

    private TestingMiniClusterConfiguration getMiniClusterConfiguration(Configuration config) throws IOException {
        NetUtils.Port jobManagerRpcPort = NetUtils.getAvailablePort();
        this.flinkConfiguration.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        this.flinkConfiguration.set(JobManagerOptions.PORT, (Object)jobManagerRpcPort.getPort());
        this.flinkConfiguration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, (Object)Duration.ofMillis(5000L));
        this.flinkConfiguration.set(RestOptions.BIND_PORT, (Object)"0");
        this.flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, (Object)MemorySize.parse((String)"1g"));
        this.flinkConfiguration.set(TaskManagerOptions.NETWORK_MEMORY_FRACTION, (Object)Float.valueOf(0.4f));
        this.flinkConfiguration.set(JobManagerOptions.SCHEDULER, (Object)JobManagerOptions.SchedulerType.AdaptiveBatch);
        this.flinkConfiguration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, (Object)4);
        this.flinkConfiguration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK, (Object)MemorySize.parse((String)"256K"));
        this.flinkConfiguration.set(BatchExecutionOptions.JOB_RECOVERY_ENABLED, (Object)true);
        this.flinkConfiguration.set(BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE, (Object)Duration.ZERO);
        this.flinkConfiguration.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, (Object)"region");
        this.flinkConfiguration.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"fixed-delay");
        this.flinkConfiguration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, (Object)10);
        this.flinkConfiguration.set(HighAvailabilityOptions.HA_STORAGE_PATH, (Object)this.temporaryFolder.toString());
        this.highAvailabilityServices = new EmbeddedHaServicesWithLeadershipControl((Executor)EXECUTOR_EXTENSION.getExecutor());
        this.highAvailabilityServicesSupplier = () -> this.highAvailabilityServices;
        this.flinkConfiguration.set(CoreOptions.TMP_DIRS, (Object)this.temporaryFolder.toString());
        this.flinkConfiguration.addAll(config);
        return TestingMiniClusterConfiguration.newBuilder().setConfiguration(this.flinkConfiguration).setNumTaskManagers(this.numTaskManagers).setNumSlotsPerTaskManager(this.numSlotsPerTaskManager).build();
    }

    private void triggerJMFailover(JobID jobId) throws Exception {
        this.highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
        this.highAvailabilityServices.grantJobMasterLeadership(jobId);
    }

    private static void checkCountResults() {
        Map<Integer, Integer> countResults = StubRecordSink.countResults;
        Assertions.assertThat((int)countResults.size()).isEqualTo(10000);
        Map expectedResult = IntStream.range(0, 10000).boxed().collect(Collectors.toMap(Function.identity(), i -> 4));
        Assertions.assertThat(countResults).isEqualTo(expectedResult);
    }

    private void releaseResultPartitionOfSource() {
        this.deleteOldestFileInShuffleNettyDirectory(new File((String)this.flinkConfiguration.get(CoreOptions.TMP_DIRS)));
    }

    private JobGraph createJobGraph(StreamExecutionEnvironment env, String jobName) {
        TupleTypeInfo typeInfo = new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
        env.fromSequence(0L, 39999L).setParallelism(8).slotSharingGroup("group1").transform("SourceTail", TypeInformation.of(Long.class), (OneInputStreamOperator)new SourceTail()).setParallelism(8).slotSharingGroup("group1").transform("Map", (TypeInformation)typeInfo, (OneInputStreamOperator)new StubMapFunction()).slotSharingGroup("group2").keyBy((KeySelector & Serializable)tuple2 -> (Integer)tuple2.f0).sum(1).slotSharingGroup("group3").transform("Sink", TypeInformation.of(Void.class), (OneInputStreamOperator)new StubRecordSink()).slotSharingGroup("group4");
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
        streamGraph.setJobType(JobType.BATCH);
        streamGraph.setJobName(jobName);
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
    }

    private JobGraph createJobGraphWithUnsupportedBatchSnapshotOperatorCoordinator(StreamExecutionEnvironment env, String jobName) throws Exception {
        TupleTypeInfo typeInfo = new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
        File file = new File(this.temporaryFolder.getParent().toFile(), "data.tmp-" + UUID.randomUUID());
        this.prepareTestData(file);
        FileSource source = FileSource.forRecordStreamFormat((StreamFormat)new TextLineInputFormat(), (Path[])new Path[]{new Path(file.getPath())}).build();
        env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), "source").setParallelism(8).slotSharingGroup("group1").transform("Map", (TypeInformation)typeInfo, (OneInputStreamOperator)new StubMapFunction2()).slotSharingGroup("group2").keyBy((KeySelector & Serializable)tuple2 -> (Integer)tuple2.f0).sum(1).slotSharingGroup("group3").transform("Sink", TypeInformation.of(Void.class), (OneInputStreamOperator)new StubRecordSink()).slotSharingGroup("group4");
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
        streamGraph.setJobType(JobType.BATCH);
        streamGraph.setJobName(jobName);
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
    }

    private static void setSubtaskBlocked(List<Integer> indices, boolean block, Map<Integer, Boolean> subtaskBlocked) {
        indices.forEach(index -> subtaskBlocked.put((Integer)index, block));
    }

    private static void tryWaitUntilCondition(SupplierWithException<Boolean, Exception> condition) {
        try {
            CommonTestUtils.waitUntilCondition(condition);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private File prepareTestData(File datafile) throws IOException {
        try (FileWriter writer = new FileWriter(datafile);){
            for (int i = 0; i < 40000; ++i) {
                writer.write(i + "\n");
            }
        }
        return datafile;
    }

    private void deleteOldestFileInShuffleNettyDirectory(File directory) {
        if (directory == null || !directory.exists() || !directory.isDirectory()) {
            return;
        }
        File[] matchingDirectories = directory.listFiles(file -> file.isDirectory() && file.getName().startsWith("flink-netty-shuffle"));
        if (matchingDirectories == null) {
            return;
        }
        ArrayList<File> files = new ArrayList<File>();
        for (File subdirectory : matchingDirectories) {
            Arrays.stream(subdirectory.listFiles()).filter(file -> file.getName().endsWith(".shuffle.data")).forEach(files::add);
        }
        if (!files.isEmpty()) {
            files.sort(Comparator.comparing(this::getFileCreationTime));
            ((File)files.get(0)).delete();
        }
    }

    private long getFileCreationTime(File file) {
        try {
            BasicFileAttributes attrs = Files.readAttributes(file.toPath(), BasicFileAttributes.class, new LinkOption[0]);
            return attrs.creationTime().toMillis();
        }
        catch (NoSuchFileException e) {
            return Long.MAX_VALUE;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static class StubRecordSink
    extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<Tuple2<Integer, Integer>, Void> {
        public static Map<Integer, Boolean> subtaskBlocked = new ConcurrentHashMap<Integer, Boolean>();
        public static Map<Integer, Integer> attemptIds = new ConcurrentHashMap<Integer, Integer>();
        public static Map<Integer, Integer> countResults = new ConcurrentHashMap<Integer, Integer>();

        private StubRecordSink() {
        }

        protected void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Void>> output) {
            super.setup(containingTask, config, output);
            int subIdx = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            attemptIds.compute(subIdx, (ignored, value) -> {
                value = value == null ? Integer.valueOf(0) : Integer.valueOf(value + 1);
                return value;
            });
            if (subtaskBlocked.containsKey(subIdx) && subtaskBlocked.get(subIdx).booleanValue()) {
                JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> subtaskBlocked.get(subIdx) == false));
            }
        }

        public void processElement(StreamRecord<Tuple2<Integer, Integer>> streamRecord) throws Exception {
            Tuple2 value = (Tuple2)streamRecord.getValue();
            countResults.put((Integer)value.f0, (Integer)value.f1);
        }

        public static void clear() {
            subtaskBlocked.clear();
            attemptIds.clear();
            countResults.clear();
        }

        public static void blockSubTasks(Integer ... subIndices) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(subIndices), true, subtaskBlocked);
        }

        public static void unblockSubTasks(Integer ... subIndices) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(subIndices), false, subtaskBlocked);
        }
    }

    private static class StubMapFunction2
    extends AbstractStreamOperator<Tuple2<Integer, Integer>>
    implements OneInputStreamOperator<String, Tuple2<Integer, Integer>> {
        public static Map<Integer, Boolean> subtaskBlocked = new ConcurrentHashMap<Integer, Boolean>();
        public static Map<Integer, Integer> attemptIds = new ConcurrentHashMap<Integer, Integer>();

        private StubMapFunction2() {
        }

        protected void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Tuple2<Integer, Integer>>> output) {
            super.setup(containingTask, config, output);
            int subIdx = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            attemptIds.compute(subIdx, (ignored, value) -> {
                value = value == null ? Integer.valueOf(0) : Integer.valueOf(value + 1);
                return value;
            });
            if (subtaskBlocked.containsKey(subIdx) && subtaskBlocked.get(subIdx).booleanValue()) {
                JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> subtaskBlocked.get(subIdx) == false));
            }
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            int number = Integer.parseInt((String)streamRecord.getValue());
            this.output.collect((Object)new StreamRecord((Object)new Tuple2((Object)(number % 10000), (Object)1)));
        }

        public static void clear() {
            subtaskBlocked.clear();
            attemptIds.clear();
        }

        public static void blockSubTasks(Integer ... subIndices) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(subIndices), true, subtaskBlocked);
        }

        public static void unblockSubTasks(Integer ... subIndices) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(subIndices), false, subtaskBlocked);
        }
    }

    private static class StubMapFunction
    extends AbstractStreamOperator<Tuple2<Integer, Integer>>
    implements OneInputStreamOperator<Long, Tuple2<Integer, Integer>> {
        public static Map<Integer, Boolean> subtaskBlocked = new ConcurrentHashMap<Integer, Boolean>();
        public static Map<Integer, Integer> attemptIds = new ConcurrentHashMap<Integer, Integer>();

        private StubMapFunction() {
        }

        protected void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Tuple2<Integer, Integer>>> output) {
            super.setup(containingTask, config, output);
            int subIdx = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            attemptIds.compute(subIdx, (ignored, value) -> {
                value = value == null ? Integer.valueOf(0) : Integer.valueOf(value + 1);
                return value;
            });
            if (subtaskBlocked.containsKey(subIdx) && subtaskBlocked.get(subIdx).booleanValue()) {
                JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> subtaskBlocked.get(subIdx) == false));
            }
        }

        public void processElement(StreamRecord<Long> streamRecord) throws Exception {
            int number = ((Long)streamRecord.getValue()).intValue();
            this.output.collect((Object)new StreamRecord((Object)new Tuple2((Object)(number % 10000), (Object)1)));
        }

        public static void clear() {
            subtaskBlocked.clear();
            attemptIds.clear();
        }

        public static void blockSubTasks(Integer ... subIndices) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(subIndices), true, subtaskBlocked);
        }

        public static void unblockSubTasks(Integer ... subIndices) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(subIndices), false, subtaskBlocked);
        }
    }

    private static class SourceTail
    extends AbstractStreamOperator<Long>
    implements OneInputStreamOperator<Long, Long> {
        public static Map<Integer, Boolean> subtaskBlocked = new ConcurrentHashMap<Integer, Boolean>();
        public static Map<Integer, ResultPartitionID> resultPartitions = new ConcurrentHashMap<Integer, ResultPartitionID>();
        public static Map<Integer, Integer> attemptIds = new ConcurrentHashMap<Integer, Integer>();

        protected void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Long>> output) {
            super.setup(containingTask, config, output);
            int subIdx = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            attemptIds.compute(subIdx, (ignored, value) -> {
                value = value == null ? Integer.valueOf(0) : Integer.valueOf(value + 1);
                return value;
            });
            Environment environment = this.getContainingTask().getEnvironment();
            Preconditions.checkState((environment.getAllWriters().length == 1 ? 1 : 0) != 0);
            resultPartitions.put(subIdx, environment.getAllWriters()[0].getPartitionId());
            if (subtaskBlocked.containsKey(subIdx) && subtaskBlocked.get(subIdx).booleanValue()) {
                JMFailoverITCase.tryWaitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> subtaskBlocked.get(subIdx) == false));
            }
        }

        public void processElement(StreamRecord<Long> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        public static void clear() {
            subtaskBlocked.clear();
            attemptIds.clear();
            resultPartitions.clear();
        }

        public static void blockSubTasks(Integer ... subIndices) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(subIndices), true, subtaskBlocked);
        }

        public static void unblockSubTasks(Integer ... subIndices) {
            JMFailoverITCase.setSubtaskBlocked(Arrays.asList(subIndices), false, subtaskBlocked);
        }
    }
}

