package org.apache.flink.test.recovery;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.test.recovery.utils.TaskExecutorProcessEntryPoint;
import org.apache.flink.test.util.TestProcessBuilder;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.Executors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/test/recovery/LocalRecoveryITCase.class */
class LocalRecoveryITCase {
    private static final String ALLOCATION_FAILURES_ACCUMULATOR_NAME = "acc";

    @TempDir
    private File tmpDirectory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/recovery/LocalRecoveryITCase$LocalRecoverySource.class */
    /**
     * Parallel test source that stores, in union list state, the allocation ID each subtask is
     * running in, and after a restore checks whether the subtask came back in the same allocation.
     *
     * <p>While the running flag is set, {@link #run} emits the value {@code 1} and then sleeps for
     * 5 ms, in a loop. After a restore, {@link #initializeState} registers the
     * {@code ALLOCATION_FAILURES_ACCUMULATOR_NAME} accumulator, adds a message to it if the
     * allocation ID differs from the recorded one, and clears the running flag so that the loop in
     * {@link #run} exits.
     */
    public static class LocalRecoverySource extends RichParallelSourceFunction<Integer> implements CheckpointedFunction {
        /** Loop guard for {@link #run}; cleared by {@link #cancel()} and after a restore. */
        private volatile boolean running;

        /** Union list state to which every subtask contributes one (task name, allocation ID) entry. */
        private transient ListState<TaskNameAllocationID> previousAllocations;

        private LocalRecoverySource() {
            this.running = true;
        }

