package org.apache.flink.test.misc;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.util.ReadOnlyStringMap;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:org/apache/flink/test/misc/JobIDLoggingITCase.class */
/**
 * Integration test checking that log events emitted by a set of audited Flink loggers carry the
 * ID of the job they belong to in their logging context data (key {@code "flink-job-id"}).
 *
 * <p>The test runs a simple streaming job on a mini cluster configured with the adaptive
 * scheduler, triggers checkpoints, cancels the job and then inspects the events captured by each
 * {@link LoggerAuditingExtension}.
 */
class JobIDLoggingITCase {

    private static final Logger logger = LoggerFactory.getLogger(JobIDLoggingITCase.class);

    /** Context-data key under which the job ID is looked up in captured log events. */
    private static final String JOB_ID_KEY = "flink-job-id";

    @RegisterExtension
    public final LoggerAuditingExtension checkpointCoordinatorLogging =
            new LoggerAuditingExtension(CheckpointCoordinator.class, Level.DEBUG);

    @RegisterExtension
    public final LoggerAuditingExtension streamTaskLogging =
            new LoggerAuditingExtension(StreamTask.class, Level.DEBUG);

    @RegisterExtension
    public final LoggerAuditingExtension taskExecutorLogging =
            new LoggerAuditingExtension(TaskExecutor.class, Level.DEBUG);

    @RegisterExtension
    public final LoggerAuditingExtension taskLogging =
            new LoggerAuditingExtension(Task.class, Level.DEBUG);

    @RegisterExtension
    public final LoggerAuditingExtension executionGraphLogging =
            new LoggerAuditingExtension(ExecutionGraph.class, Level.DEBUG);

    @RegisterExtension
    public final LoggerAuditingExtension jobMasterLogging =
            new LoggerAuditingExtension(JobMaster.class, Level.DEBUG);

    @RegisterExtension
    public final LoggerAuditingExtension adaptiveSchedulerLogging =
            new LoggerAuditingExtension(AdaptiveScheduler.class, Level.DEBUG);

    // Referenced by name rather than by class literal, as in the original code.
    @RegisterExtension
    public final LoggerAuditingExtension asyncCheckpointRunnableLogging =
            new LoggerAuditingExtension(
                    "org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable",
                    Level.DEBUG);

