package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.class */
class AkkaRpcServiceTest {
    private static ActorSystem actorSystem;
    private static AkkaRpcService akkaRpcService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest$CountingAsynchronousOnStopEndpoint.class */
    public static class CountingAsynchronousOnStopEndpoint extends AkkaRpcActorTest.AsynchronousOnStopEndpoint {
        private final CountDownLatch countDownLatch;

        protected CountingAsynchronousOnStopEndpoint(RpcService rpcService, CompletableFuture<Void> completableFuture, CountDownLatch countDownLatch) {
            super(rpcService, completableFuture);
            this.countDownLatch = countDownLatch;
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.AsynchronousOnStopEndpoint
        public CompletableFuture<Void> onStop() {
            this.countDownLatch.countDown();
            return super.onStop();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest$OnStopException.class */
    private static class OnStopException extends FlinkException {
        private static final long serialVersionUID = 7136609202083168954L;

        public OnStopException(String str) {
            super(str);
        }
    }

    AkkaRpcServiceTest() {
    }

    @BeforeAll
    static void setup() {
        actorSystem = AkkaUtils.createDefaultActorSystem();
        akkaRpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    @AfterAll
    static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        FutureUtils.waitForAll(Arrays.asList(akkaRpcService.stopService(), AkkaFutureUtils.toJava(actorSystem.terminate()))).get();
        actorSystem = null;
        akkaRpcService = null;
    }

    @Test
    void testScheduleRunnable() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        long nanoTime = System.nanoTime();
        AkkaRpcService akkaRpcService2 = akkaRpcService;
        oneShotLatch.getClass();
        akkaRpcService2.scheduleRunnable(oneShotLatch::trigger, 100L, TimeUnit.MILLISECONDS).get();
        Assertions.assertThat(oneShotLatch.isTriggered()).isTrue();
        Assertions.assertThat((System.nanoTime() - nanoTime) / 1000000).as("call was not properly delayed", new Object[0]).isGreaterThanOrEqualTo(100L);
    }

    @Test
    void testExecuteRunnable() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        AkkaRpcService akkaRpcService2 = akkaRpcService;
        oneShotLatch.getClass();
        akkaRpcService2.execute(oneShotLatch::trigger);
        oneShotLatch.await(30L, TimeUnit.SECONDS);
    }

    @Test
    void testExecuteCallable() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        Assertions.assertThat(((Integer) akkaRpcService.execute(() -> {
            oneShotLatch.trigger();
            return 42;
        }).get(30L, TimeUnit.SECONDS)).intValue()).isEqualTo(42);
        Assertions.assertThat(oneShotLatch.isTriggered()).isTrue();
    }

    @Test
    void testGetAddress() {
        Assertions.assertThat(akkaRpcService.getAddress()).isEqualTo((String) AkkaUtils.getAddress(actorSystem).host().get());
    }

    @Test
    void testGetPort() {
        Assertions.assertThat(akkaRpcService.getPort()).isEqualTo(AkkaUtils.getAddress(actorSystem).port().get());
    }

    @Test
    void testTerminationFuture() throws Exception {
        AkkaRpcService startAkkaRpcService = startAkkaRpcService();
        CompletableFuture terminationFuture = startAkkaRpcService.getTerminationFuture();
        Assertions.assertThat(terminationFuture).isNotDone();
        startAkkaRpcService.stopService();
        terminationFuture.get();
    }

    @Test
    void testScheduledExecutorServiceSimpleSchedule() throws Exception {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        OneShotLatch oneShotLatch = new OneShotLatch();
        oneShotLatch.getClass();
        scheduledExecutor.schedule(oneShotLatch::trigger, 10L, TimeUnit.MILLISECONDS).get();
        Assertions.assertThat(oneShotLatch.isTriggered()).isTrue();
    }

