package org.apache.flink.runtime.rpc.akka;

import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaBootstrapToolsTest.class */
class AkkaBootstrapToolsTest {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaBootstrapToolsTest.class);

    AkkaBootstrapToolsTest() {
    }

    @Test
    void testConcurrentActorSystemCreation() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
        try {
            FutureUtils.completeAll((List) IntStream.range(0, 10).mapToObj(i -> {
                return CompletableFuture.supplyAsync(CheckedSupplier.unchecked(() -> {
                    cyclicBarrier.await();
                    return AkkaBootstrapTools.startRemoteActorSystem(new Configuration(), "localhost", "0", LOG);
                }), newFixedThreadPool);
            }).map(completableFuture -> {
                return completableFuture.thenCompose(AkkaUtils::terminateActorSystem);
            }).collect(Collectors.toList())).get();
            ExecutorUtils.gracefulShutdown(10000L, TimeUnit.MILLISECONDS, new ExecutorService[]{newFixedThreadPool});
        } catch (Throwable th) {
            ExecutorUtils.gracefulShutdown(10000L, TimeUnit.MILLISECONDS, new ExecutorService[]{newFixedThreadPool});
            throw th;
        }
    }

    @Test
    void testActorSystemInstantiationFailureWhenPortOccupied() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0, 10, InetAddress.getByName("0.0.0.0"));
        try {
            int localPort = serverSocket.getLocalPort();
            Assertions.assertThatThrownBy(() -> {
                AkkaBootstrapTools.startRemoteActorSystem(new Configuration(), "0.0.0.0", String.valueOf(localPort), LOG);
            }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(BindException.class)});
        } finally {
            serverSocket.close();
        }
    }
}
