package org.apache.flink.runtime.rpc.pekko;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcActorTest;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/RemotePekkoRpcActorTest.class */
/**
 * Tests that exercise RPC endpoints across two separately created remote {@link PekkoRpcService}
 * instances: endpoints are started on one service and their gateways are obtained by connecting
 * through the other.
 */
class RemotePekkoRpcActorTest {

    /** Service on which the endpoints under test are started. */
    private static PekkoRpcService rpcService;

    /** Second service, used to connect to the endpoints hosted by {@link #rpcService}. */
    private static PekkoRpcService otherRpcService;

    private static final Configuration configuration = new Configuration();

    /**
     * Creates the two remote RPC services shared by all tests in this class.
     *
     * @throws Exception if either RPC service cannot be created
     */
    @BeforeAll
    static void setupClass() throws Exception {
        // NOTE(review): "0" is presumably a port range that lets the system pick a free port;
        // confirm against PekkoRpcServiceUtils#createRemoteRpcService.
        rpcService =
                PekkoRpcServiceUtils.createRemoteRpcService(
                        configuration, "localhost", "0", (String) null, Optional.empty());
        otherRpcService =
                PekkoRpcServiceUtils.createRemoteRpcService(
                        configuration, "localhost", "0", (String) null, Optional.empty());
    }

    /**
     * Terminates both shared RPC services once all tests have run.
     *
     * @throws InterruptedException if interrupted while waiting for termination
     * @throws ExecutionException if terminating a service fails
     * @throws TimeoutException if termination does not complete in time
     */
    @AfterAll
    static void teardownClass() throws InterruptedException, ExecutionException, TimeoutException {
        RpcUtils.terminateRpcService(rpcService, otherRpcService);
    }

    /**
     * A gateway obtained through the other RPC service must be able to deliver a {@code null}
     * result from an asynchronous call.
     */
    @Test
    void canRespondWithNullValueRemotely() throws Exception {
        // try-with-resources closes the endpoint on every path and attaches a failing close()
        // as a suppressed exception instead of letting it mask the real test failure.
        try (PekkoRpcActorTest.NullRespondingEndpoint endpoint =
                new PekkoRpcActorTest.NullRespondingEndpoint(rpcService)) {
            endpoint.start();

            final PekkoRpcActorTest.NullRespondingGateway gateway =
                    otherRpcService
                            .connect(
                                    endpoint.getAddress(),
                                    PekkoRpcActorTest.NullRespondingGateway.class)
                            .join();

            Assertions.assertThat(gateway.foobar().join()).isNull();
        }
    }

    /**
     * A gateway obtained through the other RPC service must be able to deliver a {@code null}
     * result from a synchronous call.
     */
    @Test
    void canRespondWithSynchronousNullValueRemotely() throws Exception {
        try (PekkoRpcActorTest.NullRespondingEndpoint endpoint =
                new PekkoRpcActorTest.NullRespondingEndpoint(rpcService)) {
            endpoint.start();

            final PekkoRpcActorTest.NullRespondingGateway gateway =
                    otherRpcService
                            .connect(
                                    endpoint.getAddress(),
                                    PekkoRpcActorTest.NullRespondingGateway.class)
                            .join();

            Assertions.assertThat(gateway.synchronousFoobar()).isNull();
        }
    }

    /**
     * Both the synchronous and the future-based accessor must return a value equal to {@code
     * SerializedValueRespondingEndpoint.SERIALIZED_VALUE} when called through the other RPC
     * service.
     */
    @Test
    void canRespondWithSerializedValueRemotely() throws Exception {
        try (PekkoRpcActorTest.SerializedValueRespondingEndpoint endpoint =
                new PekkoRpcActorTest.SerializedValueRespondingEndpoint(rpcService)) {
            endpoint.start();

            final PekkoRpcActorTest.SerializedValueRespondingGateway gateway =
                    otherRpcService
                            .connect(
                                    endpoint.getAddress(),
                                    PekkoRpcActorTest.SerializedValueRespondingGateway.class)
                            .join();

            Assertions.assertThat(gateway.getSerializedValueSynchronously())
                    .isEqualTo(PekkoRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE);
            Assertions.assertThat(gateway.getSerializedValue().get())
                    .isEqualTo(PekkoRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE);
        }
    }

    /**
     * Calling a gateway whose endpoint has already been closed must fail with a {@link
     * RecipientUnreachableException} somewhere in the cause chain.
     */
    @Test
    void failsRpcResultImmediatelyIfEndpointIsStopped() throws Exception {
        try (PekkoRpcActorTest.SerializedValueRespondingEndpoint endpoint =
                new PekkoRpcActorTest.SerializedValueRespondingEndpoint(rpcService)) {
            endpoint.start();

            final PekkoRpcActorTest.SerializedValueRespondingGateway gateway =
                    otherRpcService
                            .connect(
                                    endpoint.getAddress(),
                                    PekkoRpcActorTest.SerializedValueRespondingGateway.class)
                            .join();

            // Close the endpoint before issuing the call; the try-with-resources block closes
            // it a second time on exit, exactly as the original test did.
            endpoint.close();

            Assertions.assertThatThrownBy(() -> gateway.getSerializedValue().join())
                    .satisfies(FlinkAssertions.anyCauseMatches(RecipientUnreachableException.class));
        }
    }

    /**
     * Calling a gateway after the RPC service hosting its endpoint has been closed must fail with
     * a {@link RecipientUnreachableException} somewhere in the cause chain.
     */
    @Test
    void failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable() throws Exception {
        // Dedicated service for this test so that closing it does not affect the shared ones.
        final RpcService toBeClosedRpcService =
                PekkoRpcServiceUtils.createRemoteRpcService(
                        configuration, "localhost", "0", (String) null, Optional.empty());
        try (PekkoRpcActorTest.SerializedValueRespondingEndpoint endpoint =
                new PekkoRpcActorTest.SerializedValueRespondingEndpoint(toBeClosedRpcService)) {
            endpoint.start();

            final PekkoRpcActorTest.SerializedValueRespondingGateway gateway =
                    otherRpcService
                            .connect(
                                    endpoint.getAddress(),
                                    PekkoRpcActorTest.SerializedValueRespondingGateway.class)
                            .join();

            toBeClosedRpcService.closeAsync().join();

            Assertions.assertThatThrownBy(() -> gateway.getSerializedValue().join())
                    .satisfies(FlinkAssertions.anyCauseMatches(RecipientUnreachableException.class));
        } finally {
            // Runs exactly once, after the endpoint has been closed, on success and failure alike.
            RpcUtils.terminateRpcService(toBeClosedRpcService);
        }
    }
}
