package org.apache.flink.runtime.operators.lifecycle;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.Assert;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.class */
/**
 * Fluent helper that submits a {@link TestJobWithDescription} to a mini cluster and then drives
 * it through test steps: waiting for events, sending commands to operators, triggering a
 * failover, stopping with a savepoint and asserting the final job status.
 *
 * <p>Most methods return {@code this} so that the steps can be chained.
 */
public class TestJobExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(TestJobExecutor.class);

    /** Maximum time to wait for the failed subtask to report that it has started again. */
    private static final Duration FAILOVER_TIMEOUT = Duration.ofMillis(10000);

    private final MiniClusterWithClientResource miniClusterResource;
    private final TestJobWithDescription testJob;
    private final JobID jobID;

    private TestJobExecutor(TestJobWithDescription testJob, JobID jobID, MiniClusterWithClientResource miniClusterResource) {
        this.testJob = testJob;
        this.jobID = jobID;
        this.miniClusterResource = miniClusterResource;
    }

    /**
     * Submits the job graph of the given test job through the cluster client, waits for its tasks
     * to be running and returns an executor bound to the submitted job.
     *
     * @param testJobWithDescription the job to submit, together with its event and command queues
     * @param miniClusterWithClientResource the cluster to submit the job to
     * @return an executor for the submitted job
     * @throws Exception if the submission or the wait fails
     */
    public static TestJobExecutor execute(TestJobWithDescription testJobWithDescription, MiniClusterWithClientResource miniClusterWithClientResource) throws Exception {
        LOG.info("submitGraph: {}", testJobWithDescription.jobGraph);
        JobID jobID = miniClusterWithClientResource.getClusterClient().submitJob(testJobWithDescription.jobGraph).get();
        // NOTE(review): the boolean flag presumably controls whether finished tasks are tolerated
        // (false here, true in waitForAllRunning) - confirm against CommonTestUtils.
        CommonTestUtils.waitForAllTaskRunning(miniClusterWithClientResource.getMiniCluster(), jobID, false);
        return new TestJobExecutor(testJobWithDescription, jobID, miniClusterWithClientResource);
    }

    /**
     * Waits until all tasks of the job are reported as running.
     *
     * @return this executor
     * @throws Exception if the wait fails
     */
    public TestJobExecutor waitForAllRunning() throws Exception {
        LOG.info("waitForAllRunning in {}", this.jobID);
        CommonTestUtils.waitForAllTaskRunning(this.miniClusterResource.getMiniCluster(), this.jobID, true);
        return this;
    }

    /**
     * Processes the job's event queue with a handler that stops at the first event that is an
     * instance of the given class and continues past every other event.
     *
     * @param cls the event type to wait for
     * @return this executor
     * @throws Exception if processing the event queue fails
     */
    public TestJobExecutor waitForEvent(Class<? extends TestEvent> cls) throws Exception {
        LOG.info("waitForEvent: {}", cls.getSimpleName());
        this.testJob.eventQueue.withHandler(
                event ->
                        cls.isAssignableFrom(event.getClass())
                                ? TestEventQueue.TestEventHandler.TestEventNextAction.STOP
                                : TestEventQueue.TestEventHandler.TestEventNextAction.CONTINUE);
        return this;
    }

    /**
     * Stops the job with a savepoint in canonical format, written to a new folder created inside
     * the given temporary folder, and waits for the operation to complete.
     *
     * @param temporaryFolder the folder in which the savepoint directory is created
     * @param z whether to drain the job (passed through to the cluster client and logged as
     *     {@code withDrain})
     * @return this executor
     * @throws Exception if the folder cannot be created or the stop operation fails
     */
    public TestJobExecutor stopWithSavepoint(TemporaryFolder temporaryFolder, boolean z) throws Exception {
        LOG.info("stopWithSavepoint: {} (withDrain: {})", temporaryFolder, z);
        String savepointDirectory = temporaryFolder.newFolder().toString();
        this.miniClusterResource
                .getClusterClient()
                .stopWithSavepoint(this.jobID, z, savepointDirectory, SavepointFormatType.CANONICAL)
                .get();
        return this;
    }

    /**
     * Dispatches a command to the given operator through the job's command queue.
     *
     * @param str the ID of the target operator
     * @param testCommand the command to send
     * @param testCommandScope the scope in which the command is dispatched
     * @return this executor
     */
    public TestJobExecutor sendOperatorCommand(String str, TestCommand testCommand, TestCommandDispatcher.TestCommandScope testCommandScope) {
        LOG.info("send command: {} to {}/{}", testCommand, str, testCommandScope);
        this.testJob.commandQueue.dispatch(testCommand, testCommandScope, str);
        return this;
    }

    /**
     * Sends {@link TestCommand#FAIL} to a single subtask of the given operator, waits until that
     * subtask reports that it has started again and then waits for all tasks to be running.
     *
     * @param str the ID of the operator to fail
     * @throws Exception if the restart is not observed in time (see {@link
     *     #handleFailoverTimeout}) or any of the waits fails
     */
    public void triggerFailover(String str) throws Exception {
        LOG.info("sendCommand: {}", TestCommand.FAIL);
        BlockingQueue<TestEvent> events = new LinkedBlockingQueue<>();
        Consumer<TestEvent> listener = events::add;
        // Register the listener before dispatching so that the acknowledgement cannot be missed.
        this.testJob.eventQueue.addListener(listener);
        try {
            this.testJob.commandQueue.dispatch(TestCommand.FAIL, TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK, str);
            waitForFailover(events);
        } catch (TimeoutException e) {
            handleFailoverTimeout(e);
        } finally {
            // Single cleanup point: the listener is removed on success, timeout and failure alike.
            this.testJob.eventQueue.removeListener(listener);
        }
        waitForAllRunning();
    }

    /**
     * Consumes events until the subtask that acknowledged the FAIL command has started again.
     *
     * <p>The acknowledgement identifies the failed subtask (operator ID, subtask index and attempt
     * number); the method returns on the first subsequent {@link OperatorStartedEvent} with the
     * same operator ID and subtask index and an attempt number that is not lower.
     *
     * @param events the queue that receives the job's events
     * @throws TimeoutException if no matching start event arrives within {@link #FAILOVER_TIMEOUT}
     * @throws Exception if polling the queue fails, e.g. because the thread is interrupted
     */
    private void waitForFailover(BlockingQueue<TestEvent> events) throws Exception {
        Deadline deadline = Deadline.fromNow(FAILOVER_TIMEOUT);
        String failedOperatorId = null;
        int failedSubtaskIndex = -1;
        int failedAttemptNumber = -1;
        while (deadline.hasTimeLeft()) {
            // poll() yields null on timeout; neither instanceof check matches and the loop ends.
            TestEvent event = events.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            if (event instanceof TestCommandAckEvent) {
                TestCommandAckEvent ack = (TestCommandAckEvent) event;
                if (ack.getCommand() == TestCommand.FAIL) {
                    failedOperatorId = ack.operatorId;
                    failedSubtaskIndex = ack.subtaskIndex;
                    failedAttemptNumber = ack.getAttemptNumber();
                }
            } else if ((event instanceof OperatorStartedEvent) && failedOperatorId != null) {
                OperatorStartedEvent started = (OperatorStartedEvent) event;
                if (started.operatorId.equals(failedOperatorId)
                        && started.subtaskIndex == failedSubtaskIndex
                        && started.getAttemptNumber() >= failedAttemptNumber) {
                    return;
                }
            }
        }
        throw new TimeoutException("No subtask restarted in " + FAILOVER_TIMEOUT.toMillis() + "ms");
    }

    /**
     * Converts a failover timeout into a {@link RuntimeException} that carries the current job
     * status. This method never returns normally.
     *
     * <p>If the job is in a globally terminal state and its result contains a failure, that
     * failure becomes the cause and the timeout is attached as a suppressed exception; otherwise
     * the timeout itself is the cause.
     *
     * @param timeout the timeout raised while waiting for the failover
     * @throws Exception always; a {@link RuntimeException} unless querying the cluster fails
     */
    private void handleFailoverTimeout(TimeoutException timeout) throws Exception {
        JobStatus jobStatus = this.miniClusterResource.getClusterClient().getJobStatus(this.jobID).get();
        String message = String.format("Unable to failover the job: %s; job status: %s", timeout.getMessage(), jobStatus);
        // The job result is requested only once the job has reached a terminal state.
        if (jobStatus.isGloballyTerminalState()) {
            Optional<? extends Throwable> jobFailure =
                    this.miniClusterResource.getClusterClient().requestJobResult(this.jobID).get().getSerializedThrowable();
            if (jobFailure.isPresent()) {
                RuntimeException failure = new RuntimeException(message, jobFailure.get());
                failure.addSuppressed(timeout);
                throw failure;
            }
        }
        throw new RuntimeException(message, timeout);
    }

    /**
     * Broadcasts a command through the job's command queue.
     *
     * @param testCommand the command to send
     * @param testCommandScope the scope in which the command is broadcast
     * @return this executor
     */
    public TestJobExecutor sendBroadcastCommand(TestCommand testCommand, TestCommandDispatcher.TestCommandScope testCommandScope) {
        LOG.info("sendCommand: {}", testCommand);
        this.testJob.commandQueue.broadcast(testCommand, testCommandScope);
        return this;
    }

    /**
     * Polls the job status, sleeping 1 ms between queries, until the job reaches a globally
     * terminal state. There is no upper bound on the waiting time.
     *
     * @return this executor
     * @throws Exception if querying the status fails or the thread is interrupted
     */
    public TestJobExecutor waitForTermination() throws Exception {
        LOG.info("waitForTermination");
        while (!this.miniClusterResource.getClusterClient().getJobStatus(this.jobID).get().isGloballyTerminalState()) {
            Thread.sleep(1L);
        }
        return this;
    }

    /**
     * Asserts that the job status is {@link JobStatus#FINISHED}.
     *
     * <p>Otherwise the job result is requested and an {@link AssertionError} is raised, with the
     * job's failure as its cause when the result contains one.
     *
     * @return this executor
     * @throws Exception if querying the status or the result fails
     */
    public TestJobExecutor assertFinishedSuccessfully() throws Exception {
        LOG.info("assertFinishedSuccessfully");
        JobStatus jobStatus = this.miniClusterResource.getClusterClient().getJobStatus(this.jobID).get();
        if (jobStatus != JobStatus.FINISHED) {
            String message = String.format("Job didn't finish successfully, status: %s", jobStatus);
            Optional<? extends Throwable> jobFailure =
                    this.miniClusterResource.getClusterClient().requestJobResult(this.jobID).get().getSerializedThrowable();
            if (jobFailure.isPresent()) {
                throw new AssertionError(message, jobFailure.get());
            }
            Assert.fail(message);
        }
        return this;
    }

    /**
     * Waits for subtasks of the given vertex to finish; the scope decides whether the wait covers
     * all subtasks ({@code ALL_SUBTASKS}) or not.
     *
     * @param jobVertexID the vertex whose subtasks are awaited
     * @param testCommandScope the scope selecting which subtasks must finish
     * @return this executor
     * @throws Exception if the wait fails
     */
    public TestJobExecutor waitForSubtasksToFinish(JobVertexID jobVertexID, TestCommandDispatcher.TestCommandScope testCommandScope) throws Exception {
        LOG.info("waitForSubtasksToFinish vertex {}, all subtasks: {}", jobVertexID, testCommandScope);
        boolean allSubtasks = testCommandScope == TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS;
        CommonTestUtils.waitForSubtasksToFinish(this.miniClusterResource.getMiniCluster(), this.jobID, jobVertexID, allSubtasks);
        return this;
    }
}
