package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
import org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.class */
public class ContextClassLoadingSettingTest extends TestLogger {
    /** Upper bound used when waiting for RPC services and the actor system to shut down. */
    private static final Time TIMEOUT = Time.milliseconds(10000);
    /**
     * Class loader installed as the caller's context class loader (via {@code
     * ClassLoadingUtils.runWithContextClassLoader}) in the tests that attach callbacks to futures.
     */
    private static final ClassLoader testClassLoader = new URLClassLoader(new URL[0], ContextClassLoadingSettingTest.class.getClassLoader());
    /** Class loader handed to the {@link AkkaRpcService} under test; re-created in {@link #setup()}. */
    private ClassLoader pretendFlinkClassLoader;
    /** Actor system backing {@link #akkaRpcService}; created in {@link #setup()}. */
    private ActorSystem actorSystem;
    /** RPC service under test; created in {@link #setup()} and stopped in {@link #shutdown()}. */
    private AkkaRpcService akkaRpcService;

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest$PickyObject.class */
    /**
     * Serializable probe object: whenever an instance is deserialized, the context class loader
     * of the deserializing thread is handed to {@link #classLoaderAssertion}.
     */
    private static class PickyObject implements Serializable {
        /** Callback invoked from {@code readObject}; assigned by the enclosing test's setup. */
        static Consumer<ClassLoader> classLoaderAssertion;

        private PickyObject() {}

        /**
         * Java serialization hook. The object has no state to restore, so it only reports the
         * current thread's context class loader to the registered assertion.
         */
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            classLoaderAssertion.accept(contextClassLoader);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest$TestEndpoint.class */
    /**
     * RPC endpoint that records the context class loader of the thread executing each of its
     * lifecycle hooks and RPC methods, so that tests can inspect it afterwards.
     */
    private static class TestEndpoint extends RpcEndpoint implements TestEndpointGateway {
        /** Completed in {@link #onStart()} with the context class loader seen there. */
        private final CompletableFuture<ClassLoader> onStartClassLoader = new CompletableFuture<>();
        /** Completed in {@link #onStop()} with the context class loader seen there. */
        private final CompletableFuture<ClassLoader> onStopClassLoader = new CompletableFuture<>();
        /** Completed in {@link #doSomethingWithoutReturningAnything()}. */
        private final CompletableFuture<ClassLoader> voidOperationClassLoader = new CompletableFuture<>();
        /** Returned by {@link #doSomethingAsync()}; completed via {@link #completeRPCFuture()}. */
        private final CompletableFuture<Void> rpcResponseFuture = new CompletableFuture<>();

        /** Object returned by {@link #getPickyObject()}; may be {@code null}. */
        @Nullable
        private final PickyObject pickyObject;

        /** Creates an endpoint without a {@link PickyObject}. */
        protected TestEndpoint(RpcService rpcService) {
            this(rpcService, null);
        }

        /**
         * Creates an endpoint.
         *
         * @param rpcService service the endpoint is registered with
         * @param pickyObject object served by {@link #getPickyObject()}, or {@code null}
         */
        protected TestEndpoint(RpcService rpcService, @Nullable PickyObject pickyObject) {
            super(rpcService);
            this.pickyObject = pickyObject;
        }

        @Override
        protected void onStart() throws Exception {
            final ClassLoader current = Thread.currentThread().getContextClassLoader();
            onStartClassLoader.complete(current);
            super.onStart();
        }

