package org.apache.flink.runtime.rpc.pekko;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ClassLoadingUtils;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.pekko.actor.ActorSystem;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/ContextClassLoadingSettingTest.class */
class ContextClassLoadingSettingTest {
    private static final ClassLoader testClassLoader = new URLClassLoader(new URL[0], ContextClassLoadingSettingTest.class.getClassLoader());
    private ClassLoader pretendFlinkClassLoader;
    private ActorSystem actorSystem;
    private PekkoRpcService pekkoRpcService;

    /* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/ContextClassLoadingSettingTest$PickyObject.class */
    private static class PickyObject implements Serializable {
        static Consumer<ClassLoader> classLoaderAssertion = null;

        private PickyObject() {
        }

        private void readObject(ObjectInputStream objectInputStream) throws ClassNotFoundException, IOException {
            classLoaderAssertion.accept(Thread.currentThread().getContextClassLoader());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/ContextClassLoadingSettingTest$TestEndpoint.class */
    private static class TestEndpoint extends RpcEndpoint implements TestEndpointGateway {
        private final CompletableFuture<ClassLoader> onStartClassLoader;
        private final CompletableFuture<ClassLoader> onStopClassLoader;
        private final CompletableFuture<ClassLoader> voidOperationClassLoader;
        private final CompletableFuture<Void> rpcResponseFuture;

        @Nullable
        private final PickyObject pickyObject;

        protected TestEndpoint(RpcService rpcService) {
            this(rpcService, null);
        }

        protected TestEndpoint(RpcService rpcService, @Nullable PickyObject pickyObject) {
            super(rpcService);
            this.onStartClassLoader = new CompletableFuture<>();
            this.onStopClassLoader = new CompletableFuture<>();
            this.voidOperationClassLoader = new CompletableFuture<>();
            this.rpcResponseFuture = new CompletableFuture<>();
            this.pickyObject = pickyObject;
        }

        protected void onStart() throws Exception {
            this.onStartClassLoader.complete(Thread.currentThread().getContextClassLoader());
            super.onStart();
        }

        protected CompletableFuture<Void> onStop() {
            this.onStopClassLoader.complete(Thread.currentThread().getContextClassLoader());
            return CompletableFuture.completedFuture(null);
        }

        @Override // org.apache.flink.runtime.rpc.pekko.ContextClassLoadingSettingTest.TestEndpointGateway
        public CompletableFuture<Void> doSomethingAsync() {
            return this.rpcResponseFuture;
        }

        public CompletableFuture<ClassLoader> doCallAsync() {
            return callAsync(() -> {
                return Thread.currentThread().getContextClassLoader();
            }, Duration.ofSeconds(10L));
        }

        public CompletableFuture<ClassLoader> doRunAsync() {
            CompletableFuture<ClassLoader> completableFuture = new CompletableFuture<>();
            runAsync(() -> {
                completableFuture.complete(Thread.currentThread().getContextClassLoader());
            });
            return completableFuture;
        }

        @Override // org.apache.flink.runtime.rpc.pekko.ContextClassLoadingSettingTest.TestEndpointGateway
        public void doSomethingWithoutReturningAnything() {
            this.voidOperationClassLoader.complete(Thread.currentThread().getContextClassLoader());
        }

        @Override // org.apache.flink.runtime.rpc.pekko.ContextClassLoadingSettingTest.TestEndpointGateway
        public CompletableFuture<PickyObject> getPickyObject() {
            return CompletableFuture.completedFuture(this.pickyObject);
        }

        public void completeRPCFuture() {
            this.rpcResponseFuture.complete(null);
        }

        @Override // org.apache.flink.runtime.rpc.pekko.ContextClassLoadingSettingTest.TestEndpointGateway
        @Local
        public CompletableFuture<ClassLoader> getContextClassLoader() {
            return CompletableFuture.completedFuture(Thread.currentThread().getContextClassLoader());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/ContextClassLoadingSettingTest$TestEndpointGateway.class */
    public interface TestEndpointGateway extends RpcGateway {
        CompletableFuture<ClassLoader> getContextClassLoader();

        CompletableFuture<Void> doSomethingAsync();

        void doSomethingWithoutReturningAnything();

        CompletableFuture<PickyObject> getPickyObject();
    }

    ContextClassLoadingSettingTest() {
    }

    @BeforeEach
    void setup() {
        this.pretendFlinkClassLoader = new URLClassLoader(new URL[0], ContextClassLoadingSettingTest.class.getClassLoader());
        this.actorSystem = PekkoUtils.createDefaultActorSystem();
        this.pekkoRpcService = new PekkoRpcService(this.actorSystem, PekkoRpcServiceConfiguration.defaultConfiguration(), this.pretendFlinkClassLoader);
        PickyObject.classLoaderAssertion = this::assertIsFlinkClassLoader;
    }

    @AfterEach
    void shutdown() throws InterruptedException, ExecutionException {
        FutureUtils.waitForAll(Arrays.asList(this.pekkoRpcService.closeAsync(), ScalaFutureUtils.toJava(this.actorSystem.terminate()))).get();
        this.actorSystem = null;
        this.pekkoRpcService = null;
    }

    @Test
    void testRpcService_ExecuteRunnableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        CompletableFuture completableFuture = new CompletableFuture();
        this.pekkoRpcService.getScheduledExecutor().execute(() -> {
            completableFuture.complete(Thread.currentThread().getContextClassLoader());
        });
        Assertions.assertThat(completableFuture.get()).isSameAs(this.pretendFlinkClassLoader);
    }

    @Test
    void testRpcService_ScheduleCallableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        Assertions.assertThat((ClassLoader) this.pekkoRpcService.getScheduledExecutor().schedule(() -> {
            return Thread.currentThread().getContextClassLoader();
        }, 0L, TimeUnit.MILLISECONDS).get()).isSameAs(this.pretendFlinkClassLoader);
    }

    @Test
    void testRpcService_ScheduleRunnableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        CompletableFuture completableFuture = new CompletableFuture();
        this.pekkoRpcService.getScheduledExecutor().schedule(() -> {
            return Boolean.valueOf(completableFuture.complete(Thread.currentThread().getContextClassLoader()));
        }, 5L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(completableFuture.get()).isSameAs(this.pretendFlinkClassLoader);
    }

    @Test
    void testRpcService_ScheduleRunnableWithFixedRateSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList(2);
        CompletableFuture completableFuture = new CompletableFuture();
        this.pekkoRpcService.getScheduledExecutor().scheduleAtFixedRate(() -> {
            if (arrayList.size() < 2) {
                arrayList.add(Thread.currentThread().getContextClassLoader());
            } else {
                completableFuture.complete(null);
                throw new RuntimeException("cancel task");
            }
        }, 0L, 1L, TimeUnit.MILLISECONDS);
        completableFuture.get();
        org.junit.jupiter.api.Assertions.assertEquals(2, arrayList.size());
        Assertions.assertThat(arrayList).allSatisfy(classLoader -> {
            Assertions.assertThat(classLoader).isSameAs(this.pretendFlinkClassLoader);
        });
    }