        /**
         * Emits {@code 1} under the checkpoint lock and sleeps 5 ms, repeatedly, until the running
         * flag is cleared.
         *
         * @param sourceContext context used to emit records and to obtain the checkpoint lock
         * @throws Exception if emitting fails or the sleep is interrupted
         */
        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (this.running) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(1);
                }
                Thread.sleep(5L);
            }
        }

        /** Clears the running flag so that the loop in {@link #run} terminates. */
        public void cancel() {
            this.running = false;
        }

        /** Intentionally empty: the state is written once in {@link #initializeState}. */
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        }

        /**
         * Registers the union list state, verifies the allocation ID after a restore and records the
         * current (task name, allocation ID) pair.
         *
         * @param functionInitializationContext gives access to the operator state store and tells
         *     whether this is a restore
         * @throws IllegalStateException if, on restore, no recorded entry matches this task's name
         * @throws Exception if accessing the state fails
         */
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            // getAllocationIDAsString() lives on StreamingRuntimeContext, while getRuntimeContext()
            // is declared with the more general runtime-context type, hence the explicit cast.
            final StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
            final String allocationId = runtimeContext.getAllocationIDAsString();

            // Keep only the part of the task name in front of the first '#'.
            // NOTE(review): presumably the dropped suffix is the attempt number, which would differ
            // between the original run and the restored one -- confirm.
            final String taskName = runtimeContext.getTaskInfo().getTaskNameWithSubtasks().split("#")[0];

            this.previousAllocations = functionInitializationContext.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<>("sourceState", TaskNameAllocationID.class));

            if (functionInitializationContext.isRestored()) {
                // Union state contains the entries of all subtasks; pick the one recorded for this task.
                Optional<TaskNameAllocationID> recorded = Optional.empty();
                for (TaskNameAllocationID candidate : this.previousAllocations.get()) {
                    if (candidate.getName().equals(taskName)) {
                        recorded = Optional.of(candidate);
                        break;
                    }
                }
                final TaskNameAllocationID previousAllocation = recorded.orElseThrow(
                        () -> new IllegalStateException("Could not find corresponding TaskNameAllocationID information."));

                // The accumulator is registered even when there is nothing to report, so the test
                // can always read it from the job result.
                runtimeContext.addAccumulator(LocalRecoveryITCase.ALLOCATION_FAILURES_ACCUMULATOR_NAME, new ListAccumulator<String>());
                if (!allocationId.equals(previousAllocation.getAllocationId())) {
                    runtimeContext.getAccumulator(LocalRecoveryITCase.ALLOCATION_FAILURES_ACCUMULATOR_NAME).add(String.format("The task was deployed to AllocationID(%s) but it should have been deployed to AllocationID(%s) for local recovery.", allocationIDOrSelf(allocationId), previousAllocation.getAllocationId()));
                }

                // The check is done; make the loop in run() exit.
                this.running = false;
            }

            this.previousAllocations.update(Collections.singletonList(new TaskNameAllocationID(taskName, allocationId)));
        }

        /** Returns its argument unchanged; exists only to keep the long format call readable. */
        private static String allocationIDOrSelf(String allocationId) {
            return allocationId;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/recovery/LocalRecoveryITCase$TaskManagerProcess.class */
    /**
     * Handle for one TaskManager used by the test: the configuration it is started with plus every
     * process that has been started for it so far.
     */
    public static class TaskManagerProcess {
        /** Configuration handed to each restarted process as main-class arguments. */
        private final Configuration configuration;

        /** Processes started for this TaskManager in start order; the last entry is the current one. */
        private final List<TestProcessBuilder.TestProcess> processHistory = new ArrayList<>();

        private TaskManagerProcess(Configuration configuration, TestProcessBuilder.TestProcess testProcess) {
            this.configuration = configuration;
            this.processHistory.add(testProcess);
        }

        /** Returns all processes started for this TaskManager, in start order. */
        Iterable<TestProcessBuilder.TestProcess> getProcessHistory() {
            return this.processHistory;
        }

        /**
         * Destroys the current process, waits for it to exit and starts a replacement.
         *
         * @param testProcessBuilder builder for the replacement process; this TaskManager's
         *     configuration is added to it as main-class arguments before it is started
         * @throws IOException if starting the replacement process fails
         * @throws InterruptedException if interrupted while waiting for the old process to exit
         */
        void restartProcess(TestProcessBuilder testProcessBuilder) throws IOException, InterruptedException {
            final TestProcessBuilder.TestProcess current = getRunningProcess();
            current.destroy();
            // Wait for the old process to be gone before the replacement is started.
            current.getProcess().waitFor();

            testProcessBuilder.addConfigAsMainClassArgs(this.configuration);
            this.processHistory.add(testProcessBuilder.start());
        }

        /** Returns the most recently started process. */
        private TestProcessBuilder.TestProcess getRunningProcess() {
            return this.processHistory.get(this.processHistory.size() - 1);
        }

        /** Returns the value configured for {@code TaskManagerOptions.TASK_MANAGER_RESOURCE_ID}. */
        public String getName() {
            return this.configuration.get(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID);
        }

        /** Destroys the most recently started process. */
        public void terminate() {
            getRunningProcess().destroy();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/recovery/LocalRecoveryITCase$TaskNameAllocationID.class */
    /** Immutable pair of a task name and an allocation ID, both kept as plain strings. */
    private static class TaskNameAllocationID {
        private final String name;
        private final String allocationId;

        private TaskNameAllocationID(String name, String allocationId) {
            this.name = name;
            this.allocationId = allocationId;
        }

        /** Returns the task name given at construction. */
        public String getName() {
            return name;
        }

        /** Returns the allocation ID given at construction. */
        public String getAllocationId() {
            return allocationId;
        }
    }

    /** Package-private no-argument constructor; performs no initialisation of its own. */
    LocalRecoveryITCase() {}

    /**
     * Starts a standalone session cluster with local recovery enabled and three TaskManager
     * processes, submits the {@link LocalRecoverySource} job, waits for a completed checkpoint,
     * restarts two of the three TaskManager processes and finally asserts that the list read from
     * the {@code "acc"} accumulator of the job result is empty.
     *
     * <p>The cluster entrypoint is closed on every path. If the test did not succeed, the logs of
     * all TaskManager processes are printed; the processes are terminated in either case.
     *
     * @throws Exception if the cluster, the processes or the job fail, or the assertion does not hold
     */
    @Test
    public void testRecoverLocallyFromProcessCrashWithWorkingDirectory() throws Exception {
        final int parallelism = 3;
        final long jobResultTimeoutSeconds = 45L;

        final Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.ADDRESS, "localhost");
        configuration.set(JobManagerOptions.PORT, 0);
        configuration.set(RestOptions.BIND_PORT, "0");
        configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, Duration.ofMillis(10000L));
        configuration.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, Duration.ofMillis(1000L));
        configuration.set(HeartbeatManagerOptions.HEARTBEAT_RPC_FAILURE_THRESHOLD, 1);
        configuration.set(ClusterOptions.PROCESS_WORKING_DIR_BASE, this.tmpDirectory.getAbsolutePath());
        configuration.set(StateRecoveryOptions.LOCAL_RECOVERY, true);
        configuration.set(TaskManagerOptions.SLOT_TIMEOUT, Duration.ofSeconds(30L));

        boolean success = false;
        Collection<TaskManagerProcess> taskManagerProcesses = Collections.emptyList();
        // try-with-resources guarantees the cluster entrypoint is closed even when the body throws.
        try (StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(configuration)) {
            clusterEntrypoint.startCluster();

            // The JobManager port was configured as 0; hand the actually bound RPC port to the TaskManagers.
            final Configuration taskManagerConfiguration = new Configuration(configuration);
            taskManagerConfiguration.set(JobManagerOptions.PORT, clusterEntrypoint.getRpcPort());
            taskManagerProcesses = startTaskManagerProcesses(parallelism, taskManagerConfiguration);

            final JobClient jobClient = submitJob(parallelism, clusterEntrypoint);
            waitUntilCheckpointCompleted(configuration, clusterEntrypoint.getRestPort(), jobClient.getJobID());

            // Restart all but one TaskManager process.
            restartTaskManagerProcesses(taskManagerProcesses, parallelism - 1);

            final List<String> allocationFailures = jobClient.getJobExecutionResult()
                    .get(jobResultTimeoutSeconds, TimeUnit.SECONDS)
                    .getAccumulatorResult(ALLOCATION_FAILURES_ACCUMULATOR_NAME);
            Assertions.assertTrue(allocationFailures.isEmpty(), allocationFailures.toString());

            success = true;
        } finally {
            if (!success) {
                for (TaskManagerProcess taskManagerProcess : taskManagerProcesses) {
                    printLogOutput(taskManagerProcess);
                }
            }
            for (TaskManagerProcess taskManagerProcess : taskManagerProcesses) {
                taskManagerProcess.terminate();
            }
        }
    }

    /**
     * Passes every process in the given TaskManager's history, together with the TaskManager's
     * name, to {@code AbstractTaskManagerProcessFailureRecoveryTest.printProcessLog}.
     *
     * @param taskManagerProcess TaskManager whose process logs are to be printed
     */
    private static void printLogOutput(TaskManagerProcess taskManagerProcess) {
        for (TestProcessBuilder.TestProcess process : taskManagerProcess.getProcessHistory()) {
            AbstractTaskManagerProcessFailureRecoveryTest.printProcessLog(taskManagerProcess.getName(), process);
        }
    }

    /**
     * Restarts the first {@code i} TaskManager processes, in the iteration order of the collection.
     *
     * @param collection TaskManager processes to choose from
     * @param i number of processes to restart; must not exceed the size of {@code collection}
     * @throws IOException if a replacement process cannot be started
     * @throws InterruptedException if interrupted while waiting for a process to exit
     */
    private static void restartTaskManagerProcesses(Collection<TaskManagerProcess> collection, int i) throws IOException, InterruptedException {
        Preconditions.checkArgument(i <= collection.size());

        final Iterator<TaskManagerProcess> processes = collection.iterator();
        for (int stillToRestart = i; stillToRestart > 0; stillToRestart--) {
            final TaskManagerProcess process = processes.next();
            process.restartProcess(createTaskManagerProcessBuilder());
        }
    }

    /**
     * Starts the requested number of TaskManager processes, each with its own copy of the given
     * configuration in which the resource ID is set to {@code "taskManager_" + index}.
     *
     * @param count number of TaskManager processes to start
     * @param configuration template configuration; it is copied, not modified
     * @return handles for the started processes, in start order
     * @throws IOException if a process cannot be started
     */
    private static Collection<TaskManagerProcess> startTaskManagerProcesses(int count, Configuration configuration) throws IOException {
        final List<TaskManagerProcess> taskManagerProcesses = new ArrayList<>();
        for (int index = 0; index < count; index++) {
            final Configuration taskManagerConfiguration = new Configuration(configuration);
            taskManagerConfiguration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, "taskManager_" + index);
            taskManagerProcesses.add(new TaskManagerProcess(taskManagerConfiguration, startTaskManagerProcess(taskManagerConfiguration)));
        }
        return taskManagerProcesses;
    }

    /**
     * Starts a single TaskManager process with the given configuration passed as main-class
     * arguments.
     *
     * @param configuration configuration for the new process
     * @return the started process
     * @throws IOException if the process cannot be started
     */
    private static TestProcessBuilder.TestProcess startTaskManagerProcess(Configuration configuration) throws IOException {
        final TestProcessBuilder processBuilder = createTaskManagerProcessBuilder();
        processBuilder.addConfigAsMainClassArgs(configuration);

        return processBuilder.start();
    }

    /**
     * Creates a process builder whose main class is {@code TaskExecutorProcessEntryPoint}.
     *
     * @return a new builder; no configuration has been added to it yet
     * @throws IOException if the builder cannot be created
     */
    @Nonnull
    private static TestProcessBuilder createTaskManagerProcessBuilder() throws IOException {
        final String entryPointClassName = TaskExecutorProcessEntryPoint.class.getName();
        return new TestProcessBuilder(entryPointClassName);
    }

    /**
     * Waits, via {@code CommonTestUtils.waitUntilCondition}, until the checkpoint statistics
     * requested over REST report at least one completed checkpoint for the given job.
     *
     * <p>The REST client created for the polling is closed before this method returns, on both the
     * normal and the exceptional path.
     *
     * @param configuration configuration used to create the REST client
     * @param restPort port on {@code localhost} to which the statistics requests are sent
     * @param jobID job whose checkpoint statistics are polled
     * @throws Exception if a request fails, waiting fails, or the REST client cannot be closed
     */
    private void waitUntilCheckpointCompleted(Configuration configuration, int restPort, JobID jobID) throws Exception {
        final JobMessageParameters jobMessageParameters = new JobMessageParameters();
        jobMessageParameters.jobPathParameter.resolve(jobID);

        try (RestClient restClient = new RestClient(configuration, Executors.directExecutor())) {
            CommonTestUtils.waitUntilCondition(() -> {
                final CheckpointingStatistics statistics = restClient.sendRequest(
                        "localhost",
                        restPort,
                        CheckpointingStatisticsHeaders.getInstance(),
                        jobMessageParameters,
                        EmptyRequestBody.getInstance()).join();
                return statistics.getCounts().getNumberCompletedCheckpoints() > 0;
            });
        }
    }

    /**
     * Submits the test job to the given cluster through a remote environment pointing at
     * {@code localhost} and the cluster's REST port, without waiting for the job to finish.
     *
     * <p>The job reads from {@link LocalRecoverySource}, keys the stream by the record value and
     * writes to a {@code DiscardingSink}. Checkpointing is enabled with an interval of 100 ms in
     * {@code EXACTLY_ONCE} mode.
     *
     * @param parallelism parallelism set on the execution environment
     * @param standaloneSessionClusterEntrypoint cluster whose REST port is used for submission
     * @return client for the submitted job
     * @throws Exception if the submission fails
     */
    private JobClient submitJob(int parallelism, StandaloneSessionClusterEntrypoint standaloneSessionClusterEntrypoint) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "localhost", standaloneSessionClusterEntrypoint.getRestPort(), new Configuration());
        env.setParallelism(parallelism);
        env.enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);

        env.addSource(new LocalRecoverySource())
                .keyBy(value -> value)
                .sinkTo(new DiscardingSink<>());

        return env.executeAsync();
    }

    /*
     * Decompiled form of the lambda-deserialization hook for the key-selector lambda used in
     * submitJob. It compares the serialized lambda's implementation method name, method kind,
     * functional interface (KeySelector#getKey), implementing class and signature with the
     * expected values and returns an identity lambda on a match; every other input is rejected
     * with an IllegalArgumentException.
     *
     * NOTE(review): the decompiler marks this method as synthetic, so it was presumably emitted
     * by the compiler rather than written by hand. As rendered here it is not valid Java
     * (`boolean z = -1`, a switch over a boolean, a lambda returned as plain Object), and a
     * recompile would presumably regenerate the hook for the serializable lambda in submitJob --
     * confirm, then delete this method from the source.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Decompiled string switch: dispatch on hashCode() first, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case 1668944876:
                if (implMethodName.equals("lambda$submitJob$b8c9a877$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage of the string switch: `z` selects the matched case (none if still unset).
        switch (z) {
            case false:
                // Method kind 6 is MethodHandleInfo.REF_invokeStatic; all remaining fields must match too.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/recovery/LocalRecoveryITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return num;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