        @Override
        protected CompletableFuture<Void> onStop() {
            final ClassLoader current = Thread.currentThread().getContextClassLoader();
            onStopClassLoader.complete(current);
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public CompletableFuture<Void> doSomethingAsync() {
            return rpcResponseFuture;
        }

        @Override
        public CompletableFuture<ClassLoader> doCallAsync() {
            return callAsync(
                    () -> Thread.currentThread().getContextClassLoader(),
                    Time.of(10L, TimeUnit.SECONDS));
        }

        @Override
        public CompletableFuture<ClassLoader> doRunAsync() {
            final CompletableFuture<ClassLoader> observed = new CompletableFuture<>();
            runAsync(() -> observed.complete(Thread.currentThread().getContextClassLoader()));
            return observed;
        }

        @Override
        public void doSomethingWithoutReturningAnything() {
            final ClassLoader current = Thread.currentThread().getContextClassLoader();
            voidOperationClassLoader.complete(current);
        }

        @Override
        public CompletableFuture<PickyObject> getPickyObject() {
            return CompletableFuture.completedFuture(pickyObject);
        }

        /** Completes the future previously handed out by {@link #doSomethingAsync()}. */
        public void completeRPCFuture() {
            rpcResponseFuture.complete(null);
        }

        @Override
        public CompletableFuture<ClassLoader> getContextClassLoader() {
            final ClassLoader current = Thread.currentThread().getContextClassLoader();
            return CompletableFuture.completedFuture(current);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest$TestEndpointGateway.class */
    /** RPC gateway exposed by {@link TestEndpoint}; each method is a probe used by one or more tests. */
    public interface TestEndpointGateway extends RpcGateway {
        /** Returns the context class loader of the thread that executes this RPC in the endpoint. */
        CompletableFuture<ClassLoader> getContextClassLoader();

        /** Returns a future that stays incomplete until the endpoint's {@code completeRPCFuture()} is called. */
        CompletableFuture<Void> doSomethingAsync();

        /** Returns the context class loader observed inside a {@code callAsync} callable of the endpoint. */
        CompletableFuture<ClassLoader> doCallAsync();

        /** Returns the context class loader observed inside a {@code runAsync} runnable of the endpoint. */
        CompletableFuture<ClassLoader> doRunAsync();

        /** Result-less RPC; the endpoint records the context class loader it was executed with. */
        void doSomethingWithoutReturningAnything();

        /** Returns the {@link PickyObject} the endpoint was constructed with (possibly {@code null}). */
        CompletableFuture<PickyObject> getPickyObject();
    }

    /**
     * Creates a fresh "Flink" class loader, an actor system and an {@link AkkaRpcService} bound to
     * that class loader, and points {@link PickyObject}'s deserialization check at this test.
     */
    @Before
    public void setup() {
        final ClassLoader parentClassLoader = ContextClassLoadingSettingTest.class.getClassLoader();
        pretendFlinkClassLoader = new URLClassLoader(new URL[0], parentClassLoader);

        actorSystem = AkkaUtils.createDefaultActorSystem();
        akkaRpcService =
                new AkkaRpcService(
                        actorSystem,
                        AkkaRpcServiceConfiguration.defaultConfiguration(),
                        pretendFlinkClassLoader);

        PickyObject.classLoaderAssertion = this::assertIsFlinkClassLoader;
    }

    /**
     * Stops the RPC service and terminates the actor system, waiting at most {@link #TIMEOUT} for
     * both to finish, then drops the references.
     */
    @After
    public void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        final CompletableFuture<?> rpcServiceStopped = akkaRpcService.stopService();
        final CompletableFuture<?> actorSystemTerminated = AkkaFutureUtils.toJava(actorSystem.terminate());

        FutureUtils.waitForAll(Arrays.asList(rpcServiceStopped, actorSystemTerminated))
                .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);

        actorSystem = null;
        akkaRpcService = null;
    }

    /**
     * Verifies that a {@link Runnable} passed to {@code AkkaRpcService.execute} observes the Flink
     * class loader as its context class loader.
     */
    @Test
    public void testAkkaRpcService_ExecuteRunnableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        final CompletableFuture<ClassLoader> contextClassLoader = new CompletableFuture<>();

        // A statement lambda is only void-compatible, so this binds to execute(Runnable).
        // A lambda that returns the result of complete(...) is also value-compatible and would
        // be resolved to the Callable overload, which is covered by a separate test.
        this.akkaRpcService.execute(() -> {
            contextClassLoader.complete(Thread.currentThread().getContextClassLoader());
        });

