package org.apache.flink.runtime.rpc.pekko;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.concurrent.ClassLoadingUtils;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcGatewayUtils;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.pattern.AskTimeoutException;
import org.apache.pekko.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/PekkoInvocationHandler.class */
/**
 * {@link InvocationHandler} that turns method calls into messages for a Pekko actor.
 *
 * <p>Calls to methods declared by {@link PekkoBasedEndpoint}, {@link Object}, {@link RpcGateway},
 * {@link StartStoppable}, {@link MainThreadExecutable} or {@link RpcServer} are executed directly on
 * this handler. Every other call is wrapped into an {@link RpcInvocation} and sent to the actor
 * referenced by {@link #getActorRef()}: fire-and-forget for {@code void} methods, request/response
 * ("ask") for everything else.
 */
public class PekkoInvocationHandler implements InvocationHandler, PekkoBasedEndpoint, RpcServer {
    private static final Logger LOG = LoggerFactory.getLogger(PekkoInvocationHandler.class);

    /** Conversion factor used to turn a millisecond delay into a {@link System#nanoTime()} offset. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    /**
     * Number of leading frames removed from a captured call stack before it is attached to a
     * timeout exception. NOTE(review): presumably these are the frames of this handler and of the
     * generated proxy; confirm before moving the capture site in {@link #invokeRpc}.
     */
    private static final int CAPTURED_FRAMES_TO_SKIP = 3;

    private final String address;
    private final String hostname;
    private final ActorRef rpcEndpoint;
    private final ClassLoader flinkClassLoader;

    /** Whether the address of {@link #rpcEndpoint} has local scope. */
    protected final boolean isLocal;

    /** If set, invocations on a local endpoint are still sent as {@link RemoteRpcInvocation}. */
    protected final boolean forceRpcInvocationSerialization;

    /** Default ask timeout, used when {@link RpcGatewayUtils#extractRpcTimeout} finds no override. */
    private final Duration timeout;
    private final long maximumFramesize;

    @Nullable
    private final CompletableFuture<Void> terminationFuture;

    /** Whether to record the caller's stack when asking, so timeouts can point at the call site. */
    private final boolean captureAskCallStack;

    /**
     * Creates a handler bound to the given actor.
     *
     * @param address address reported by {@link #getAddress()}; must not be null
     * @param hostname hostname reported by {@link #getHostname()}; must not be null
     * @param rpcEndpoint actor that receives all messages; must not be null
     * @param timeout default timeout for ask-style calls; must not be null
     * @param maximumFramesize stored as-is (not used by the code in this class)
     * @param forceRpcInvocationSerialization whether local invocations are sent in remote form
     * @param terminationFuture future returned by {@link #getTerminationFuture()}; may be null
     * @param captureAskCallStack whether to capture the call stack for ask-style calls
     * @param flinkClassLoader class loader used for deserializing results and guarding future
     *     completion; must not be null
     */
    public PekkoInvocationHandler(
            String address,
            String hostname,
            ActorRef rpcEndpoint,
            Duration timeout,
            long maximumFramesize,
            boolean forceRpcInvocationSerialization,
            @Nullable CompletableFuture<Void> terminationFuture,
            boolean captureAskCallStack,
            ClassLoader flinkClassLoader) {
        this.address = Preconditions.checkNotNull(address);
        this.hostname = Preconditions.checkNotNull(hostname);
        this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
        this.flinkClassLoader = Preconditions.checkNotNull(flinkClassLoader);
        this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
        this.timeout = Preconditions.checkNotNull(timeout);
        this.maximumFramesize = maximumFramesize;
        this.forceRpcInvocationSerialization = forceRpcInvocationSerialization;
        this.terminationFuture = terminationFuture;
        this.captureAskCallStack = captureAskCallStack;
    }

