package org.apache.flink.connectors.test.common.environment;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
/* loaded from: input_file:org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.class */
public class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {
    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterTestEnvironment.class);
    private int latestTMIndex = 0;
    private boolean isStarted = false;
    private final MiniClusterWithClientResource miniCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(6).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());

    @Override // org.apache.flink.connectors.test.common.environment.TestEnvironment
    public StreamExecutionEnvironment createExecutionEnvironment() {
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }

    @Override // org.apache.flink.connectors.test.common.environment.ClusterControllable
    public void triggerJobManagerFailover(JobClient jobClient, Runnable runnable) throws ExecutionException, InterruptedException {
        Optional haLeadershipControl = this.miniCluster.getMiniCluster().getHaLeadershipControl();
        if (!haLeadershipControl.isPresent()) {
            throw new UnsupportedOperationException("This MiniCluster does not support JobManager HA");
        }
        HaLeadershipControl haLeadershipControl2 = (HaLeadershipControl) haLeadershipControl.get();
        haLeadershipControl2.revokeJobMasterLeadership(jobClient.getJobID()).get();
        runnable.run();
        haLeadershipControl2.grantJobMasterLeadership(jobClient.getJobID()).get();
    }

    @Override // org.apache.flink.connectors.test.common.environment.ClusterControllable
    public void triggerTaskManagerFailover(JobClient jobClient, Runnable runnable) throws Exception {
        terminateTaskManager();
        CommonTestUtils.waitForNoTaskRunning(() -> {
            return (JobDetailsInfo) this.miniCluster.getRestClusterClient().getJobDetails(jobClient.getJobID()).get();
        }, Deadline.fromNow(Duration.ofMinutes(5L)));
        runnable.run();
        startTaskManager();
    }

    @Override // org.apache.flink.connectors.test.common.environment.ClusterControllable
    public void isolateNetwork(JobClient jobClient, Runnable runnable) {
        throw new UnsupportedOperationException("Cannot isolate network in a MiniCluster");
    }

    @Override // org.apache.flink.connectors.test.common.TestResource
    public void startUp() throws Exception {
        if (this.isStarted) {
            return;
        }
        this.miniCluster.before();
        LOG.debug("MiniCluster is running");
        this.isStarted = true;
    }

    @Override // org.apache.flink.connectors.test.common.TestResource
    public void tearDown() {
        if (this.isStarted) {
            this.isStarted = false;
            this.miniCluster.after();
            LOG.debug("MiniCluster has been tear down");
        }
    }

    private void terminateTaskManager() throws Exception {
        this.miniCluster.getMiniCluster().terminateTaskManager(this.latestTMIndex).get();
        LOG.debug("TaskManager {} has been terminated.", Integer.valueOf(this.latestTMIndex));
    }

    private void startTaskManager() throws Exception {
        this.miniCluster.getMiniCluster().startTaskManager();
        this.latestTMIndex++;
        LOG.debug("New TaskManager {} has been launched.", Integer.valueOf(this.latestTMIndex));
    }

    public String toString() {
        return "MiniCluster";
    }
}
