package org.apache.flink.runtime.rpc;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/rpc/TestingRpcService.class */
public class TestingRpcService extends AkkaRpcService {
    private final ConcurrentHashMap<String, RpcGateway> registeredConnections;

    public TestingRpcService() {
        this(new Configuration());
    }

    public TestingRpcService(Configuration configuration) {
        super(AkkaUtils.createLocalActorSystem(configuration), AkkaRpcServiceConfiguration.fromConfiguration(configuration));
        this.registeredConnections = new ConcurrentHashMap<>();
    }

    public CompletableFuture<Void> stopService() {
        CompletableFuture<Void> stopService = super.stopService();
        stopService.whenComplete((r3, th) -> {
            this.registeredConnections.clear();
        });
        return stopService;
    }

    public void registerGateway(String str, RpcGateway rpcGateway) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(rpcGateway);
        if (this.registeredConnections.putIfAbsent(str, rpcGateway) != null) {
            throw new IllegalStateException("a gateway is already registered under " + str);
        }
    }

    public <C extends RpcGateway> CompletableFuture<C> connect(String str, Class<C> cls) {
        RpcGateway rpcGateway = this.registeredConnections.get(str);
        return rpcGateway != null ? cls.isAssignableFrom(rpcGateway.getClass()) ? CompletableFuture.completedFuture(rpcGateway) : FutureUtils.completedExceptionally(new Exception("Gateway registered under " + str + " is not of type " + cls)) : super.connect(str, cls);
    }

    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String str, F f, Class<C> cls) {
        FencedRpcGateway fencedRpcGateway = (RpcGateway) this.registeredConnections.get(str);
        return fencedRpcGateway != null ? cls.isAssignableFrom(fencedRpcGateway.getClass()) ? CompletableFuture.completedFuture(fencedRpcGateway) : FutureUtils.completedExceptionally(new Exception("Gateway registered under " + str + " is not of type " + cls)) : super.connect(str, f, cls);
    }

    public void clearGateways() {
        this.registeredConnections.clear();
    }
}
