package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.class */
/**
 * Tests for {@link AkkaRpcService}: direct execution of runnables and callables, delayed
 * scheduling, the address getter, the termination future and the {@link ScheduledExecutor}
 * returned by the service.
 */
public class AkkaRpcServiceTest extends TestLogger {

    /** Actor system backing the shared {@link #akkaRpcService}. */
    private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();

    /** RPC service shared by all tests that do not need a dedicated instance. */
    private static final AkkaRpcService akkaRpcService =
            new AkkaRpcService(actorSystem, Time.milliseconds(10000));

    /** Stops the shared RPC service and its actor system once all tests have run. */
    @AfterClass
    public static void shutdown() {
        akkaRpcService.stopService();
        actorSystem.shutdown();
    }

    /**
     * Schedules a runnable with a delay and checks that it ran, and that at least the
     * requested delay elapsed before the returned future completed.
     */
    @Test
    public void testScheduleRunnable() throws Exception {
        final long delayMillis = 100L;
        final OneShotLatch latch = new OneShotLatch();
        final long start = System.nanoTime();

        akkaRpcService.scheduleRunnable(new Runnable() {
            @Override
            public void run() {
                latch.trigger();
            }
        }, delayMillis, TimeUnit.MILLISECONDS).get();

        Assert.assertTrue(latch.isTriggered());

        final long elapsedMillis = (System.nanoTime() - start) / 1000000;
        Assert.assertTrue("call was not properly delayed", elapsedMillis >= delayMillis);
    }

    /** Submits a runnable for execution and waits (bounded) for it to trigger the latch. */
    @Test
    public void testExecuteRunnable() throws Exception {
        final OneShotLatch latch = new OneShotLatch();

        akkaRpcService.execute(new Runnable() {
            @Override
            public void run() {
                latch.trigger();
            }
        });

        latch.await(30L, TimeUnit.SECONDS);
    }

    /**
     * Submits a callable for execution and checks both its returned value and that it
     * actually ran.
     */
    @Test
    public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
        final int expected = 42;
        final OneShotLatch latch = new OneShotLatch();