    @Test
    void testRpcService_ScheduleRunnableWithFixedDelaySetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        ArrayList arrayList = new ArrayList(2);
        CompletableFuture completableFuture = new CompletableFuture();
        this.pekkoRpcService.getScheduledExecutor().scheduleWithFixedDelay(() -> {
            if (arrayList.size() < 2) {
                arrayList.add(Thread.currentThread().getContextClassLoader());
            } else {
                completableFuture.complete(null);
                throw new RuntimeException("cancel task");
            }
        }, 0L, 1L, TimeUnit.MILLISECONDS);
        completableFuture.get();
        org.junit.jupiter.api.Assertions.assertEquals(2, arrayList.size());
        Assertions.assertThat(arrayList).allSatisfy(classLoader -> {
            Assertions.assertThat(classLoader).isSameAs(this.pretendFlinkClassLoader);
        });
    }

    @Test
    void testRpcService_ConnectFutureCompletedWithFlinkContextClassLoader() throws Exception {
        TestEndpoint testEndpoint = new TestEndpoint(this.pekkoRpcService);
        Throwable th = null;
        try {
            assertIsFlinkClassLoader((ClassLoader) ClassLoadingUtils.runWithContextClassLoader(() -> {
                return (ClassLoader) this.pekkoRpcService.connect(testEndpoint.getAddress(), TestEndpointGateway.class).thenApply(testEndpointGateway -> {
                    return Thread.currentThread().getContextClassLoader();
                }).get();
            }, testClassLoader));
            if (testEndpoint != null) {
                if (0 == 0) {
                    testEndpoint.close();
                    return;
                }
                try {
                    testEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (testEndpoint != null) {
                if (0 != 0) {
                    try {
                        testEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    testEndpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testRpcService_TerminationFutureCompletedWithFlinkContextClassLoader() throws Exception {
        assertIsFlinkClassLoader((ClassLoader) ClassLoadingUtils.runWithContextClassLoader(() -> {
            return (ClassLoader) this.pekkoRpcService.closeAsync().thenApply(r2 -> {
                return Thread.currentThread().getContextClassLoader();
            }).get();
        }, testClassLoader));
    }

    @Test
    void testRpcActor_OnStartCalledWithFlinkContextClassLoader() throws Exception {
        TestEndpoint testEndpoint = new TestEndpoint(this.pekkoRpcService);
        Throwable th = null;
        try {
            testEndpoint.start();
            assertIsFlinkClassLoader((ClassLoader) testEndpoint.onStartClassLoader.get());
            if (testEndpoint != null) {
                if (0 == 0) {
                    testEndpoint.close();
                    return;
                }
                try {
                    testEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (testEndpoint != null) {
                if (0 != 0) {
                    try {
                        testEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    testEndpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testRpcActor_OnStopCalledWithFlinkContextClassLoader() throws Exception {
        TestEndpoint testEndpoint = new TestEndpoint(this.pekkoRpcService);
        testEndpoint.start();
        testEndpoint.close();
        assertIsFlinkClassLoader((ClassLoader) testEndpoint.onStopClassLoader.get());
    }

    @Test
    void testRpcActor_CallAsyncCalledWithFlinkContextClassLoader() throws Exception {
        TestEndpoint testEndpoint = new TestEndpoint(this.pekkoRpcService);
        Throwable th = null;
        try {
            testEndpoint.start();
            assertIsFlinkClassLoader(testEndpoint.doCallAsync().get());
            if (testEndpoint != null) {
                if (0 == 0) {
                    testEndpoint.close();
                    return;
                }
                try {
                    testEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (testEndpoint != null) {
                if (0 != 0) {
                    try {
                        testEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    testEndpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testRpcActor_RunAsyncCalledWithFlinkContextClassLoader() throws Exception {
        TestEndpoint testEndpoint = new TestEndpoint(this.pekkoRpcService);
        Throwable th = null;
        try {
            testEndpoint.start();
            assertIsFlinkClassLoader(testEndpoint.doRunAsync().get());
            if (testEndpoint != null) {
                if (0 == 0) {
                    testEndpoint.close();
                    return;
                }
                try {
                    testEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (testEndpoint != null) {
                if (0 != 0) {
                    try {
                        testEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    testEndpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testRpcActor_RPCReturningVoidCalledWithFlinkContextClassLoader() throws Exception {
        TestEndpoint testEndpoint = new TestEndpoint(this.pekkoRpcService);
        Throwable th = null;
        try {
            testEndpoint.start();
            ((TestEndpointGateway) this.pekkoRpcService.connect(testEndpoint.getAddress(), TestEndpointGateway.class).get()).doSomethingWithoutReturningAnything();
            assertIsFlinkClassLoader((ClassLoader) testEndpoint.voidOperationClassLoader.get());
            if (testEndpoint != null) {
                if (0 == 0) {
                    testEndpoint.close();
                    return;
                }
                try {
                    testEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (testEndpoint != null) {
                if (0 != 0) {
                    try {
                        testEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    testEndpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testRpcActor_RPCCalledWithFlinkContextClassLoader() throws Exception {
        TestEndpoint testEndpoint = new TestEndpoint(this.pekkoRpcService);
        Throwable th = null;
        try {
            testEndpoint.start();
            assertIsFlinkClassLoader(((TestEndpointGateway) this.pekkoRpcService.connect(testEndpoint.getAddress(), TestEndpointGateway.class).get()).getContextClassLoader().get());
            if (testEndpoint != null) {
                if (0 == 0) {
                    testEndpoint.close();
                    return;
                }
                try {
                    testEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (testEndpoint != null) {
                if (0 != 0) {
                    try {
                        testEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    testEndpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testRpcInvocationHandler_RPCFutureCompletedWithFlinkContextClassLoader() throws Exception {
        TestEndpoint testEndpoint = new TestEndpoint(this.pekkoRpcService);
        Throwable th = null;
        try {
            testEndpoint.start();
            TestEndpointGateway testEndpointGateway = (TestEndpointGateway) this.pekkoRpcService.connect(testEndpoint.getAddress(), TestEndpointGateway.class).get();
            CompletableFuture completableFuture = (CompletableFuture) ClassLoadingUtils.runWithContextClassLoader(() -> {
                return testEndpointGateway.doSomethingAsync().thenApply(r2 -> {
                    return Thread.currentThread().getContextClassLoader();
                });
            }, testClassLoader);
            testEndpoint.completeRPCFuture();
            assertIsFlinkClassLoader((ClassLoader) completableFuture.get());
            if (testEndpoint != null) {
                if (0 == 0) {
                    testEndpoint.close();
                    return;
                }
                try {
                    testEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (testEndpoint != null) {
                if (0 != 0) {
                    try {
                        testEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    testEndpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testRpcInvocationHandler_ContextClassLoaderUsedForDeserialization() throws Exception {
        RpcService pekkoRpcService = new PekkoRpcService(PekkoUtils.createActorSystem("serverActorSystem", PekkoUtils.getConfig(new Configuration(), new HostAndPort("localhost", 0))), PekkoRpcServiceConfiguration.defaultConfiguration());
        RpcService pekkoRpcService2 = new PekkoRpcService(PekkoUtils.createActorSystem("clientActorSystem", PekkoUtils.getConfig(new Configuration(), new HostAndPort("localhost", 0))), PekkoRpcServiceConfiguration.defaultConfiguration(), this.pretendFlinkClassLoader);
        try {
            TestEndpoint testEndpoint = new TestEndpoint(pekkoRpcService, new PickyObject());
            testEndpoint.start();
            ((TestEndpointGateway) pekkoRpcService2.connect(((TestEndpointGateway) testEndpoint.getSelfGateway(TestEndpointGateway.class)).getAddress(), TestEndpointGateway.class).get()).getPickyObject().get();
            RpcUtils.terminateRpcService(new RpcService[]{pekkoRpcService2});
            RpcUtils.terminateRpcService(new RpcService[]{pekkoRpcService});
        } catch (Throwable th) {
            RpcUtils.terminateRpcService(new RpcService[]{pekkoRpcService2});
            RpcUtils.terminateRpcService(new RpcService[]{pekkoRpcService});
            throw th;
        }
    }

    @Test
    void testSupervisorActor_TerminationFutureCompletedWithFlinkContextClassLoader() throws Exception {
        TestEndpoint testEndpoint = new TestEndpoint(this.pekkoRpcService);
        testEndpoint.start();
        assertIsFlinkClassLoader((ClassLoader) ClassLoadingUtils.runWithContextClassLoader(() -> {
            return (ClassLoader) testEndpoint.closeAsync().thenApply(r2 -> {
                return Thread.currentThread().getContextClassLoader();
            }).get();
        }, testClassLoader));
    }

    private void assertIsFlinkClassLoader(ClassLoader classLoader) {
        Assertions.assertThat(classLoader).satisfiesAnyOf(new ThrowingConsumer[]{classLoader2 -> {
            Assertions.assertThat(classLoader2).isSameAs(this.pretendFlinkClassLoader);
        }, classLoader3 -> {
            Assertions.assertThat(classLoader3).isSameAs(testClassLoader);
        }});
    }
}
