package com.github.knaufk.flinkjunit;

import akka.actor.ActorRef;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:com/github/knaufk/flinkjunit/FlinkJUnitRule.class */
public class FlinkJUnitRule extends ExternalResource {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkJUnitRule.class);
    private static final int DEFAULT_PARALLELISM = 4;
    private Configuration configuration;
    private LocalFlinkMiniCluster miniCluster;
    private TestingServer localZk;

    public FlinkJUnitRule(Configuration configuration) {
        this.configuration = configuration;
    }

    public int getFlinkUiPort() {
        return this.configuration.getInteger("jobmanager.web.port", -1);
    }

    protected void before() throws Throwable {
        if (zookeeperHaEnabled()) {
            startLocalZookeeperAndUpdateConfig();
        }
        if (webUiEnabled()) {
            setPortForWebUiAndUpdateConfig();
        }
        this.miniCluster = startCluster();
        setEnvContextToMiniCluster(this.miniCluster);
    }

    protected void after() {
        try {
            try {
                stopCluster(this.miniCluster, new FiniteDuration(1L, TimeUnit.SECONDS));
                if (zookeeperHaEnabled()) {
                    stopZookeeper(this.localZk);
                }
            } catch (Exception e) {
                throw new FlinkJUnitException("Exception while stopping local cluster.", e);
            }
        } finally {
            TestStreamEnvironment.unsetAsContext();
        }
    }

    private void setPortForWebUiAndUpdateConfig() {
        if (this.configuration.getInteger("jobmanager.web.port", -1) == 0) {
            this.configuration.setInteger("jobmanager.web.port", availablePort());
        }
    }

    private LocalFlinkMiniCluster startCluster() {
        LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(this.configuration, false);
        localFlinkMiniCluster.start();
        return localFlinkMiniCluster;
    }

    private void setEnvContextToMiniCluster(LocalFlinkMiniCluster localFlinkMiniCluster) {
        TestStreamEnvironment.setAsContext(localFlinkMiniCluster, 4);
        new TestEnvironment(localFlinkMiniCluster, 4).setAsContext();
    }

    private void startLocalZookeeperAndUpdateConfig() throws Exception {
        LOG.info("Zookeeper is choosen for HA. Starting local Zookeeper...");
        this.localZk = new TestingServer();
        int port = this.localZk.getPort();
        this.configuration.setString("high-availability.zookeeper.quorum", "localhost:" + port);
        this.localZk.start();
        LOG.debug("Zookeeper started on port {}", Integer.valueOf(port));
    }

    private void stopZookeeper(TestingServer testingServer) throws IOException {
        LOG.info("Stopping local zookeeper...");
        testingServer.stop();
    }

    private boolean zookeeperHaEnabled() {
        return this.configuration.getString(HighAvailabilityOptions.HA_MODE).equals("zookeeper");
    }

    private boolean webUiEnabled() {
        return this.configuration.getBoolean("local.start-webserver", false);
    }

    private void stopCluster(LocalFlinkMiniCluster localFlinkMiniCluster, FiniteDuration finiteDuration) throws Exception {
        if (localFlinkMiniCluster != null) {
            int i = 0;
            int i2 = 0;
            if (localFlinkMiniCluster.running()) {
                List<ActorRef> taskManagersAsJava = localFlinkMiniCluster.getTaskManagersAsJava();
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                for (ActorRef actorRef : taskManagersAsJava) {
                    arrayList.add(Patterns.ask(actorRef, TaskManagerMessages.getRequestBroadcastVariablesWithReferences(), new Timeout(finiteDuration)));
                    arrayList2.add(Patterns.ask(actorRef, TaskManagerMessages.getRequestNumActiveConnections(), new Timeout(finiteDuration)));
                }
                Iterator it = ((Iterable) Await.result(Futures.sequence(arrayList, defaultExecutionContext()), finiteDuration)).iterator();
                while (it.hasNext()) {
                    i += ((TaskManagerMessages.ResponseBroadcastVariablesWithReferences) it.next()).number();
                }
                Iterator it2 = ((Iterable) Await.result(Futures.sequence(arrayList2, defaultExecutionContext()), finiteDuration)).iterator();
                while (it2.hasNext()) {
                    i2 += ((TaskManagerMessages.ResponseNumActiveConnections) it2.next()).number();
                }
            }
            localFlinkMiniCluster.stop();
            FileSystem.closeAll();
            Assert.assertEquals("Not all broadcast variables were released.", 0L, i);
            Assert.assertEquals("Not all TCP connections were released.", 0L, i2);
        }
    }

    private int availablePort() {
        try {
            ServerSocket serverSocket = new ServerSocket(0);
            Throwable th = null;
            try {
                int localPort = serverSocket.getLocalPort();
                LOG.info("Setting WebUI port to random port. Port is {}.", Integer.valueOf(localPort));
                if (serverSocket != null) {
                    if (0 != 0) {
                        try {
                            serverSocket.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        serverSocket.close();
                    }
                }
                return localPort;
            } finally {
            }
        } catch (IOException e) {
            LOG.error("Exception while finding a random port for the Flink WebUi.");
            throw new FlinkJUnitException("Exception while finding a random port for the Flink WebUi.", e);
        }
    }

    private ExecutionContext defaultExecutionContext() {
        return ExecutionContext$.MODULE$.global();
    }
}