    @RegisterExtension
    public static MiniClusterExtension miniClusterResource =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(getConfiguration())
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(1)
                            .build());

    /** Builds the cluster configuration; it selects the adaptive scheduler. */
    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        return configuration;
    }

    /**
     * Runs a job, cancels it, and verifies for every audited logger that the expected messages
     * were logged with the job ID attached and that no unexpected message lacks it.
     *
     * @param clusterClient client for the mini cluster, injected by the test extension
     * @throws Exception if submitting, checkpointing or cancelling the job fails
     */
    @Test
    void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient) throws Exception {
        JobID jobId = runJob(clusterClient);
        clusterClient.cancel(jobId).get();

        assertJobIDPresent(
                jobId,
                checkpointCoordinatorLogging,
                Arrays.asList(
                        "No checkpoint found during restore.",
                        "Resetting the master hooks.",
                        "Triggering checkpoint .*",
                        "Received acknowledge message for checkpoint .*",
                        "Completed checkpoint .*",
                        "Checkpoint state: .*"));

        assertJobIDPresent(
                jobId,
                streamTaskLogging,
                Arrays.asList(
                        "State backend is set to .*",
                        "Initializing Source: .*",
                        "Invoking Source: .*",
                        "Starting checkpoint .*",
                        "Notify checkpoint \\d+ complete .*"));

        assertJobIDPresent(
                jobId,
                taskExecutorLogging,
                Arrays.asList(
                        "Received task .*",
                        "Trigger checkpoint .*",
                        "Confirm completed checkpoint .*"),
                "TaskManager received a checkpoint confirmation for unknown task.*",
                "TaskManager received an aborted checkpoint for unknown task.*",
                "Un-registering task.*",
                "Successful registration.*",
                "Establish JobManager connection.*",
                "Offer reserved slots.*",
                ".*ResourceManager.*",
                "Operator event.*",
                "Recovered slot allocation snapshots.*",
                ".*heartbeat.*",
                ".*leadership.*");

        assertJobIDPresent(
                jobId,
                taskLogging,
                Arrays.asList(
                        "Source: .* switched from CREATED to DEPLOYING.",
                        "Source: .* switched from DEPLOYING to INITIALIZING.",
                        "Source: .* switched from INITIALIZING to RUNNING."));

        assertJobIDPresent(
                jobId,
                executionGraphLogging,
                Arrays.asList(
                        "Created execution graph .*",
                        "Deploying Source.*",
                        "Job .* switched from state CREATED to RUNNING.",
                        "Source: .* switched from CREATED to SCHEDULED.",
                        "Source: .* switched from SCHEDULED to DEPLOYING.",
                        "Source: .* switched from DEPLOYING to INITIALIZING.",
                        "Source: .* switched from INITIALIZING to RUNNING."));

        assertJobIDPresent(
                jobId,
                adaptiveSchedulerLogging,
                Arrays.asList(
                        "Checkpoint storage is set to .*",
                        "Running initialization on master for job .*",
                        "Successfully created execution graph from job graph .*",
                        "Successfully ran initialization on master.*"),
                "Registration at ResourceManager.*",
                "Registration with ResourceManager.*",
                "Resolved ResourceManager address.*");

        assertJobIDPresent(
                jobId,
                jobMasterLogging,
                Arrays.asList(
                        "Initializing job .*",
                        "Starting execution of job .*",
                        "Using restart back off time strategy .*"),
                "Registration at ResourceManager.*",
                "Registration with ResourceManager.*",
                "Resolved ResourceManager address.*");

        assertJobIDPresent(
                jobId,
                asyncCheckpointRunnableLogging,
                Arrays.asList(
                        ".* started executing asynchronous part of checkpoint .*",
                        ".* finished asynchronous part of checkpoint .*"));
    }

    /**
     * Checks the events captured by one logger extension against the given job ID.
     *
     * <p>Each event is classified as follows:
     *
     * <ul>
     *   <li>carries the given job ID: every expected pattern fully matching its formatted message
     *       is ticked off;
     *   <li>carries a different job ID: recorded as a "wrong job ID" event;
     *   <li>carries no job ID but fully matches an ignore pattern: recorded as ignored;
     *   <li>carries no job ID otherwise: recorded as a "missing job ID" event.
     * </ul>
     *
     * <p>The assertion fails if any event has a wrong job ID, if any expected pattern was never
     * matched, or if any non-ignored event lacks a job ID.
     *
     * @param jobID the job whose hex string must appear in the events' context data
     * @param loggerAuditingExtension the extension that captured the events to inspect
     * @param list regular expressions that must each fully match at least one event message that
     *     carries the job ID
     * @param strArr regular expressions for messages that may be logged without a job ID
     */
    private static void assertJobIDPresent(
            JobID jobID,
            LoggerAuditingExtension loggerAuditingExtension,
            List<String> list,
            String... strArr) {
        final List<LogEvent> eventsWithMissingJobId = new ArrayList<>();
        final List<LogEvent> eventsWithWrongJobId = new ArrayList<>();
        final List<LogEvent> ignoredEvents = new ArrayList<>();

        // Collected into an ArrayList explicitly: matched patterns are removed below, and
        // Collectors.toList() makes no guarantee about the mutability of its result.
        final List<Pattern> unmatchedPatterns =
                list.stream()
                        .map(Pattern::compile)
                        .collect(Collectors.toCollection(ArrayList::new));
        final List<Pattern> ignorePatterns =
                Arrays.stream(strArr).map(Pattern::compile).collect(Collectors.toList());

        for (LogEvent event : loggerAuditingExtension.getEvents()) {
            final ReadOnlyStringMap contextData = event.getContextData();
            final String message = event.getMessage().getFormattedMessage();
            if (contextData.containsKey(JOB_ID_KEY)) {
                if (Objects.equals(contextData.getValue(JOB_ID_KEY), jobID.toHexString())) {
                    unmatchedPatterns.removeIf(pattern -> pattern.matcher(message).matches());
                } else {
                    eventsWithWrongJobId.add(event);
                }
            } else if (matchesAny(ignorePatterns, message)) {
                ignoredEvents.add(event);
            } else {
                eventsWithMissingJobId.add(event);
            }
        }

        logger.debug(
                "checked events for {}:\n  {};\n  ignored: {},\n  wrong job id: {},\n  missing job id: {}",
                loggerAuditingExtension.getLoggerName(),
                loggerAuditingExtension.getEvents(),
                ignoredEvents,
                eventsWithWrongJobId,
                eventsWithMissingJobId);

        Assertions.assertThat(eventsWithWrongJobId).as("events with a wrong Job ID").isEmpty();
        Assertions.assertThat(unmatchedPatterns)
                .as(
                        "not all expected events logged by %s, logged:\n%s",
                        loggerAuditingExtension.getLoggerName(),
                        loggerAuditingExtension.getEvents())
                .isEmpty();
        Assertions.assertThat(eventsWithMissingJobId)
                .as(
                        "too many events without Job ID logged by %s",
                        loggerAuditingExtension.getLoggerName())
                .isEmpty();
    }

    /**
     * Tells whether the whole string matches at least one of the given patterns.
     *
     * @param list the patterns to try
     * @param str the text to match
     * @return {@code true} if any pattern fully matches {@code str}
     */
    private static boolean matchesAny(List<Pattern> list, String str) {
        return list.stream().anyMatch(pattern -> pattern.matcher(str).matches());
    }

    /**
     * Submits a sequence-source job with a discarding sink, waits until it is reported as
     * {@link JobStatus#RUNNING} and triggers checkpoints on it.
     *
     * <p>Both the wait and the checkpoint retries share a single five-minute deadline. Checkpoint
     * attempts failing with a {@link CheckpointException} are retried until the deadline is
     * overdue; any other failure is rethrown immediately.
     *
     * @param clusterClient client used to submit and control the job
     * @return the ID of the submitted job
     * @throws Exception if submission fails, or if checkpointing fails with a non-checkpoint
     *     error or keeps failing past the deadline
     */
    private static JobID runJob(ClusterClient<?> clusterClient) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).addSink(new DiscardingSink<>());
        JobID jobID = clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5L));

        // Poll until the job shows up as RUNNING. If the deadline passes first, fall through;
        // the checkpoint loop below then surfaces the failure.
        while (deadline.hasTimeLeft()
                && clusterClient.listJobs().get().stream()
                        .noneMatch(
                                status ->
                                        status.getJobId().equals(jobID)
                                                && status.getJobState()
                                                        .equals(JobStatus.RUNNING))) {
            Thread.sleep(10L);
        }

        // Retry while checkpoints are declined with a CheckpointException.
        // NOTE(review): presumably the declines happen because not all tasks are ready yet - confirm.
        while (true) {
            try {
                // Two checkpoints are triggered back to back, as in the original code.
                // NOTE(review): presumably the second one guarantees that the completion
                // notifications of the first have been logged - confirm.
                clusterClient.triggerCheckpoint(jobID, CheckpointType.DEFAULT).get();
                clusterClient.triggerCheckpoint(jobID, CheckpointType.DEFAULT).get();
                return jobID;
            } catch (ExecutionException e) {
                boolean checkpointFailure =
                        ExceptionUtils.findThrowable(e, CheckpointException.class).isPresent();
                if (!checkpointFailure || deadline.isOverdue()) {
                    throw e;
                }
                Thread.sleep(10L);
            }
        }
    }
}