    @Test
    void testScheduledExecutorServicePeriodicSchedule() throws Exception {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        CountDownLatch countDownLatch = new CountDownLatch(4);
        long nanoTime = System.nanoTime();
        countDownLatch.getClass();
        ScheduledFuture scheduleAtFixedRate = scheduledExecutor.scheduleAtFixedRate(countDownLatch::countDown, 10L, 10L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(scheduleAtFixedRate).isNotDone();
        countDownLatch.await();
        Assertions.assertThat(scheduleAtFixedRate).isNotDone();
        Assertions.assertThat(System.nanoTime() - nanoTime).isGreaterThanOrEqualTo(40L);
        scheduleAtFixedRate.cancel(true);
    }

    @Test
    void testScheduledExecutorServiceWithFixedDelaySchedule() throws Exception {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        CountDownLatch countDownLatch = new CountDownLatch(4);
        long nanoTime = System.nanoTime();
        countDownLatch.getClass();
        ScheduledFuture scheduleWithFixedDelay = scheduledExecutor.scheduleWithFixedDelay(countDownLatch::countDown, 10L, 10L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(scheduleWithFixedDelay).isNotDone();
        countDownLatch.await();
        Assertions.assertThat(scheduleWithFixedDelay).isNotDone();
        Assertions.assertThat(System.nanoTime() - nanoTime).isGreaterThanOrEqualTo(40L);
        scheduleWithFixedDelay.cancel(true);
    }

    @Test
    void testScheduledExecutorServiceCancelWithFixedDelay() throws InterruptedException {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        long j = 10;
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        OneShotLatch oneShotLatch3 = new OneShotLatch();
        ScheduledFuture scheduleWithFixedDelay = scheduledExecutor.scheduleWithFixedDelay(() -> {
            try {
                if (oneShotLatch.isTriggered()) {
                    oneShotLatch3.trigger();
                } else {
                    oneShotLatch.trigger();
                    oneShotLatch2.await();
                }
            } catch (InterruptedException e) {
            }
        }, 10L, 10L, TimeUnit.MILLISECONDS);
        oneShotLatch.await();
        scheduleWithFixedDelay.cancel(false);
        oneShotLatch2.trigger();
        Assertions.assertThatThrownBy(() -> {
            oneShotLatch3.await(5 * j, TimeUnit.MILLISECONDS);
        }).isInstanceOf(TimeoutException.class);
    }

    @Test
    void testAkkaRpcServiceShutDownWithRpcEndpoints() throws Exception {
        RpcService startAkkaRpcService = startAkkaRpcService();
        try {
            CompletableFuture terminationFuture = startAkkaRpcService.getTerminationFuture();
            Iterator<CompletableFuture<Void>> it = startStopNCountingAsynchronousOnStopEndpoints(startAkkaRpcService, 5).iterator();
            while (it.hasNext()) {
                it.next().complete(null);
            }
            terminationFuture.get();
            Assertions.assertThat(startAkkaRpcService.getActorSystem().whenTerminated().isCompleted()).isTrue();
            RpcUtils.terminateRpcService(new RpcService[]{startAkkaRpcService});
        } catch (Throwable th) {
            RpcUtils.terminateRpcService(new RpcService[]{startAkkaRpcService});
            throw th;
        }
    }

    @Test
    void testAkkaRpcServiceShutDownWithFailingRpcEndpoints() throws Exception {
        AkkaRpcService startAkkaRpcService = startAkkaRpcService();
        CompletableFuture terminationFuture = startAkkaRpcService.getTerminationFuture();
        Collection<CompletableFuture<Void>> startStopNCountingAsynchronousOnStopEndpoints = startStopNCountingAsynchronousOnStopEndpoints(startAkkaRpcService, 5);
        Iterator<CompletableFuture<Void>> it = startStopNCountingAsynchronousOnStopEndpoints.iterator();
        for (int i = 0; i < 4; i++) {
            it.next().complete(null);
        }
        it.next().completeExceptionally(new OnStopException("onStop exception occurred."));
        Iterator<CompletableFuture<Void>> it2 = startStopNCountingAsynchronousOnStopEndpoints.iterator();
        while (it2.hasNext()) {
            it2.next().complete(null);
        }
        Assertions.assertThatThrownBy(() -> {
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(OnStopException.class)});
        Assertions.assertThat(startAkkaRpcService.getActorSystem().whenTerminated().isCompleted()).isTrue();
    }

    @Test
    void failsRpcResultImmediatelyIfEndpointIsStopped() throws Exception {
        AkkaRpcActorTest.SerializedValueRespondingEndpoint serializedValueRespondingEndpoint = new AkkaRpcActorTest.SerializedValueRespondingEndpoint(akkaRpcService);
        Throwable th = null;
        try {
            serializedValueRespondingEndpoint.start();
            AkkaRpcActorTest.SerializedValueRespondingGateway serializedValueRespondingGateway = (AkkaRpcActorTest.SerializedValueRespondingGateway) akkaRpcService.connect(serializedValueRespondingEndpoint.getAddress(), AkkaRpcActorTest.SerializedValueRespondingGateway.class).join();
            serializedValueRespondingEndpoint.close();
            Assertions.assertThatThrownBy(() -> {
                serializedValueRespondingGateway.getSerializedValue().join();
            }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(RecipientUnreachableException.class)});
            if (serializedValueRespondingEndpoint != null) {
                if (0 == 0) {
                    serializedValueRespondingEndpoint.close();
                    return;
                }
                try {
                    serializedValueRespondingEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (serializedValueRespondingEndpoint != null) {
                if (0 != 0) {
                    try {
                        serializedValueRespondingEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    serializedValueRespondingEndpoint.close();
                }
            }
            throw th3;
        }
    }

    private Collection<CompletableFuture<Void>> startStopNCountingAsynchronousOnStopEndpoints(AkkaRpcService akkaRpcService2, int i) throws Exception {
        ArrayList arrayList = new ArrayList(i);
        CountDownLatch countDownLatch = new CountDownLatch(i);
        for (int i2 = 0; i2 < i; i2++) {
            CompletableFuture completableFuture = new CompletableFuture();
            new CountingAsynchronousOnStopEndpoint(akkaRpcService2, completableFuture, countDownLatch).start();
            arrayList.add(completableFuture);
        }
        Assertions.assertThat(akkaRpcService2.stopService()).isNotDone();
        Assertions.assertThat(akkaRpcService2.getActorSystem().whenTerminated().isCompleted()).isFalse();
        countDownLatch.await();
        return arrayList;
    }

    @Nonnull
    private AkkaRpcService startAkkaRpcService() {
        return new AkkaRpcService(AkkaUtils.createDefaultActorSystem(), AkkaRpcServiceConfiguration.defaultConfiguration());
    }
}