        assertIsFlinkClassLoader(contextClassLoader.get());
    }

    /**
     * Verifies that a callable passed to {@code AkkaRpcService.execute} observes the Flink class
     * loader as its context class loader.
     */
    @Test
    public void testAkkaRpcService_ExecuteCallableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        final CompletableFuture<ClassLoader> contextClassLoader =
                akkaRpcService.execute(() -> Thread.currentThread().getContextClassLoader());

        assertIsFlinkClassLoader(contextClassLoader.get());
    }

    /**
     * Verifies the context class loader seen by a callback attached to the future returned from
     * {@code AkkaRpcService.execute(Callable)}. The callback is registered while the calling
     * thread uses {@link #testClassLoader}, and the callable is held back by {@code blocker}
     * until the callback has been attached.
     */
    @Test
    public void testAkkaRpcService_ExecuteCallableResultCompletedWithFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        final CompletableFuture<Void> blocker = new CompletableFuture<>();

        final CompletableFuture<ClassLoader> contextClassLoader =
                ClassLoadingUtils.runWithContextClassLoader(
                        () ->
                                akkaRpcService
                                        // value-returning lambda: binds to execute(Callable)
                                        .execute(() -> blocker.get())
                                        .thenApply(ignored -> Thread.currentThread().getContextClassLoader()),
                        testClassLoader);

        // Release the callable only now, after thenApply has been registered.
        blocker.complete(null);