        Future<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                latch.trigger();
                return expected;
            }
        });

        Assert.assertEquals(expected, result.get(30L, TimeUnit.SECONDS).intValue());
        Assert.assertTrue(latch.isTriggered());
    }

    /** Checks that the service reports the host of the actor system it was created with. */
    @Test
    public void testGetAddress() {
        Assert.assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress());
    }

    /**
     * Creates a dedicated service, stops it asynchronously on its actor system's dispatcher
     * and waits for the termination future, which must not be done before the stop.
     */
    @Test(timeout = 60000)
    public void testTerminationFuture() throws ExecutionException, InterruptedException {
        final ActorSystem dedicatedActorSystem = AkkaUtils.createDefaultActorSystem();
        final AkkaRpcService dedicatedRpcService =
                new AkkaRpcService(dedicatedActorSystem, Time.milliseconds(1000L));

        Future<?> terminationFuture = dedicatedRpcService.getTerminationFuture();
        Assert.assertFalse(terminationFuture.isDone());

        FlinkFuture.supplyAsync(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                dedicatedRpcService.stopService();
                return null;
            }
        }, dedicatedActorSystem.dispatcher());

        // Blocks until the service reports termination; the test timeout bounds the wait.
        terminationFuture.get();
    }

    /** Schedules a one-shot task on the scheduled executor and checks that it ran. */
    @Test(timeout = 60000)
    public void testScheduledExecutorServiceSimpleSchedule() throws ExecutionException, InterruptedException {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        final OneShotLatch latch = new OneShotLatch();

        scheduledExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                latch.trigger();
            }
        }, 10L, TimeUnit.MILLISECONDS).get();

        Assert.assertTrue(latch.isTriggered());
    }

    /**
     * Schedules a fixed-rate task, waits for it to run {@code tries} times and checks that
     * the future is still not done and that at least {@code tries * delay} milliseconds
     * have passed.
     */
    @Test(timeout = 60000)
    public void testScheduledExecutorServicePeriodicSchedule() throws ExecutionException, InterruptedException {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        final int tries = 4;
        final long delay = 10L;
        final CountDownLatch countDownLatch = new CountDownLatch(tries);
        final long start = System.nanoTime();

        ScheduledFuture<?> future = scheduledExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                countDownLatch.countDown();
            }
        }, delay, delay, TimeUnit.MILLISECONDS);

        try {
            Assert.assertTrue(!future.isDone());

            countDownLatch.await();

            // A periodic task never completes on its own.
            Assert.assertTrue(!future.isDone());

            // Compare in nanoseconds: the elapsed time comes from System.nanoTime(), while
            // the delay is expressed in milliseconds.
            final long elapsedNanos = System.nanoTime() - start;
            Assert.assertTrue(
                    "periodic executions finished earlier than tries * delay",
                    elapsedNanos >= TimeUnit.MILLISECONDS.toNanos(tries * delay));
        } finally {
            // Always stop the periodic task so it does not keep running on the shared service.
            future.cancel(true);
        }
    }

    /**
     * Schedules a fixed-delay task, waits for it to run {@code tries} times and checks that
     * the future is still not done and that at least {@code tries * delay} milliseconds
     * have passed.
     */
    @Test(timeout = 60000)
    public void testScheduledExecutorServiceWithFixedDelaySchedule() throws ExecutionException, InterruptedException {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        final int tries = 4;
        final long delay = 10L;
        final CountDownLatch countDownLatch = new CountDownLatch(tries);
        final long start = System.nanoTime();

        ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                countDownLatch.countDown();
            }
        }, delay, delay, TimeUnit.MILLISECONDS);

        try {
            Assert.assertTrue(!future.isDone());

            countDownLatch.await();

            // A periodic task never completes on its own.
            Assert.assertTrue(!future.isDone());

            // Compare in nanoseconds: the elapsed time comes from System.nanoTime(), while
            // the delay is expressed in milliseconds.
            final long elapsedNanos = System.nanoTime() - start;
            Assert.assertTrue(
                    "periodic executions finished earlier than tries * delay",
                    elapsedNanos >= TimeUnit.MILLISECONDS.toNanos(tries * delay));
        } finally {
            // Always stop the periodic task so it does not keep running on the shared service.
            future.cancel(true);
        }
    }

    /**
     * Cancels a fixed-delay task while its first run is still in progress and checks that
     * no second run happens afterwards.
     *
     * <p>The first run triggers {@code firstRunStarted} and then blocks on
     * {@code releaseFirstRun}; any later run would trigger {@code secondRunStarted}.
     */
    @Test
    public void testScheduledExecutorServiceCancelWithFixedDelay() throws InterruptedException {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        final long delay = 10L;
        final OneShotLatch firstRunStarted = new OneShotLatch();
        final OneShotLatch releaseFirstRun = new OneShotLatch();
        final OneShotLatch secondRunStarted = new OneShotLatch();

        ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    if (firstRunStarted.isTriggered()) {
                        secondRunStarted.trigger();
                    } else {
                        firstRunStarted.trigger();
                        releaseFirstRun.await();
                    }
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: an interruption only ends this run early; the
                    // test outcome is decided by the latches observed in the test thread.
                }
            }
        }, delay, delay, TimeUnit.MILLISECONDS);

        // Wait until the first run is in progress, cancel, then let the first run finish.
        firstRunStarted.await();
        future.cancel(false);
        releaseFirstRun.trigger();

        try {
            secondRunStarted.await(5 * delay, TimeUnit.MILLISECONDS);
            Assert.fail("The shouldNotBeTriggeredLatch should never be triggered.");
        } catch (TimeoutException expected) {
            // Expected: timing out means no further run was started after the cancellation.
        }
    }
}
