/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest;
import org.apache.flink.test.recovery.utils.TaskExecutorProcessEntryPoint;
import org.apache.flink.test.util.TestProcessBuilder;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.Executors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={TestLoggerExtension.class})
class LocalRecoveryITCase {
    /** Name of the accumulator that {@link LocalRecoverySource} registers after a restore and that the test reads from the job result. */
    private static final String ALLOCATION_FAILURES_ACCUMULATOR_NAME = "acc";
    /** Directory whose absolute path the test sets as {@code ClusterOptions.PROCESS_WORKING_DIR_BASE}. */
    @TempDir
    private File tmpDirectory;

    /** Package-private no-argument constructor; performs no initialization. */
    LocalRecoveryITCase() {
    }

    /**
     * Starts a standalone session cluster plus {@code parallelism} TaskManager processes that use
     * {@link #tmpDirectory} as working directory base, submits the checkpointing job, waits for a
     * completed checkpoint, restarts all but one TaskManager process and finally asserts that the
     * job's allocation-failure accumulator is empty.
     *
     * <p>The logs of all TaskManager processes are printed if the test does not reach its end
     * successfully; the processes are terminated in every case.
     *
     * @throws Exception if cluster start-up, job submission, waiting or the assertion fails
     */
    @Test
    public void testRecoverLocallyFromProcessCrashWithWorkingDirectory() throws Exception {
        final Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.ADDRESS, "localhost");
        configuration.set(JobManagerOptions.PORT, 0);
        configuration.set(RestOptions.BIND_PORT, "0");
        configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, Duration.ofMillis(10000L));
        configuration.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, Duration.ofMillis(1000L));
        configuration.set(HeartbeatManagerOptions.HEARTBEAT_RPC_FAILURE_THRESHOLD, 1);
        configuration.set(ClusterOptions.PROCESS_WORKING_DIR_BASE, this.tmpDirectory.getAbsolutePath());
        configuration.set(StateRecoveryOptions.LOCAL_RECOVERY, true);
        configuration.set(TaskManagerOptions.SLOT_TIMEOUT, Duration.ofSeconds(30L));

        final int parallelism = 3;
        final long waitingTimeInSeconds = 45L;

        boolean success = false;
        // Declared as Collection because that is what startTaskManagerProcesses returns.
        Collection<TaskManagerProcess> taskManagerProcesses = Collections.emptyList();
        try (StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(configuration)) {
            clusterEntrypoint.startCluster();

            // TaskManagers get a copy of the cluster configuration carrying the actual RPC port.
            final Configuration configurationTemplate = new Configuration(configuration);
            configurationTemplate.set(JobManagerOptions.PORT, clusterEntrypoint.getRpcPort());
            taskManagerProcesses = LocalRecoveryITCase.startTaskManagerProcesses(parallelism, configurationTemplate);

            final JobClient jobClient = this.submitJob(parallelism, clusterEntrypoint);
            this.waitUntilCheckpointCompleted(configuration, clusterEntrypoint.getRestPort(), jobClient.getJobID());

            // Restart all but one TaskManager process.
            LocalRecoveryITCase.restartTaskManagerProcesses(taskManagerProcesses, parallelism - 1);

            final List<String> allocFailures = jobClient.getJobExecutionResult()
                    .get(waitingTimeInSeconds, TimeUnit.SECONDS)
                    .getAccumulatorResult(ALLOCATION_FAILURES_ACCUMULATOR_NAME);
            Assertions.assertTrue(allocFailures.isEmpty(), allocFailures.toString());
            success = true;
        } finally {
            if (!success) {
                for (TaskManagerProcess taskManagerProcess : taskManagerProcesses) {
                    LocalRecoveryITCase.printLogOutput(taskManagerProcess);
                }
            }
            for (TaskManagerProcess taskManagerProcess : taskManagerProcesses) {
                taskManagerProcess.terminate();
            }
        }
    }

    /**
     * Prints the log of every process recorded in the given TaskManager's history, labelled with
     * the TaskManager's name.
     *
     * @param taskManagerProcess TaskManager whose process logs are printed
     */
    private static void printLogOutput(TaskManagerProcess taskManagerProcess) {
        taskManagerProcess.getProcessHistory().forEach(
                historicProcess -> AbstractTaskManagerProcessFailureRecoveryTest.printProcessLog(
                        taskManagerProcess.getName(), historicProcess));
    }

    /**
     * Restarts the first {@code numberRestarts} TaskManager processes in iteration order, each with
     * a freshly created process builder.
     *
     * @param taskManagerProcesses candidate TaskManager processes
     * @param numberRestarts how many processes to restart; must not exceed the collection size
     * @throws IOException if creating the builder or restarting a process fails
     * @throws InterruptedException if interrupted while a process is being restarted
     */
    private static void restartTaskManagerProcesses(Collection<TaskManagerProcess> taskManagerProcesses, int numberRestarts) throws IOException, InterruptedException {
        Preconditions.checkArgument(numberRestarts <= taskManagerProcesses.size());
        int restarted = 0;
        for (TaskManagerProcess candidate : taskManagerProcesses) {
            if (restarted >= numberRestarts) {
                break;
            }
            candidate.restartProcess(LocalRecoveryITCase.createTaskManagerProcessBuilder());
            restarted++;
        }
    }

    /**
     * Starts {@code numberTaskManagers} TaskManager processes. Each process gets its own copy of
     * the template configuration with the resource id set to {@code "taskManager_" + index}.
     *
     * @param numberTaskManagers number of processes to start
     * @param configurationTemplate configuration copied for every process
     * @return the started TaskManager processes, in start order
     * @throws IOException if a process cannot be started
     */
    private static Collection<TaskManagerProcess> startTaskManagerProcesses(int numberTaskManagers, Configuration configurationTemplate) throws IOException {
        final List<TaskManagerProcess> result = new ArrayList<>();
        for (int i = 0; i < numberTaskManagers; ++i) {
            final Configuration effectiveConfiguration = new Configuration(configurationTemplate);
            // No (Object) cast here: the value must match the option's String type parameter.
            effectiveConfiguration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, "taskManager_" + i);
            final TestProcessBuilder.TestProcess process = LocalRecoveryITCase.startTaskManagerProcess(effectiveConfiguration);
            result.add(new TaskManagerProcess(effectiveConfiguration, process));
        }
        return result;
    }

    /**
     * Starts a single TaskManager process, passing the given configuration as main-class arguments.
     *
     * @param effectiveConfiguration configuration handed to the new process
     * @return the started process
     * @throws IOException if the builder cannot be created or the process cannot be started
     */
    private static TestProcessBuilder.TestProcess startTaskManagerProcess(Configuration effectiveConfiguration) throws IOException {
        final TestProcessBuilder builder = LocalRecoveryITCase.createTaskManagerProcessBuilder();
        builder.addConfigAsMainClassArgs(effectiveConfiguration);
        return builder.start();
    }

    /**
     * Creates a process builder whose main class is {@link TaskExecutorProcessEntryPoint}.
     *
     * @return a new builder for a TaskManager process
     * @throws IOException declared for callers; see {@code TestProcessBuilder}'s constructor
     */
    @Nonnull
    private static TestProcessBuilder createTaskManagerProcessBuilder() throws IOException {
        final String entryPointClassName = TaskExecutorProcessEntryPoint.class.getName();
        return new TestProcessBuilder(entryPointClassName);
    }

    /**
     * Polls the checkpointing statistics REST endpoint on {@code localhost:restPort} until the job
     * reports at least one completed checkpoint.
     *
     * <p>The {@link RestClient} is opened in a try-with-resources statement so that it is closed
     * once waiting ends, whether normally or with an exception.
     *
     * @param configuration configuration used to create the REST client
     * @param restPort REST port of the cluster
     * @param jobId job whose checkpoint statistics are queried
     * @throws Exception if creating or closing the client fails, or if waiting fails
     */
    private void waitUntilCheckpointCompleted(Configuration configuration, int restPort, JobID jobId) throws Exception {
        try (RestClient restClient = new RestClient(configuration, Executors.directExecutor())) {
            final JobMessageParameters messageParameters = new JobMessageParameters();
            messageParameters.jobPathParameter.resolve(jobId);
            // NOTE(review): no timeout is passed here; confirm waitUntilCondition bounds the wait itself.
            CommonTestUtils.waitUntilCondition(() -> {
                final CheckpointingStatistics checkpointingStatistics = restClient
                        .sendRequest(
                                "localhost",
                                restPort,
                                CheckpointingStatisticsHeaders.getInstance(),
                                messageParameters,
                                EmptyRequestBody.getInstance())
                        .join();
                return checkpointingStatistics.getCounts().getNumberCompletedCheckpoints() > 0L;
            });
        }
    }

    /**
     * Builds and asynchronously submits the test job: a {@link LocalRecoverySource}, keyed by the
     * emitted value, feeding a {@link DiscardingSink}. Checkpointing is enabled with an interval of
     * 100 and {@code EXACTLY_ONCE} mode.
     *
     * @param parallelism parallelism set on the execution environment
     * @param clusterEntrypoint cluster whose REST port the remote environment connects to
     * @return client handle of the submitted job
     * @throws Exception if the job cannot be submitted
     */
    private JobClient submitJob(int parallelism, StandaloneSessionClusterEntrypoint clusterEntrypoint) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "localhost", clusterEntrypoint.getRestPort(), new Configuration());
        env.setParallelism(parallelism);
        env.enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        // Kept fully generic (no raw-type casts) so the pipeline's element type is checked.
        env.addSource(new LocalRecoverySource())
                .keyBy(x -> x)
                .sinkTo(new DiscardingSink<>());
        return env.executeAsync();
    }

    /** Immutable pair of a task name and the id of the allocation the task was deployed to. */
    private static class TaskNameAllocationID {
        /** Task name this entry belongs to. */
        private final String name;
        /** Allocation id recorded for the task. */
        private final String allocationId;

        private TaskNameAllocationID(String name, String allocationId) {
            this.allocationId = allocationId;
            this.name = name;
        }

        /** @return the task name */
        public String getName() {
            return name;
        }

        /** @return the recorded allocation id */
        public String getAllocationId() {
            return allocationId;
        }
    }

    private static class LocalRecoverySource
    extends RichParallelSourceFunction<Integer>
    implements CheckpointedFunction {
        private volatile boolean running = true;
        private transient ListState<TaskNameAllocationID> previousAllocations;

        private LocalRecoverySource() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (this.running) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)1);
                }
                Thread.sleep(5L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext)this.getRuntimeContext();
            String allocationId = runtimeContext.getAllocationIDAsString();
            String myName = runtimeContext.getTaskInfo().getTaskNameWithSubtasks().split("#")[0];
            ListStateDescriptor previousAllocationsStateDescriptor = new ListStateDescriptor("sourceState", TaskNameAllocationID.class);
            this.previousAllocations = context.getOperatorStateStore().getUnionListState(previousAllocationsStateDescriptor);
            if (context.isRestored()) {
                Iterable taskNameAllocationIds = (Iterable)this.previousAllocations.get();
                Optional<Object> optionalMyTaskNameAllocationId = Optional.empty();
                for (TaskNameAllocationID taskNameAllocationId : taskNameAllocationIds) {
                    if (!taskNameAllocationId.getName().equals(myName)) continue;
                    optionalMyTaskNameAllocationId = Optional.of(taskNameAllocationId);
                    break;
                }
                TaskNameAllocationID myTaskNameAllocationId = (TaskNameAllocationID)optionalMyTaskNameAllocationId.orElseThrow(() -> new IllegalStateException("Could not find corresponding TaskNameAllocationID information."));
                runtimeContext.addAccumulator(LocalRecoveryITCase.ALLOCATION_FAILURES_ACCUMULATOR_NAME, (Accumulator)new ListAccumulator());
                if (!allocationId.equals(myTaskNameAllocationId.getAllocationId())) {
                    runtimeContext.getAccumulator(LocalRecoveryITCase.ALLOCATION_FAILURES_ACCUMULATOR_NAME).add((Object)String.format("The task was deployed to AllocationID(%s) but it should have been deployed to AllocationID(%s) for local recovery.", allocationId, myTaskNameAllocationId.getAllocationId()));
                }
                this.running = false;
            }
            this.previousAllocations.update(Collections.singletonList(new TaskNameAllocationID(myName, allocationId)));
        }
    }

    /**
     * Handle for one logical TaskManager: its configuration plus every process that has been
     * started for it. The last entry of the history is treated as the running process.
     */
    private static class TaskManagerProcess {
        private final Configuration configuration;
        private final List<TestProcessBuilder.TestProcess> processHistory;

        private TaskManagerProcess(Configuration configuration, TestProcessBuilder.TestProcess process) {
            this.configuration = configuration;
            List<TestProcessBuilder.TestProcess> history = new ArrayList<>();
            history.add(process);
            this.processHistory = history;
        }

        /** @return all processes started for this TaskManager, oldest first */
        Iterable<TestProcessBuilder.TestProcess> getProcessHistory() {
            return processHistory;
        }

        /**
         * Destroys the running process, waits for it to exit, and starts a replacement from the
         * given builder using this TaskManager's configuration.
         *
         * @param builder builder used to start the replacement process
         * @throws IOException if the replacement process cannot be started
         * @throws InterruptedException if interrupted while waiting for the old process to exit
         */
        void restartProcess(TestProcessBuilder builder) throws IOException, InterruptedException {
            final TestProcessBuilder.TestProcess current = getRunningProcess();
            current.destroy();
            current.getProcess().waitFor();

            builder.addConfigAsMainClassArgs(configuration);
            processHistory.add(builder.start());
        }

        /** @return the most recently started process */
        private TestProcessBuilder.TestProcess getRunningProcess() {
            final int lastIndex = processHistory.size() - 1;
            return processHistory.get(lastIndex);
        }

        /** @return the resource id configured for this TaskManager */
        public String getName() {
            return configuration.get(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID);
        }

        /** Destroys the most recently started process. */
        public void terminate() {
            getRunningProcess().destroy();
        }
    }
}