        assertIsFlinkClassLoader(contextClassLoader.get());
    }

    /**
     * Verifies that a runnable scheduled through {@code AkkaRpcService.scheduleRunnable} observes
     * exactly the Flink class loader as its context class loader.
     */
    @Test
    public void testAkkaRpcService_ScheduleSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        // Typed future: a raw CompletableFuture would make get() return Object, which does not
        // type-check against Matcher<ClassLoader>.
        final CompletableFuture<ClassLoader> contextClassLoader = new CompletableFuture<>();

        this.akkaRpcService.scheduleRunnable(
                () -> {
                    contextClassLoader.complete(Thread.currentThread().getContextClassLoader());
                },
                5L,
                TimeUnit.MILLISECONDS);

        MatcherAssert.assertThat(contextClassLoader.get(), Is.is(this.pretendFlinkClassLoader));
    }

    /**
     * Verifies the context class loader seen by a callback attached to the future returned from
     * {@code AkkaRpcService.connect}, while the calling thread uses {@link #testClassLoader}.
     */
    @Test
    public void testAkkaRpcService_ConnectFutureCompletedWithFlinkContextClassLoader() throws Exception {
        // try-with-resources closes the endpoint on every path and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary test failure.
        try (TestEndpoint testEndpoint = new TestEndpoint(this.akkaRpcService)) {
            final ClassLoader contextClassLoader =
                    ClassLoadingUtils.runWithContextClassLoader(
                            () ->
                                    this.akkaRpcService
                                            .connect(testEndpoint.getAddress(), TestEndpointGateway.class)
                                            .thenApply(ignored -> Thread.currentThread().getContextClassLoader())
                                            .get(),
                            testClassLoader);

            assertIsFlinkClassLoader(contextClassLoader);
        }
    }

    /**
     * Verifies the context class loader seen by a callback attached to the future returned from
     * {@code AkkaRpcService.stopService}, while the calling thread uses {@link #testClassLoader}.
     */
    @Test
    public void testAkkaRpcService_TerminationFutureCompletedWithFlinkContextClassLoader() throws Exception {
        final ClassLoader contextClassLoader =
                ClassLoadingUtils.runWithContextClassLoader(
                        () ->
                                akkaRpcService
                                        .stopService()
                                        .thenApply(ignored -> Thread.currentThread().getContextClassLoader())
                                        .get(),
                        testClassLoader);

        assertIsFlinkClassLoader(contextClassLoader);
    }

    /** Verifies the context class loader recorded by the endpoint's {@code onStart} hook. */
    @Test
    public void testAkkaRpcActor_OnStartCalledWithFlinkContextClassLoader() throws Exception {
        // try-with-resources closes the endpoint on every path and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary test failure.
        try (TestEndpoint testEndpoint = new TestEndpoint(this.akkaRpcService)) {
            testEndpoint.start();

            assertIsFlinkClassLoader(testEndpoint.onStartClassLoader.get());
        }
    }

    /** Verifies the context class loader recorded by the endpoint's {@code onStop} hook. */
    @Test
    public void testAkkaRpcActor_OnStopCalledWithFlinkContextClassLoader() throws Exception {
        final TestEndpoint endpoint = new TestEndpoint(akkaRpcService);
        endpoint.start();
        // Closing the endpoint is what triggers onStop.
        endpoint.close();

        final ClassLoader observedClassLoader = endpoint.onStopClassLoader.get();
        assertIsFlinkClassLoader(observedClassLoader);
    }

    /** Verifies the context class loader observed inside a callable run via {@code callAsync}. */
    @Test
    public void testAkkaRpcActor_CallAsyncCalledWithFlinkContextClassLoader() throws Exception {
        // try-with-resources closes the endpoint on every path and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary test failure.
        try (TestEndpoint testEndpoint = new TestEndpoint(this.akkaRpcService)) {
            testEndpoint.start();

            assertIsFlinkClassLoader(testEndpoint.doCallAsync().get());
        }
    }

    /** Verifies the context class loader observed inside a runnable run via {@code runAsync}. */
    @Test
    public void testAkkaRpcActor_RunAsyncCalledWithFlinkContextClassLoader() throws Exception {
        // try-with-resources closes the endpoint on every path and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary test failure.
        try (TestEndpoint testEndpoint = new TestEndpoint(this.akkaRpcService)) {
            testEndpoint.start();

            assertIsFlinkClassLoader(testEndpoint.doRunAsync().get());
        }
    }

    /**
     * Verifies the context class loader recorded by the endpoint while handling an RPC that has
     * no return value, invoked through a connected gateway.
     */
    @Test
    public void testAkkaRpcActor_RPCReturningVoidCalledWithFlinkContextClassLoader() throws Exception {
        // try-with-resources closes the endpoint on every path and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary test failure.
        try (TestEndpoint testEndpoint = new TestEndpoint(this.akkaRpcService)) {
            testEndpoint.start();

            final TestEndpointGateway testEndpointGateway =
                    this.akkaRpcService
                            .connect(testEndpoint.getAddress(), TestEndpointGateway.class)
                            .get();
            testEndpointGateway.doSomethingWithoutReturningAnything();

            assertIsFlinkClassLoader(testEndpoint.voidOperationClassLoader.get());
        }
    }

    /**
     * Verifies the context class loader reported by the endpoint while handling a value-returning
     * RPC invoked through a connected gateway.
     */
    @Test
    public void testAkkaRpcActor_RPCCalledWithFlinkContextClassLoader() throws Exception {
        // try-with-resources closes the endpoint on every path and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary test failure.
        try (TestEndpoint testEndpoint = new TestEndpoint(this.akkaRpcService)) {
            testEndpoint.start();

            final TestEndpointGateway testEndpointGateway =
                    this.akkaRpcService
                            .connect(testEndpoint.getAddress(), TestEndpointGateway.class)
                            .get();

            assertIsFlinkClassLoader(testEndpointGateway.getContextClassLoader().get());
        }
    }

    /**
     * Verifies the context class loader seen by a callback attached to the future returned from a
     * gateway RPC. The callback is registered while the calling thread uses {@link
     * #testClassLoader}; the endpoint completes the RPC result only afterwards.
     */
    @Test
    public void testAkkaRpcInvocationHandler_RPCFutureCompletedWithFlinkContextClassLoader() throws Exception {
        // try-with-resources closes the endpoint on every path and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary test failure.
        try (TestEndpoint testEndpoint = new TestEndpoint(this.akkaRpcService)) {
            testEndpoint.start();

            final TestEndpointGateway testEndpointGateway =
                    this.akkaRpcService
                            .connect(testEndpoint.getAddress(), TestEndpointGateway.class)
                            .get();

            final CompletableFuture<ClassLoader> contextClassLoader =
                    ClassLoadingUtils.runWithContextClassLoader(
                            () ->
                                    testEndpointGateway
                                            .doSomethingAsync()
                                            .thenApply(ignored -> Thread.currentThread().getContextClassLoader()),
                            testClassLoader);

            // Complete the endpoint-side future only after the callback has been attached.
            testEndpoint.completeRPCFuture();

            assertIsFlinkClassLoader(contextClassLoader.get());
        }
    }

    /**
     * Fetches a {@link PickyObject} from an endpoint hosted in a separate "server" RPC service
     * through a "client" RPC service configured with the Flink class loader. The check itself is
     * performed by {@code PickyObject.readObject}, which hands the deserializing thread's context
     * class loader to the assertion registered in {@link #setup()}.
     */
    @Test
    public void testAkkaRpcInvocationHandler_ContextClassLoaderUsedForDeserialization() throws Exception {
        final AkkaRpcService serverAkkaRpcService =
                new AkkaRpcService(
                        AkkaUtils.createActorSystem(
                                "serverActorSystem",
                                AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("localhost", 0))),
                        AkkaRpcServiceConfiguration.defaultConfiguration());
        final AkkaRpcService clientAkkaRpcService =
                new AkkaRpcService(
                        AkkaUtils.createActorSystem(
                                "clientActorSystem",
                                AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("localhost", 0))),
                        AkkaRpcServiceConfiguration.defaultConfiguration(),
                        this.pretendFlinkClassLoader);

        try {
            final TestEndpoint rpcEndpoint = new TestEndpoint(serverAkkaRpcService, new PickyObject());
            rpcEndpoint.start();

            final String address = rpcEndpoint.getSelfGateway(TestEndpointGateway.class).getAddress();
            final TestEndpointGateway remoteGateway =
                    clientAkkaRpcService.connect(address, TestEndpointGateway.class).get();

            // Any failure raised while the response is being read surfaces through this get().
            remoteGateway.getPickyObject().get();
        } finally {
            // Single cleanup path: each service is terminated exactly once, whether or not the
            // test body failed.
            RpcUtils.terminateRpcService(clientAkkaRpcService, TIMEOUT);
            RpcUtils.terminateRpcService(serverAkkaRpcService, TIMEOUT);
        }
    }

    /**
     * Verifies the context class loader seen by a callback attached to the future returned from
     * the endpoint's {@code closeAsync}, while the calling thread uses {@link #testClassLoader}.
     */
    @Test
    public void testSupervisorActor_TerminationFutureCompletedWithFlinkContextClassLoader() throws Exception {
        final TestEndpoint endpoint = new TestEndpoint(akkaRpcService);
        endpoint.start();

        final ClassLoader contextClassLoader =
                ClassLoadingUtils.runWithContextClassLoader(
                        () ->
                                endpoint.closeAsync()
                                        .thenApply(ignored -> Thread.currentThread().getContextClassLoader())
                                        .get(),
                        testClassLoader);

        assertIsFlinkClassLoader(contextClassLoader);
    }

    /**
     * Asserts that {@code classLoader} is either the Flink class loader given to the RPC service
     * or the test's own {@link #testClassLoader}.
     *
     * <p>NOTE(review): the test class loader is presumably accepted because a callback attached
     * to an already-completed future runs on the calling thread, whose context class loader is
     * {@code testClassLoader} — confirm before tightening this check.
     */
    private void assertIsFlinkClassLoader(ClassLoader classLoader) {
        MatcherAssert.assertThat(
                classLoader,
                CoreMatchers.either(Is.is(pretendFlinkClassLoader))
                        .or(Is.is(testClassLoader)));
    }
}
