package org.apache.flink.runtime.rpc;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/FencedRpcEndpointTest.class */
public class FencedRpcEndpointTest extends TestLogger {
    private static final Time timeout = Time.seconds(10);
    private static RpcService rpcService;

    /* loaded from: input_file:org/apache/flink/runtime/rpc/FencedRpcEndpointTest$FencedTestingEndpoint.class */
    private static class FencedTestingEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestingGateway {
        private final OneShotLatch computationLatch;
        private final String value;

        protected FencedTestingEndpoint(RpcService rpcService, String str) {
            this(rpcService, str, null);
        }

        protected FencedTestingEndpoint(RpcService rpcService, String str, UUID uuid) {
            super(rpcService);
            this.computationLatch = new OneShotLatch();
            this.value = str;
            this.currentMainThread.set(Thread.currentThread());
            try {
                setFencingToken(uuid);
                this.currentMainThread.set(null);
            } catch (Throwable th) {
                this.currentMainThread.set(null);
                throw th;
            }
        }

        @Override // org.apache.flink.runtime.rpc.FencedRpcEndpointTest.FencedTestingGateway
        public CompletableFuture<String> foobar(Time time) {
            return CompletableFuture.completedFuture(this.value);
        }

        @Override // org.apache.flink.runtime.rpc.FencedRpcEndpointTest.FencedTestingGateway
        public CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(Time time) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    this.computationLatch.await();
                    return this.value;
                } catch (InterruptedException e) {
                    throw new CompletionException((Throwable) new FlinkException("Waiting on latch failed.", e));
                }
            }, getRpcService().getExecutor()).thenApplyAsync(str -> {
                return Acknowledge.get();
            }, (Executor) getMainThreadExecutor());
        }

        @Override // org.apache.flink.runtime.rpc.FencedRpcEndpointTest.FencedTestingGateway
        public CompletableFuture<Acknowledge> triggerComputationLatch(Time time) {
            this.computationLatch.trigger();
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        public CompletableFuture<Acknowledge> setFencingTokenInMainThread(UUID uuid, Time time) {
            return callAsyncWithoutFencing(() -> {
                setFencingToken(uuid);
                return Acknowledge.get();
            }, time);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/FencedRpcEndpointTest$FencedTestingGateway.class */
    public interface FencedTestingGateway extends FencedRpcGateway<UUID> {
        CompletableFuture<String> foobar(@RpcTimeout Time time);

        CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(@RpcTimeout Time time);

        CompletableFuture<Acknowledge> triggerComputationLatch(@RpcTimeout Time time);
    }

    @BeforeClass
    public static void setup() {
        rpcService = new TestingRpcService();
    }

    @AfterClass
    public static void teardown() throws ExecutionException, InterruptedException, TimeoutException {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService, timeout);
        }
    }

    @Test
    public void testFencingTokenSetting() throws Exception {
        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "foobar");
        FencedTestingGateway fencedTestingGateway = (FencedTestingGateway) fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
        try {
            fencedTestingEndpoint.start();
            Assert.assertNull(fencedTestingGateway.getFencingToken());
            Assert.assertNull(fencedTestingEndpoint.getFencingToken());
            UUID randomUUID = UUID.randomUUID();
            boolean z = false;
            try {
                fencedTestingEndpoint.setFencingToken(randomUUID);
                z = true;
            } catch (AssertionError e) {
            }
            Assert.assertFalse("Setting fencing token from outside the main thread did not fail as expected.", z);
            Assert.assertNull(fencedTestingEndpoint.getFencingToken());
            fencedTestingEndpoint.setFencingTokenInMainThread(randomUUID, timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertEquals(randomUUID, fencedTestingGateway.getFencingToken());
            Assert.assertEquals(randomUUID, fencedTestingEndpoint.getFencingToken());
            RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
            throw th;
        }
    }

    @Test
    public void testFencing() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "barfoo", randomUUID);
        try {
            fencedTestingEndpoint.start();
            FencedTestingGateway fencedTestingGateway = (FencedTestingGateway) rpcService.connect(fencedTestingEndpoint.getAddress(), randomUUID, FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            FencedTestingGateway fencedTestingGateway2 = (FencedTestingGateway) rpcService.connect(fencedTestingEndpoint.getAddress(), randomUUID2, FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertEquals("barfoo", fencedTestingGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            try {
                fencedTestingGateway2.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail("This should fail since we have the wrong fencing token.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
            }
            fencedTestingEndpoint.setFencingTokenInMainThread(UUID.randomUUID(), timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                fencedTestingGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail("This should fail since we have the wrong fencing token by now.");
            } catch (ExecutionException e2) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e2) instanceof FencingTokenException);
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
        }
    }

    @Test
    public void testRemoteAndSelfGateways() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "foobar", randomUUID);
        try {
            fencedTestingEndpoint.start();
            FencedTestingGateway selfGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
            FencedTestingGateway fencedTestingGateway = (FencedTestingGateway) rpcService.connect(fencedTestingEndpoint.getAddress(), randomUUID, FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertEquals(randomUUID, selfGateway.getFencingToken());
            Assert.assertEquals(randomUUID, fencedTestingGateway.getFencingToken());
            Assert.assertEquals("foobar", selfGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            Assert.assertEquals("foobar", fencedTestingGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            fencedTestingEndpoint.setFencingTokenInMainThread(randomUUID2, timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertEquals(randomUUID2, selfGateway.getFencingToken());
            Assert.assertNotEquals(randomUUID2, fencedTestingGateway.getFencingToken());
            Assert.assertEquals("foobar", selfGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            try {
                fencedTestingGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail("This should have failed because we don't have the right fencing token.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
        }
    }

    @Test
    public void testMainThreadExecutorUnderChangingFencingToken() throws Exception {
        Time milliseconds = Time.milliseconds(100L);
        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "foobar", UUID.randomUUID());
        try {
            fencedTestingEndpoint.start();
            FencedTestingGateway selfGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
            CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation = selfGateway.triggerMainThreadExecutorComputation(timeout);
            fencedTestingEndpoint.setFencingTokenInMainThread(UUID.randomUUID(), timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            selfGateway.triggerComputationLatch(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                triggerMainThreadExecutorComputation.get(milliseconds.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail("The MainThreadExecutor computation should be able to complete because it was filtered out leading to a timeout exception.");
            } catch (TimeoutException e) {
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
        }
    }

    @Test
    public void testUnfencedRemoteGateway() throws Exception {
        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "foobar", UUID.randomUUID());
        try {
            fencedTestingEndpoint.start();
            FencedTestingGateway fencedTestingGateway = (FencedTestingGateway) rpcService.connect(fencedTestingEndpoint.getAddress(), FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                fencedTestingGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail("This should have failed because we have an unfenced gateway.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof RpcException);
            }
            try {
                fencedTestingGateway.getFencingToken();
                Assert.fail("We should not be able to call getFencingToken on an unfenced gateway.");
            } catch (UnsupportedOperationException e2) {
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint, timeout);
        }
    }
}
