package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest.class */
public class AsyncCallsTest extends TestLogger {
    private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
    private static final Time timeout = Time.seconds(10);
    private static final AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, Time.milliseconds(10000));

    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$FencedTestEndpoint.class */
    public static class FencedTestEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestGateway {
        private final OneShotLatch enteringSetNewFencingToken;
        private final OneShotLatch triggerSetNewFencingToken;

        protected FencedTestEndpoint(RpcService rpcService, UUID uuid, OneShotLatch oneShotLatch, OneShotLatch oneShotLatch2) {
            super(rpcService);
            this.enteringSetNewFencingToken = oneShotLatch;
            this.triggerSetNewFencingToken = oneShotLatch2;
            this.currentMainThread.set(Thread.currentThread());
            try {
                setFencingToken(uuid);
                this.currentMainThread.set(null);
            } catch (Throwable th) {
                this.currentMainThread.set(null);
                throw th;
            }
        }

        @Override // org.apache.flink.runtime.rpc.AsyncCallsTest.FencedTestGateway
        public CompletableFuture<Acknowledge> setNewFencingToken(UUID uuid, Time time) {
            this.enteringSetNewFencingToken.trigger();
            try {
                this.triggerSetNewFencingToken.await();
                setFencingToken(uuid);
                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (InterruptedException e) {
                throw new RuntimeException("TriggerSetNewFencingToken OneShotLatch was interrupted.");
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$FencedTestGateway.class */
    public interface FencedTestGateway extends FencedRpcGateway<UUID> {
        CompletableFuture<Acknowledge> setNewFencingToken(UUID uuid, @RpcTimeout Time time);
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$TestEndpoint.class */
    public static class TestEndpoint extends RpcEndpoint implements TestGateway {
        private final ReentrantLock lock;
        private volatile boolean concurrentAccess;

        public TestEndpoint(RpcService rpcService, ReentrantLock reentrantLock) {
            super(rpcService);
            this.lock = reentrantLock;
        }

        @Override // org.apache.flink.runtime.rpc.AsyncCallsTest.TestGateway
        public void someCall() {
            if (this.lock.tryLock()) {
                this.lock.unlock();
            } else {
                this.concurrentAccess = true;
            }
        }

        @Override // org.apache.flink.runtime.rpc.AsyncCallsTest.TestGateway
        public void anotherCall() {
            if (this.lock.tryLock()) {
                this.lock.unlock();
            } else {
                this.concurrentAccess = true;
            }
        }

        public boolean hasConcurrentAccess() {
            return this.concurrentAccess;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$TestGateway.class */
    public interface TestGateway extends RpcGateway {
        void someCall();

        void anotherCall();
    }

    @AfterClass
    public static void shutdown() {
        akkaRpcService.stopService();
        actorSystem.shutdown();
    }

    @Test
    public void testScheduleWithNoDelay() throws Exception {
        final ReentrantLock reentrantLock = new ReentrantLock();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, reentrantLock);
        testEndpoint.start();
        TestGateway testGateway = (TestGateway) testEndpoint.getSelfGateway(TestGateway.class);
        testGateway.someCall();
        testGateway.anotherCall();
        testGateway.someCall();
        for (int i = 0; i < 10000; i++) {
            testEndpoint.runAsync(new Runnable() { // from class: org.apache.flink.runtime.rpc.AsyncCallsTest.1
                @Override // java.lang.Runnable
                public void run() {
                    if (reentrantLock.tryLock()) {
                        reentrantLock.unlock();
                    } else {
                        atomicBoolean.set(true);
                    }
                }
            });
        }
        Assert.assertEquals("test", (String) testEndpoint.callAsync(() -> {
            if (reentrantLock.tryLock()) {
                reentrantLock.unlock();
                return "test";
            }
            atomicBoolean.set(true);
            return "test";
        }, Time.seconds(30L)).get(30L, TimeUnit.SECONDS));
        Assert.assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
        Assert.assertFalse("Rpc Endpoint had concurrent access", atomicBoolean.get());
        testEndpoint.shutDown();
    }

    @Test
    public void testScheduleWithDelay() throws Exception {
        final ReentrantLock reentrantLock = new ReentrantLock();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final OneShotLatch oneShotLatch = new OneShotLatch();
        TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, reentrantLock);
        testEndpoint.start();
        testEndpoint.runAsync(new Runnable() { // from class: org.apache.flink.runtime.rpc.AsyncCallsTest.2
            @Override // java.lang.Runnable
            public void run() {
                if (reentrantLock.tryLock()) {
                    reentrantLock.unlock();
                } else {
                    atomicBoolean.set(true);
                }
            }
        });
        long nanoTime = System.nanoTime();
        testEndpoint.scheduleRunAsync(new Runnable() { // from class: org.apache.flink.runtime.rpc.AsyncCallsTest.3
            @Override // java.lang.Runnable
            public void run() {
                if (reentrantLock.tryLock()) {
                    reentrantLock.unlock();
                } else {
                    atomicBoolean.set(true);
                }
                oneShotLatch.trigger();
            }
        }, 100L, TimeUnit.MILLISECONDS);
        oneShotLatch.await();
        long nanoTime2 = System.nanoTime();
        Assert.assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
        Assert.assertFalse("Rpc Endpoint had concurrent access", atomicBoolean.get());
        Assert.assertTrue("call was not properly delayed", (nanoTime2 - nanoTime) / 1000000 >= 100);
    }

    @Test
    public void testRunAsyncWithFencing() throws Exception {
        Time milliseconds = Time.milliseconds(100L);
        UUID randomUUID = UUID.randomUUID();
        CompletableFuture completableFuture = new CompletableFuture();
        testRunAsync(fencedTestEndpoint -> {
            fencedTestEndpoint.runAsync(() -> {
                completableFuture.complete(fencedTestEndpoint.getFencingToken());
            });
            return completableFuture;
        }, randomUUID);
        try {
            completableFuture.get(milliseconds.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail("The async run operation should not complete since it is filtered out due to the changed fencing token.");
        } catch (TimeoutException e) {
        }
    }

    @Test
    public void testRunAsyncWithoutFencing() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        UUID randomUUID = UUID.randomUUID();
        testRunAsync(fencedTestEndpoint -> {
            fencedTestEndpoint.runAsyncWithoutFencing(() -> {
                completableFuture.complete(fencedTestEndpoint.getFencingToken());
            });
            return completableFuture;
        }, randomUUID);
        Assert.assertEquals(randomUUID, completableFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
    }

    @Test
    public void testCallAsyncWithFencing() throws Exception {
        try {
            testRunAsync(fencedTestEndpoint -> {
                return fencedTestEndpoint.callAsync(() -> {
                    return true;
                }, timeout);
            }, UUID.randomUUID()).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail("The async call operation should fail due to the changed fencing token.");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
        }
    }

    @Test
    public void testCallAsyncWithoutFencing() throws Exception {
        Assert.assertTrue(((Boolean) testRunAsync(fencedTestEndpoint -> {
            return fencedTestEndpoint.callAsyncWithoutFencing(() -> {
                return true;
            }, timeout);
        }, UUID.randomUUID()).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)).booleanValue());
    }

    private static <T> CompletableFuture<T> testRunAsync(Function<FencedTestEndpoint, CompletableFuture<T>> function, UUID uuid) throws Exception {
        UUID randomUUID = UUID.randomUUID();
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        FencedTestEndpoint fencedTestEndpoint = new FencedTestEndpoint(akkaRpcService, randomUUID, oneShotLatch, oneShotLatch2);
        FencedTestGateway selfGateway = fencedTestEndpoint.getSelfGateway(FencedTestGateway.class);
        try {
            fencedTestEndpoint.start();
            CompletableFuture<Acknowledge> newFencingToken = selfGateway.setNewFencingToken(uuid, timeout);
            Assert.assertFalse(newFencingToken.isDone());
            Assert.assertEquals(randomUUID, fencedTestEndpoint.getFencingToken());
            CompletableFuture<T> apply = function.apply(fencedTestEndpoint);
            oneShotLatch.await();
            oneShotLatch2.trigger();
            newFencingToken.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            fencedTestEndpoint.shutDown();
            fencedTestEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            return apply;
        } catch (Throwable th) {
            fencedTestEndpoint.shutDown();
            fencedTestEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            throw th;
        }
    }
}