    /**
     * Dispatches a proxied call either to this handler itself or to the remote endpoint.
     *
     * @param proxy the proxy instance the method was invoked on (unused)
     * @param method the invoked interface method
     * @param args the call arguments
     * @return the result of the local call, or the value produced by {@link #invokeRpc}
     * @throws UnsupportedOperationException if the method is declared by {@link FencedRpcGateway}
     * @throws Throwable whatever the reflective call or the RPC raises
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        // Methods of these types are implemented by this handler and are executed in place.
        if (declaringClass.equals(PekkoBasedEndpoint.class)
                || declaringClass.equals(Object.class)
                || declaringClass.equals(RpcGateway.class)
                || declaringClass.equals(StartStoppable.class)
                || declaringClass.equals(MainThreadExecutable.class)
                || declaringClass.equals(RpcServer.class)) {
            return method.invoke(this, args);
        }

        if (declaringClass.equals(FencedRpcGateway.class)) {
            throw new UnsupportedOperationException(
                    "InvocationHandler does not support the call FencedRpcGateway#"
                            + method.getName()
                            + ". This indicates that you retrieved a FencedRpcGateway without specifying a fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to retrieve a properly FencedRpcGateway.");
        }

        return invokeRpc(method, args);
    }

    /** Returns the actor all messages of this handler are sent to. */
    public ActorRef getActorRef() {
        return this.rpcEndpoint;
    }

    /**
     * Sends the runnable to the endpoint without delay.
     *
     * @param runnable the action to send
     */
    public void runAsync(Runnable runnable) {
        scheduleRunAsync(runnable, 0L);
    }

    /**
     * Sends the runnable to the endpoint wrapped in a {@link RunAsync} message.
     *
     * @param runnable the action to send; must not be null
     * @param j delay in milliseconds, {@code >= 0}; {@code 0} is encoded as time stamp {@code 0},
     *     any other value as {@code System.nanoTime()} plus the delay converted to nanoseconds
     * @throws IllegalArgumentException presumably, via {@link Preconditions#checkArgument}, if the
     *     delay is negative
     * @throws RuntimeException if the endpoint is not local
     */
    public void scheduleRunAsync(Runnable runnable, long j) {
        Preconditions.checkNotNull(runnable, "runnable");
        Preconditions.checkArgument(j >= 0, "delay must be zero or greater");
        if (!this.isLocal) {
            throw new RuntimeException(
                    "Trying to send a Runnable to a remote actor at "
                            + this.rpcEndpoint.path()
                            + ". This is not supported.");
        }
        long atTimeNanos = j == 0 ? 0L : System.nanoTime() + (j * NANOS_PER_MILLI);
        tell(new RunAsync(runnable, atTimeNanos));
    }

    /**
     * Sends the callable to the endpoint wrapped in a {@link CallAsync} message and returns the
     * future of the reply.
     *
     * @param callable the computation to send
     * @param duration timeout of the ask
     * @param <V> result type of the callable
     * @return future completed with the reply of the endpoint
     * @throws RuntimeException if the endpoint is not local
     */
    // The reply of a CallAsync is expected to be the callable's result, so the wildcard future
    // returned by ask() is narrowed to V; the compiler cannot verify this.
    @SuppressWarnings("unchecked")
    public <V> CompletableFuture<V> callAsync(Callable<V> callable, Duration duration) {
        if (!this.isLocal) {
            throw new RuntimeException(
                    "Trying to send a Callable to a remote actor at "
                            + this.rpcEndpoint.path()
                            + ". This is not supported.");
        }
        return (CompletableFuture<V>) ask(new CallAsync(callable), duration);
    }

    /** Sends {@link ControlMessages#START} to the endpoint. */
    public void start() {
        this.rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
    }

    /** Sends {@link ControlMessages#STOP} to the endpoint. */
    public void stop() {
        this.rpcEndpoint.tell(ControlMessages.STOP, ActorRef.noSender());
    }

