package org.apache.flink.test.runtime.minicluster;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.forkjoin.ForkJoinPool;
import scala.concurrent.impl.ExecutionContextImpl;

/* loaded from: input_file:org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.class */
public class LocalFlinkMiniClusterITCase extends TestLogger {
    private static final String[] ALLOWED_THREAD_PREFIXES = {"initialSeedUniquifierGenerator", "ObjectCleanerThread"};

    /* JADX WARN: Finally extract failed */
    @Test
    public void testLocalFlinkMiniClusterWithMultipleTaskManagers() {
        ActorSystem create = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig());
        LocalFlinkMiniCluster localFlinkMiniCluster = null;
        HashSet hashSet = new HashSet();
        Thread[] threadArr = new Thread[Thread.activeCount()];
        Thread.enumerate(threadArr);
        hashSet.addAll(Arrays.asList(threadArr));
        try {
            Configuration configuration = new Configuration();
            configuration.setInteger("local.number-taskmanager", 3);
            configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 14);
            localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
            localFlinkMiniCluster.start();
            final ActorGateway leaderGateway = localFlinkMiniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
            new JavaTestKit(create) { // from class: org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase.1
                {
                    final AkkaActorGateway akkaActorGateway = new AkkaActorGateway(getRef(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                    new JavaTestKit.Within(TestingUtils.TESTING_DURATION()) { // from class: org.apache.flink.test.runtime.minicluster.LocalFlinkMiniClusterITCase.1.1
                        protected void run() {
                            leaderGateway.tell(JobManagerMessages.getRequestNumberRegisteredTaskManager(), akkaActorGateway);
                            expectMsgEquals(TestingUtils.TESTING_DURATION(), 3);
                            leaderGateway.tell(JobManagerMessages.getRequestTotalNumberOfSlots(), akkaActorGateway);
                            expectMsgEquals(TestingUtils.TESTING_DURATION(), 42);
                        }
                    };
                }
            };
            if (localFlinkMiniCluster != null) {
                localFlinkMiniCluster.stop();
                localFlinkMiniCluster.awaitTermination();
            }
            JavaTestKit.shutdownActorSystem(create);
            create.awaitTermination();
            try {
                Field declaredField = ExecutionContextImpl.class.getDeclaredField("executor");
                declaredField.setAccessible(true);
                ((ForkJoinPool) declaredField.get(ExecutionContext$.MODULE$.global())).shutdownNow();
                long currentTimeMillis = System.currentTimeMillis() + 30000;
                boolean z = true;
                String str = "";
                while (System.currentTimeMillis() < currentTimeMillis) {
                    Thread[] threadArr2 = new Thread[Thread.activeCount()];
                    Thread.enumerate(threadArr2);
                    z = false;
                    int length = threadArr2.length;
                    int i = 0;
                    while (true) {
                        if (i >= length) {
                            break;
                        }
                        Thread thread = threadArr2[i];
                        if (thread.isAlive() && !hashSet.contains(thread)) {
                            boolean z2 = false;
                            String[] strArr = ALLOWED_THREAD_PREFIXES;
                            int length2 = strArr.length;
                            int i2 = 0;
                            while (true) {
                                if (i2 >= length2) {
                                    break;
                                }
                                if (thread.getName().startsWith(strArr[i2])) {
                                    z2 = true;
                                    break;
                                }
                                i2++;
                            }
                            if (!z2) {
                                z = true;
                                str = thread.toString();
                                break;
                            }
                        }
                        i++;
                    }
                    if (!z) {
                        break;
                    } else {
                        try {
                            Thread.sleep(500L);
                        } catch (InterruptedException e) {
                        }
                    }
                }
                if (z) {
                    Assert.fail("Thread " + str + " was started by the mini cluster, but not shut down");
                }
            } catch (Exception e2) {
                System.err.println("Cannot test proper thread shutdown for local execution.");
            }
        } catch (Throwable th) {
            if (localFlinkMiniCluster != null) {
                localFlinkMiniCluster.stop();
                localFlinkMiniCluster.awaitTermination();
            }
            JavaTestKit.shutdownActorSystem(create);
            create.awaitTermination();
            throw th;
        }
    }
}
