package org.apache.flink.runtime.rpc.pekko;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.pekko.actor.ActorSystem;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/TimeoutCallStackTest.class */
/**
 * Tests that an RPC call which times out fails with a {@link TimeoutException} whose surrounding
 * exception carries the caller's stack trace and whose message names the invoked RPC method.
 *
 * <p>The timeout is exercised once with a {@link Time} argument and once with a {@link Duration}
 * argument, both annotated with {@link RpcTimeout}.
 */
class TimeoutCallStackTest {

    /** Actor system shared by all tests; created in {@link #setup()}. */
    private static ActorSystem actorSystem;

    /** RPC service shared by all tests; created in {@link #setup()}. */
    private static RpcService rpcService;

    /** Endpoints started by a test; they are closed again in {@link #stopTestEndpoints()}. */
    private final List<RpcEndpoint> endpointsToStop = new ArrayList<>();

    /**
     * Gateway exposing the RPC calls used to provoke a timeout.
     *
     * <p>NOTE(review): the decompiler reports that this type was originally declared
     * {@code private}; the widened modifier is kept here as found.
     */
    public interface TestingGateway extends RpcGateway {

        /**
         * RPC call taking its timeout as a {@link Time}.
         *
         * @param time the RPC timeout
         * @return future for the call's result
         */
        CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time time);

        /**
         * RPC call taking its timeout as a {@link Duration}.
         *
         * @param duration the RPC timeout
         * @return future for the call's result
         */
        CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration duration);
    }

    /**
     * Endpoint whose RPC methods return a future that this class never completes.
     *
     * <p>NOTE(review): the decompiler reports that this type was originally declared
     * {@code private}; the widened modifier is kept here as found.
     */
    public static final class TestingRpcEndpoint extends RpcEndpoint implements TestingGateway {

        TestingRpcEndpoint(RpcService rpcService, String str) {
            super(rpcService, str);
        }

        /** Returns a fresh future that is never completed by this endpoint. */
        @Override
        public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time time) {
            return new CompletableFuture<>();
        }

        /** Returns a fresh future that is never completed by this endpoint. */
        @Override
        public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration duration) {
            return new CompletableFuture<>();
        }
    }

    /** Creates the shared actor system and the RPC service running on top of it. */
    @BeforeAll
    static void setup() {
        actorSystem = PekkoUtils.createDefaultActorSystem();
        rpcService =
                new PekkoRpcService(
                        actorSystem, PekkoRpcServiceConfiguration.defaultConfiguration());
    }

    /**
     * Closes the RPC service and terminates the actor system, waiting up to 10 seconds for both.
     *
     * @throws Exception if shutting down fails or does not finish within the wait time
     */
    @AfterAll
    static void teardown() throws Exception {
        final CompletableFuture<Void> rpcServiceClosed = rpcService.closeAsync();
        final CompletableFuture<?> actorSystemTerminated =
                ScalaFutureUtils.toJava(actorSystem.terminate());

        FutureUtils.waitForAll(Arrays.asList(rpcServiceClosed, actorSystemTerminated))
                .get(10000L, TimeUnit.MILLISECONDS);
    }

    /** Closes every endpoint that the finished test registered, ignoring close failures. */
    @AfterEach
    void stopTestEndpoints() {
        endpointsToStop.forEach(IOUtils::closeQuietly);
    }

    @Test
    void testTimeoutExceptionWithTime() throws Exception {
        testTimeoutException(gateway -> gateway.callThatTimesOut(Time.milliseconds(1L)));
    }

    @Test
    void testTimeoutExceptionWithDuration() throws Exception {
        testTimeoutException(gateway -> gateway.callThatTimesOut(Duration.ofMillis(1L)));
    }

    /**
     * Invokes the given RPC operation on a fresh gateway and verifies how its future fails.
     *
     * <p>The exception thrown by {@code get()} must have a {@link TimeoutException} as its cause,
     * its stack trace must mention this method's name, and the cause's message must mention the
     * RPC method name {@code callThatTimesOut}.
     *
     * @param function operation that triggers the RPC call on the gateway and returns its future
     * @throws Exception if the gateway cannot be created
     */
    private void testTimeoutException(
            Function<TestingGateway, CompletableFuture<Void>> function) throws Exception {
        final TestingGateway gateway = createTestingGateway();
        final CompletableFuture<Void> future = function.apply(gateway);

        Assertions.assertThatThrownBy(future::get)
                .hasCauseInstanceOf(TimeoutException.class)
                .hasStackTraceContaining("testTimeoutException")
                .extracting(Throwable::getCause)
                .extracting(Throwable::getMessage)
                .satisfies(message -> Assertions.assertThat(message).contains("callThatTimesOut"));
    }

    /**
     * Starts a new {@link TestingRpcEndpoint}, registers it for cleanup and connects to it.
     *
     * @return a gateway connected to the started endpoint
     * @throws Exception if connecting to the endpoint fails
     */
    private TestingGateway createTestingGateway() throws Exception {
        final TestingRpcEndpoint endpoint = new TestingRpcEndpoint(rpcService, "test_name");
        // Register before starting so the endpoint is closed even if a later step fails.
        endpointsToStop.add(endpoint);
        endpoint.start();

        return rpcService.connect(endpoint.getAddress(), TestingGateway.class).get();
    }
}