    /**
     * Sends the given method call to the endpoint as an {@link RpcInvocation}.
     *
     * <ul>
     *   <li>{@code void} methods are sent fire-and-forget and yield {@code null}.
     *   <li>Methods returning {@link CompletableFuture} yield a future of the (deserialized) reply.
     *   <li>All other methods block for the reply, for at most the extracted RPC timeout.
     * </ul>
     *
     * @param method the invoked gateway method
     * @param args the call arguments
     * @return {@code null}, a future, or the synchronously obtained result (see above)
     * @throws RpcException if the synchronous wait fails with an {@link ExecutionException}
     * @throws Exception on message creation failure, interruption or timeout of the blocking wait
     */
    private Object invokeRpc(Method method, Object[] args) throws Exception {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        boolean isLocalRpcInvocation = method.getAnnotation(Local.class) != null;
        Duration futureTimeout =
                RpcGatewayUtils.extractRpcTimeout(
                        method.getParameterAnnotations(), args, this.timeout);
        RpcInvocation rpcInvocation =
                createRpcInvocationMessage(
                        method.getDeclaringClass().getSimpleName(),
                        methodName,
                        isLocalRpcInvocation,
                        parameterTypes,
                        args);
        Class<?> returnType = method.getReturnType();

        if (Objects.equals(returnType, Void.TYPE)) {
            tell(rpcInvocation);
            return null;
        }

        // Must be created in this method: resolveTimeoutException drops a fixed number of leading
        // frames from this trace, so the capture site determines which frames remain.
        Throwable callStackCapture = this.captureAskCallStack ? new Throwable() : null;

        CompletableFuture<Object> resultFuture =
                ask(rpcInvocation, futureTimeout)
                        .thenApply(
                                resultValue ->
                                        deserializeValueIfNeeded(
                                                resultValue, method, this.flinkClassLoader));

        // Separate future so that failures can be rewritten before the caller sees them.
        CompletableFuture<Object> completableFuture = new CompletableFuture<>();
        resultFuture.whenComplete(
                (resultValue, failure) -> {
                    if (failure != null) {
                        completableFuture.completeExceptionally(
                                resolveTimeoutException(
                                        ExceptionUtils.stripCompletionException(failure),
                                        callStackCapture,
                                        this.address,
                                        rpcInvocation));
                    } else {
                        completableFuture.complete(resultValue);
                    }
                });

        if (Objects.equals(returnType, CompletableFuture.class)) {
            return completableFuture;
        }

        try {
            return completableFuture.get(futureTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new RpcException(
                    "Failure while obtaining synchronous RPC result.",
                    ExceptionUtils.stripExecutionException(e));
        }
    }

    /**
     * Builds the message for a method call.
     *
     * <p>A {@link RemoteRpcInvocation} is used when the endpoint is not local, or when
     * serialization is forced and the method is not marked as local; otherwise a {@link
     * LocalRpcInvocation} is used.
     *
     * @param declaringClassName simple name of the class declaring the method
     * @param methodName name of the method
     * @param isLocalRpcInvocation whether the method carries the {@link Local} annotation
     * @param parameterTypes parameter types of the method
     * @param args call arguments
     * @return the invocation message
     * @throws IOException declared for the creation of the remote invocation
     */
    private RpcInvocation createRpcInvocationMessage(
            String declaringClassName,
            String methodName,
            boolean isLocalRpcInvocation,
            Class<?>[] parameterTypes,
            Object[] args)
            throws IOException {
        boolean useRemoteForm =
                !this.isLocal || (this.forceRpcInvocationSerialization && !isLocalRpcInvocation);
        if (useRemoteForm) {
            return new RemoteRpcInvocation(declaringClassName, methodName, parameterTypes, args);
        }
        return new LocalRpcInvocation(declaringClassName, methodName, parameterTypes, args);
    }

    /**
     * Sends the message to the endpoint without a sender and without expecting a reply.
     *
     * @param message the message to send
     */
    public void tell(Object message) {
        this.rpcEndpoint.tell(message, ActorRef.noSender());
    }

    /**
     * Sends the message to the endpoint and returns a future of the reply.
     *
     * <p>The Scala future produced by {@link Patterns#ask} is converted to a Java future and then
     * passed through {@link ClassLoadingUtils#guardCompletionWithContextClassLoader} together with
     * the Flink class loader.
     *
     * @param message the message to send
     * @param timeout ask timeout; passed on in milliseconds
     * @return future of the reply
     */
    public CompletableFuture<?> ask(Object message, Duration timeout) {
        CompletableFuture<?> response =
                ScalaFutureUtils.toJava(
                        Patterns.ask(this.rpcEndpoint, message, timeout.toMillis()));
        return ClassLoadingUtils.guardCompletionWithContextClassLoader(
                response, this.flinkClassLoader);
    }

    /** Returns the address given at construction. */
    public String getAddress() {
        return this.address;
    }

    /** Returns the hostname given at construction. */
    public String getHostname() {
        return this.hostname;
    }

    /**
     * Returns the termination future given at construction.
     *
     * @return the future; {@code null} if none was supplied to the constructor
     */
    public CompletableFuture<Void> getTerminationFuture() {
        return this.terminationFuture;
    }

    /**
     * Unwraps a reply that arrived as {@link RpcSerializedValue}; other values pass through.
     *
     * @param o the reply
     * @param method the invoked method, used for the error message only
     * @param flinkClassLoader class loader used for deserialization
     * @return the deserialized value, or {@code o} itself if it is not serialized
     * @throws CompletionException wrapping an {@link RpcException} if deserialization fails
     */
    private static Object deserializeValueIfNeeded(
            Object o, Method method, ClassLoader flinkClassLoader) {
        if (!(o instanceof RpcSerializedValue)) {
            return o;
        }
        try {
            return ((RpcSerializedValue) o).deserializeValue(flinkClassLoader);
        } catch (IOException | ClassNotFoundException e) {
            throw new CompletionException(
                    new RpcException(
                            "Could not deserialize the serialized payload of RPC method : "
                                    + method.getName(),
                            e));
        }
    }

    /**
     * Replaces an {@link AskTimeoutException} by a more descriptive exception.
     *
     * <p>Any other exception is returned unchanged. For an ask timeout the result is a {@link
     * RecipientUnreachableException} if {@link PekkoRpcServiceUtils#isRecipientTerminatedException}
     * says so, otherwise a {@link TimeoutException}; in both cases the original exception becomes
     * the cause. If a call stack was captured, it replaces the stack trace of the new exception,
     * minus its first {@value #CAPTURED_FRAMES_TO_SKIP} frames (fewer if the trace is shorter).
     *
     * @param exception the failure of the ask
     * @param callStackCapture throwable created at the call site, or {@code null} if not captured
     * @param recipient address of the recipient, used in the messages
     * @param rpcInvocation the invocation that failed, used in the messages
     * @return the exception to report to the caller
     */
    static Throwable resolveTimeoutException(
            Throwable exception,
            @Nullable Throwable callStackCapture,
            String recipient,
            RpcInvocation rpcInvocation) {
        if (!(exception instanceof AskTimeoutException)) {
            return exception;
        }

        final Throwable newException;
        if (PekkoRpcServiceUtils.isRecipientTerminatedException(exception)) {
            newException =
                    new RecipientUnreachableException(
                            "unknown", recipient, rpcInvocation.toString());
        } else {
            newException =
                    new TimeoutException(
                            String.format(
                                    "Invocation of [%s] at recipient [%s] timed out. This is usually caused by: 1) Pekko failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase %s.",
                                    rpcInvocation,
                                    recipient,
                                    RpcOptions.ASK_TIMEOUT_DURATION.key()));
        }
        newException.initCause(exception);

        if (callStackCapture != null) {
            StackTraceElement[] stackTrace = callStackCapture.getStackTrace();
            // Clamp the skip count: copyOfRange throws if 'from' exceeds 'to', and an exception
            // here would escape the completion callback and leave the caller's future pending.
            int from = Math.min(CAPTURED_FRAMES_TO_SKIP, stackTrace.length);
            newException.setStackTrace(Arrays.copyOfRange(stackTrace, from, stackTrace.length));
        }
        return newException;
    }
}
