package org.apache.flink.runtime.rpc;

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.BitSet;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/rpc/TestingSerialRpcService.class */
public class TestingSerialRpcService implements RpcService {
    private final DirectExecutorService executorService = new DirectExecutorService();
    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    private final ConcurrentHashMap<String, RpcGateway> registeredConnections = new ConcurrentHashMap<>(16);
    private final CompletableFuture<Void> terminationFuture = new FlinkCompletableFuture();
    private final ScheduledExecutor scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter(this.scheduledExecutorService);

    /* loaded from: input_file:org/apache/flink/runtime/rpc/TestingSerialRpcService$DoneScheduledFuture.class */
    private static class DoneScheduledFuture<V> implements ScheduledFuture<V> {
        private final V value;

        private DoneScheduledFuture(V v) {
            this.value = v;
        }

        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return 0L;
        }

        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            return 0;
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return true;
        }

        @Override // java.util.concurrent.Future
        public V get() throws InterruptedException, ExecutionException {
            return this.value;
        }

        @Override // java.util.concurrent.Future
        public V get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            return this.value;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/TestingSerialRpcService$TestingSerialInvocationHandler.class */
    private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable {
        private final T rpcEndpoint;
        private final Time timeout;
        private final String address;

        private TestingSerialInvocationHandler(String str, T t) {
            this(str, t, Time.seconds(10L));
        }

        private TestingSerialInvocationHandler(String str, T t, Time time) {
            this.rpcEndpoint = t;
            this.timeout = time;
            this.address = str;
        }

        @Override // java.lang.reflect.InvocationHandler
        public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
            Class<?> declaringClass = method.getDeclaringClass();
            if (declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(RpcGateway.class)) {
                return method.invoke(this, objArr);
            }
            String name = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            Annotation[][] parameterAnnotations = method.getParameterAnnotations();
            Time extractRpcTimeout = extractRpcTimeout(parameterAnnotations, objArr, this.timeout);
            Tuple2<Class<?>[], Object[]> filterArguments = filterArguments(parameterTypes, parameterAnnotations, objArr);
            if (!method.getReturnType().equals(Future.class)) {
                return handleRpcInvocationSync(name, (Class[]) filterArguments.f0, (Object[]) filterArguments.f1, extractRpcTimeout);
            }
            try {
                return FlinkCompletableFuture.completed(handleRpcInvocationSync(name, (Class[]) filterArguments.f0, (Object[]) filterArguments.f1, extractRpcTimeout));
            } catch (Throwable th) {
                return FlinkCompletableFuture.completedExceptionally(th);
            }
        }

        private Object handleRpcInvocationSync(String str, Class<?>[] clsArr, Object[] objArr, Time time) throws Exception {
            Object invoke = lookupRpcMethod(str, clsArr).invoke(this.rpcEndpoint, objArr);
            return invoke instanceof Future ? ((Future) invoke).get(time.getSize(), time.getUnit()) : invoke;
        }

        public void runAsync(Runnable runnable) {
            runnable.run();
        }

        public <V> Future<V> callAsync(Callable<V> callable, Time time) {
            try {
                return FlinkCompletableFuture.completed(callable.call());
            } catch (Throwable th) {
                return FlinkCompletableFuture.completedExceptionally(th);
            }
        }

        public void scheduleRunAsync(Runnable runnable, long j) {
            try {
                TimeUnit.MILLISECONDS.sleep(j);
                runnable.run();
            } catch (Throwable th) {
                throw new RuntimeException(th);
            }
        }

        public String getAddress() {
            return this.address;
        }

        public String getHostname() {
            return this.address;
        }

        public void start() {
        }

        public void stop() {
        }

        private Method lookupRpcMethod(String str, Class<?>[] clsArr) throws NoSuchMethodException {
            return this.rpcEndpoint.getClass().getMethod(str, clsArr);
        }

        private static Time extractRpcTimeout(Annotation[][] annotationArr, Object[] objArr, Time time) {
            if (objArr != null) {
                Preconditions.checkArgument(annotationArr.length == objArr.length);
                for (int i = 0; i < annotationArr.length; i++) {
                    if (isRpcTimeout(annotationArr[i])) {
                        if (objArr[i] instanceof Time) {
                            return (Time) objArr[i];
                        }
                        throw new RuntimeException("The rpc timeout parameter must be of type " + Time.class.getName() + ". The type " + objArr[i].getClass().getName() + " is not supported.");
                    }
                }
            }
            return time;
        }

        private static Tuple2<Class<?>[], Object[]> filterArguments(Class<?>[] clsArr, Annotation[][] annotationArr, Object[] objArr) {
            Class<?>[] clsArr2;
            Object[] objArr2;
            if (objArr == null) {
                clsArr2 = clsArr;
                objArr2 = null;
            } else {
                Preconditions.checkArgument(clsArr.length == annotationArr.length);
                Preconditions.checkArgument(annotationArr.length == objArr.length);
                BitSet bitSet = new BitSet(clsArr.length);
                int length = clsArr.length;
                for (int i = 0; i < clsArr.length; i++) {
                    if (isRpcTimeout(annotationArr[i])) {
                        bitSet.set(i);
                        length--;
                    }
                }
                if (length == clsArr.length) {
                    clsArr2 = clsArr;
                    objArr2 = objArr;
                } else {
                    clsArr2 = new Class[length];
                    objArr2 = new Object[length];
                    int i2 = 0;
                    for (int i3 = 0; i3 < clsArr.length; i3++) {
                        if (!bitSet.get(i3)) {
                            clsArr2[i2] = clsArr[i3];
                            objArr2[i2] = objArr[i3];
                            i2++;
                        }
                    }
                }
            }
            return Tuple2.of(clsArr2, objArr2);
        }

        private static boolean isRpcTimeout(Annotation[] annotationArr) {
            for (Annotation annotation : annotationArr) {
                if (annotation.annotationType().equals(RpcTimeout.class)) {
                    return true;
                }
            }
            return false;
        }
    }

    public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long j, TimeUnit timeUnit) {
        try {
            timeUnit.sleep(j);
            runnable.run();
            return new DoneScheduledFuture(null);
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    public void execute(Runnable runnable) {
        runnable.run();
    }

    public <T> Future<T> execute(Callable<T> callable) {
        try {
            return FlinkCompletableFuture.completed(callable.call());
        } catch (Exception e) {
            return FlinkCompletableFuture.completedExceptionally(e);
        }
    }

    public Executor getExecutor() {
        return this.executorService;
    }

    public ScheduledExecutor getScheduledExecutor() {
        return this.scheduledExecutorServiceAdapter;
    }

    public void stopService() {
        this.executorService.shutdown();
        this.scheduledExecutorService.shutdown();
        boolean z = false;
        try {
            z = this.scheduledExecutorService.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!z) {
            Iterator<Runnable> it = this.scheduledExecutorService.shutdownNow().iterator();
            while (it.hasNext()) {
                it.next().run();
            }
        }
        this.registeredConnections.clear();
        this.terminationFuture.complete((Object) null);
    }

    public Future<Void> getTerminationFuture() {
        return this.terminationFuture;
    }

    public void stopServer(RpcGateway rpcGateway) {
        this.registeredConnections.remove(rpcGateway.getAddress());
    }

    public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S s) {
        C c = (C) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{s.getSelfGatewayType(), MainThreadExecutable.class, StartStoppable.class, RpcGateway.class}, new TestingSerialInvocationHandler(UUID.randomUUID().toString(), s));
        this.registeredConnections.putIfAbsent(c.getAddress(), c);
        return c;
    }

    public String getAddress() {
        return "";
    }

    public <C extends RpcGateway> Future<C> connect(String str, Class<C> cls) {
        RpcGateway rpcGateway = this.registeredConnections.get(str);
        return rpcGateway != null ? cls.isAssignableFrom(rpcGateway.getClass()) ? FlinkCompletableFuture.completed(rpcGateway) : FlinkCompletableFuture.completedExceptionally(new Exception("Gateway registered under " + str + " is not of type " + cls)) : FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
    }

    public void registerGateway(String str, RpcGateway rpcGateway) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(rpcGateway);
        if (this.registeredConnections.putIfAbsent(str, rpcGateway) != null) {
            throw new IllegalStateException("a gateway is already registered under " + str);
        }
    }

    public void clearGateways() {
        this.registeredConnections.clear();
    }
}
