/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.rpc;

import com.google.protobuf.Empty;
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.geaflow.cluster.protocol.IEvent;
import org.apache.geaflow.cluster.resourcemanager.ReleaseResourceRequest;
import org.apache.geaflow.cluster.resourcemanager.ReleaseResponse;
import org.apache.geaflow.cluster.resourcemanager.RequireResourceRequest;
import org.apache.geaflow.cluster.resourcemanager.RequireResponse;
import org.apache.geaflow.cluster.rpc.RpcEndpointRef;
import org.apache.geaflow.cluster.rpc.RpcEndpointRefFactory;
import org.apache.geaflow.cluster.rpc.impl.ContainerEndpointRef;
import org.apache.geaflow.cluster.rpc.impl.DefaultRpcCallbackImpl;
import org.apache.geaflow.cluster.rpc.impl.DriverEndpointRef;
import org.apache.geaflow.cluster.rpc.impl.MasterEndpointRef;
import org.apache.geaflow.cluster.rpc.impl.MetricEndpointRef;
import org.apache.geaflow.cluster.rpc.impl.PipelineMasterEndpointRef;
import org.apache.geaflow.cluster.rpc.impl.ResourceManagerEndpointRef;
import org.apache.geaflow.cluster.rpc.impl.SupervisorEndpointRef;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.common.heartbeat.Heartbeat;
import org.apache.geaflow.common.utils.RetryCommand;
import org.apache.geaflow.common.utils.ThreadUtil;
import org.apache.geaflow.ha.service.AbstractHAService;
import org.apache.geaflow.ha.service.HAServiceFactory;
import org.apache.geaflow.ha.service.IHAService;
import org.apache.geaflow.ha.service.ResourceData;
import org.apache.geaflow.pipeline.IPipelineResult;
import org.apache.geaflow.pipeline.Pipeline;
import org.apache.geaflow.rpc.proto.Container;
import org.apache.geaflow.rpc.proto.Master;
import org.apache.geaflow.rpc.proto.Metrics;
import org.apache.geaflow.rpc.proto.Supervisor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcClient
implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
    private static final int RPC_RETRY_EXTRA_MS = 30000;
    private static IHAService haService;
    private static RpcEndpointRefFactory refFactory;
    private static RpcClient INSTANCE;
    private final int retryTimes;
    private final int retryIntervalMs;
    private final ExecutorService executorService;

    private RpcClient(Configuration configuration) {
        this.retryIntervalMs = configuration.getInteger(ExecutionConfigKeys.RPC_RETRY_INTERVAL_MS);
        int heartbeatTimeoutMs = configuration.getInteger(ExecutionConfigKeys.HEARTBEAT_TIMEOUT_MS);
        int minTimes = (int)Math.ceil((double)(heartbeatTimeoutMs + 30000) / (double)this.retryIntervalMs);
        int rpcRetryTimes = configuration.getInteger(ExecutionConfigKeys.RPC_RETRY_TIMES);
        this.retryTimes = Math.max(minTimes, rpcRetryTimes);
        refFactory = RpcEndpointRefFactory.getInstance(configuration);
        haService = HAServiceFactory.getService((Configuration)configuration);
        int threads = configuration.getInteger(ExecutionConfigKeys.RPC_ASYNC_THREADS);
        this.executorService = new ThreadPoolExecutor(threads, threads, Long.MAX_VALUE, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), ThreadUtil.namedThreadFactory((boolean)true, (String)"rpc-executor"));
        LOGGER.info("RpcClient init retryTimes:{} retryIntervalMs:{} threads:{}", new Object[]{this.retryTimes, this.retryIntervalMs, threads});
    }

    public static synchronized RpcClient init(Configuration configuration) {
        if (INSTANCE == null) {
            INSTANCE = new RpcClient(configuration);
        }
        return INSTANCE;
    }

    public static synchronized RpcClient getInstance() {
        return INSTANCE;
    }

    public <T> void registerContainer(String masterId, T info, RpcEndpointRef.RpcCallback<Master.RegisterResponse> callback) {
        this.doRpcWithRetry(() -> {
            MasterEndpointRef endpointRef = this.connectMaster(masterId);
            if (endpointRef == null) {
                LOGGER.warn("Cannot register container with master {}: endpoint not available", (Object)masterId);
                return;
            }
            endpointRef.registerContainer(info, new DefaultRpcCallbackImpl<Master.RegisterResponse>(callback, masterId, haService));
        }, masterId, RpcEndpointRefFactory.EndpointType.MASTER);
    }

    public void sendHeartBeat(String masterId, Heartbeat heartbeat, RpcEndpointRef.RpcCallback<Master.HeartbeatResponse> callback) {
        this.doRpcWithRetry(() -> {
            MasterEndpointRef endpointRef = this.connectMaster(masterId);
            if (endpointRef == null) {
                LOGGER.warn("Cannot send heartbeat to master {}: endpoint not available", (Object)masterId);
                return;
            }
            endpointRef.sendHeartBeat(heartbeat, new DefaultRpcCallbackImpl<Master.HeartbeatResponse>(callback, masterId, haService));
        }, masterId, RpcEndpointRefFactory.EndpointType.MASTER);
    }

    public Empty sendException(String masterId, Integer containerId, String containerName, Throwable throwable) {
        return this.doRpcWithRetry(() -> {
            MasterEndpointRef endpointRef = this.connectMaster(masterId);
            if (endpointRef == null) {
                LOGGER.warn("Cannot send exception to master {}: endpoint not available", (Object)masterId);
                return Empty.getDefaultInstance();
            }
            return endpointRef.sendException(containerId, containerName, throwable.getMessage());
        }, masterId, RpcEndpointRefFactory.EndpointType.MASTER);
    }

    public Future processContainer(String containerId, IEvent event) {
        return this.doRpcWithRetry(() -> {
            ContainerEndpointRef endpointRef = this.connectContainer(containerId);
            if (endpointRef == null) {
                LOGGER.warn("Cannot process container event for {}: endpoint not available", (Object)containerId);
                return null;
            }
            return endpointRef.process(event, new DefaultRpcCallbackImpl<Container.Response>(null, containerId, haService));
        }, containerId, RpcEndpointRefFactory.EndpointType.CONTAINER);
    }

    public void processContainer(String containerId, IEvent event, RpcEndpointRef.RpcCallback<Container.Response> callback) {
        this.doRpcWithRetry(() -> {
            ContainerEndpointRef endpointRef = this.connectContainer(containerId);
            if (endpointRef == null) {
                LOGGER.warn("Cannot process container event for {}: endpoint not available", (Object)containerId);
                return;
            }
            endpointRef.process(event, new DefaultRpcCallbackImpl<Container.Response>(callback, containerId, haService));
        }, containerId, RpcEndpointRefFactory.EndpointType.CONTAINER);
    }

    public void processPipeline(String driverId, IEvent event) {
        this.doRpcWithRetry(() -> this.connectPipelineManager(driverId).process(event, new DefaultRpcCallbackImpl<Container.Response>()), driverId, RpcEndpointRefFactory.EndpointType.PIPELINE_MANAGER);
    }

    public IPipelineResult executePipeline(String driverId, Pipeline pipeline) {
        return this.doRpcWithRetry(() -> this.connectDriver(driverId).executePipeline(pipeline), driverId, RpcEndpointRefFactory.EndpointType.DRIVER);
    }

    public RequireResponse requireResource(String masterId, RequireResourceRequest request) {
        return this.doRpcWithRetry(() -> this.connectRM(masterId).requireResource(request), masterId, RpcEndpointRefFactory.EndpointType.RESOURCE_MANAGER);
    }

    public ReleaseResponse releaseResource(String masterId, ReleaseResourceRequest request) {
        return this.doRpcWithRetry(() -> this.connectRM(masterId).releaseResource(request), masterId, RpcEndpointRefFactory.EndpointType.RESOURCE_MANAGER);
    }

    public Future<Metrics.MetricQueryResponse> requestMetrics(String id, Metrics.MetricQueryRequest request, RpcEndpointRef.RpcCallback<Metrics.MetricQueryResponse> callback) {
        return this.doRpcWithRetry(() -> this.connectMetricServer(id).queryMetrics(request, new DefaultRpcCallbackImpl<Metrics.MetricQueryResponse>(callback, id, haService)), id, RpcEndpointRefFactory.EndpointType.METRIC);
    }

    public Future restartWorkerBySupervisor(String id, boolean fastFailure) {
        int retries = fastFailure ? 1 : this.retryTimes;
        try {
            return this.doRpcWithRetry(() -> {
                ResourceData resourceData = this.loadSupervisorData(id, fastFailure);
                return this.connectSupervisor(resourceData).restart(resourceData.getProcessId(), new DefaultRpcCallbackImpl<Empty>());
            }, id, RpcEndpointRefFactory.EndpointType.SUPERVISOR, retries);
        }
        catch (Throwable e) {
            CompletableFuture result = new CompletableFuture();
            result.completeExceptionally(e);
            return result;
        }
    }

    public Supervisor.StatusResponse queryWorkerStatusBySupervisor(String id) {
        return this.connectSupervisor(id).status();
    }

    public void closeMasterConnection(String masterId) {
        MasterEndpointRef endpointRef = this.connectMaster(masterId);
        if (endpointRef != null) {
            endpointRef.closeEndpoint();
        } else {
            LOGGER.debug("No endpoint reference found for master: {}, skipping close", (Object)masterId);
        }
    }

    public void closeDriverConnection(String driverId) {
        DriverEndpointRef endpointRef = this.connectDriver(driverId);
        if (endpointRef != null) {
            endpointRef.closeEndpoint();
        } else {
            LOGGER.debug("No endpoint reference found for driver: {}, skipping close", (Object)driverId);
        }
    }

    public void closeContainerConnection(String containerId) {
        ContainerEndpointRef endpointRef = this.connectContainer(containerId);
        if (endpointRef != null) {
            endpointRef.closeEndpoint();
        } else {
            LOGGER.debug("No endpoint reference found for container: {}, skipping close", (Object)containerId);
        }
    }

    private MasterEndpointRef connectMaster(String masterId) {
        ResourceData resourceData = this.getResourceData(masterId);
        if (resourceData == null) {
            LOGGER.warn("Resource data not found for master: {}, skipping connection", (Object)masterId);
            return null;
        }
        return refFactory.connectMaster(resourceData.getHost(), resourceData.getRpcPort());
    }

    private ResourceManagerEndpointRef connectRM(String masterId) {
        ResourceData resourceData = this.getResourceData(masterId);
        return refFactory.connectResourceManager(resourceData.getHost(), resourceData.getRpcPort());
    }

    private DriverEndpointRef connectDriver(String driverId) {
        ResourceData resourceData = this.getResourceData(driverId);
        if (resourceData == null) {
            LOGGER.warn("Resource data not found for driver: {}, skipping connection", (Object)driverId);
            return null;
        }
        return refFactory.connectDriver(resourceData.getHost(), resourceData.getRpcPort());
    }

    private ContainerEndpointRef connectContainer(String containerId) {
        ResourceData resourceData = this.getResourceData(containerId);
        if (resourceData == null) {
            LOGGER.warn("Resource data not found for container: {}, skipping connection", (Object)containerId);
            return null;
        }
        return refFactory.connectContainer(resourceData.getHost(), resourceData.getRpcPort());
    }

    private PipelineMasterEndpointRef connectPipelineManager(String id) {
        ResourceData resourceData = this.getResourceData(id);
        return refFactory.connectPipelineManager(resourceData.getHost(), resourceData.getRpcPort());
    }

    private MetricEndpointRef connectMetricServer(String id) {
        ResourceData resourceData = this.getResourceData(id);
        return refFactory.connectMetricServer(resourceData.getHost(), resourceData.getMetricPort());
    }

    private SupervisorEndpointRef connectSupervisor(String id) {
        ResourceData resourceData = this.loadSupervisorData(id, true);
        return this.connectSupervisor(resourceData);
    }

    private SupervisorEndpointRef connectSupervisor(ResourceData resourceData) {
        return refFactory.connectSupervisor(resourceData.getHost(), resourceData.getSupervisorPort());
    }

    private ResourceData loadSupervisorData(String id, boolean fastFailure) {
        ResourceData resourceData = fastFailure ? haService.loadResource(id) : ((AbstractHAService)haService).loadDataFromStore(id, true, ResourceData::getSupervisorPort);
        return resourceData;
    }

    private <T> T doRpcWithRetry(Callable<T> function, String resourceId, RpcEndpointRefFactory.EndpointType endpointType) {
        return this.doRpcWithRetry(function, resourceId, endpointType, this.retryTimes);
    }

    private <T> T doRpcWithRetry(Callable<T> function, String resourceId, RpcEndpointRefFactory.EndpointType endpointType, int retryTimes) {
        return (T)RetryCommand.run(() -> {
            try {
                return function.call();
            }
            catch (Throwable t) {
                throw this.handleRpcException(resourceId, endpointType, t);
            }
        }, (int)retryTimes, (long)this.retryIntervalMs);
    }

    private void doRpcWithRetry(Runnable function, String resourceId, RpcEndpointRefFactory.EndpointType endpointType) {
        RetryCommand.run(() -> {
            try {
                function.run();
            }
            catch (Throwable t) {
                throw this.handleRpcException(resourceId, endpointType, t);
            }
            return null;
        }, (int)this.retryTimes, (long)this.retryIntervalMs);
    }

    private Exception handleRpcException(String resourceId, RpcEndpointRefFactory.EndpointType endpointType, Throwable t) {
        try {
            this.invalidateEndpointCache(resourceId, endpointType);
        }
        catch (Throwable e) {
            LOGGER.warn("invalidate rpc cache {} failed: {}", (Object)resourceId, (Object)e);
        }
        return new GeaflowRuntimeException(String.format("do rpc failed. %s", t.getMessage()), t);
    }

    protected void invalidateEndpointCache(String resourceId, RpcEndpointRefFactory.EndpointType endpointType) {
        ResourceData resourceData = haService.invalidateResource(resourceId);
        if (resourceData != null) {
            refFactory.invalidateEndpointCache(resourceData.getHost(), resourceData.getRpcPort(), endpointType);
        }
    }

    protected ResourceData getResourceData(String resourceId) {
        if (haService == null) {
            LOGGER.warn("HAService is not initialized, cannot resolve resource: {}", (Object)resourceId);
            return null;
        }
        ResourceData resourceData = haService.resolveResource(resourceId);
        if (resourceData == null) {
            LOGGER.warn("Resource data not found for resource: {}", (Object)resourceId);
        }
        return resourceData;
    }

    public ExecutorService getExecutor() {
        return this.executorService;
    }
}

