/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.driver;

import com.baidu.brpc.server.RpcServerOptions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.geaflow.cluster.client.callback.JobOperatorCallback;
import org.apache.geaflow.cluster.client.callback.JobOperatorCallbackFactory;
import org.apache.geaflow.cluster.common.AbstractContainer;
import org.apache.geaflow.cluster.common.ExecutionIdGenerator;
import org.apache.geaflow.cluster.constants.ClusterConstants;
import org.apache.geaflow.cluster.driver.DriverContext;
import org.apache.geaflow.cluster.driver.DriverEventDispatcher;
import org.apache.geaflow.cluster.driver.DriverInfo;
import org.apache.geaflow.cluster.driver.IDriver;
import org.apache.geaflow.cluster.exception.ComponentUncaughtExceptionHandler;
import org.apache.geaflow.cluster.executor.IPipelineExecutor;
import org.apache.geaflow.cluster.executor.PipelineExecutorContext;
import org.apache.geaflow.cluster.executor.PipelineExecutorFactory;
import org.apache.geaflow.cluster.protocol.IEvent;
import org.apache.geaflow.cluster.rpc.impl.DriverEndpoint;
import org.apache.geaflow.cluster.rpc.impl.PipelineMasterEndpoint;
import org.apache.geaflow.cluster.rpc.impl.RpcServiceImpl;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.common.rpc.ConfigurableServerOption;
import org.apache.geaflow.common.utils.PortUtil;
import org.apache.geaflow.common.utils.ThreadUtil;
import org.apache.geaflow.pipeline.Pipeline;
import org.apache.geaflow.pipeline.callback.TaskCallBack;
import org.apache.geaflow.pipeline.service.PipelineService;
import org.apache.geaflow.pipeline.task.PipelineTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Driver container: accepts pipelines, runs them on a single dedicated executor thread and
 * forwards incoming events to a {@link DriverEventDispatcher}.
 *
 * <p>Pipelines are executed one at a time because the backing executor is created with exactly
 * one thread (see {@link #init(DriverContext)}).
 */
public class Driver
extends AbstractContainer
implements IDriver<IEvent, Boolean> {
    private static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
    private static final String DRIVER_EXECUTOR = "driver-executor";
    /** Shared by every driver instance in this JVM; handed to each {@link PipelineExecutorContext}. */
    private static final AtomicInteger pipelineTaskIdGenerator = new AtomicInteger(0);
    private DriverEventDispatcher eventDispatcher;
    private DriverContext driverContext;
    private ExecutorService executorService;
    /** Maps each started pipeline service to the executor that started it, so close() can stop it. */
    private Map<PipelineService, IPipelineExecutor> pipelineExecutorMap;
    private JobOperatorCallback jobOperatorCallback;

    /** Creates a driver with rpc port {@code 0}. */
    public Driver() {
        this(0);
    }

    /**
     * Creates a driver that passes the given rpc port to the parent container.
     *
     * @param rpcPort port value handed to {@link AbstractContainer}
     */
    public Driver(int rpcPort) {
        super(rpcPort);
    }

    /**
     * Initializes the driver from the given context.
     *
     * <p>Sets up the event dispatcher, the single-threaded pipeline executor, the pipeline service
     * registry and the job operator callback. If the context already carries a pipeline, that
     * pipeline is scheduled asynchronously on the executor before the driver registers itself
     * with the master and the HA service.
     *
     * @param driverContext context providing the driver id, configuration and (optionally) a
     *                      pipeline to resume
     */
    @Override
    public void init(DriverContext driverContext) {
        super.init(driverContext.getId(), ClusterConstants.getDriverName(driverContext.getId()), driverContext.getConfig());
        this.driverContext = driverContext;
        this.eventDispatcher = new DriverEventDispatcher();
        // One thread only: pipelines submitted to this driver run strictly one after another.
        // NOTE(review): the leading boolean is presumably the daemon flag - confirm in ThreadUtil.
        this.executorService = Executors.newFixedThreadPool(1,
            ThreadUtil.namedThreadFactory(true, DRIVER_EXECUTOR, ComponentUncaughtExceptionHandler.INSTANCE));
        this.pipelineExecutorMap = new HashMap<>();
        this.jobOperatorCallback = JobOperatorCallbackFactory.createJobOperatorCallback(this.configuration);
        ExecutionIdGenerator.init(this.id);
        if (driverContext.getPipeline() != null) {
            LOGGER.info("driver {} execute pipeline from recovered context", this.name);
            // Fire-and-forget: a failure here is rethrown on the executor thread and ends up in
            // the thread factory's uncaught-exception handler.
            this.executorService.execute(() -> this.executePipelineInternal(driverContext.getPipeline()));
        }
        this.registerToMaster();
        this.registerHAService();
        LOGGER.info("driver {} init finish", this.name);
    }

    /**
     * Starts the rpc service exposing the driver and pipeline-master endpoints, and records the
     * port returned by the started service in {@code rpcPort}.
     */
    @Override
    protected void startRpcService() {
        RpcServerOptions serverOptions = ConfigurableServerOption.build(this.configuration);
        this.rpcService = new RpcServiceImpl(PortUtil.getPort(this.rpcPort), serverOptions);
        this.rpcService.addEndpoint(new DriverEndpoint(this));
        this.rpcService.addEndpoint(new PipelineMasterEndpoint(this));
        this.rpcPort = this.rpcService.startService();
    }

    /**
     * Submits the pipeline to the driver executor and blocks until it has finished.
     *
     * @param pipeline pipeline to execute
     * @return the result of {@link #executePipelineInternal(Pipeline)}
     * @throws GeaflowRuntimeException if execution fails or the waiting thread is interrupted
     */
    @Override
    public Boolean executePipeline(Pipeline pipeline) {
        LOGGER.info("driver {} execute pipeline {}", this.name, pipeline);
        Future<Boolean> future = this.executorService.submit(() -> this.executePipelineInternal(pipeline));
        try {
            return future.get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOGGER.error(e.getMessage(), e);
            throw new GeaflowRuntimeException(e);
        }
        catch (Throwable e) {
            LOGGER.error(e.getMessage(), e);
            throw new GeaflowRuntimeException(e);
        }
    }

    /**
     * Runs the pipeline on the calling thread.
     *
     * <p>The pipeline is recorded in the driver context and checkpointed, then every pipeline task
     * that is not already marked as finished in the context is run in list order, with a
     * checkpoint after each one. Afterwards all pipeline services are started and the job
     * operator callback is notified.
     *
     * @param pipeline pipeline to execute
     * @return {@code true} when the pipeline completed without throwing
     */
    public Boolean executePipelineInternal(Pipeline pipeline) {
        try {
            LOGGER.info("start execute pipeline {}", pipeline);
            this.driverContext.addPipeline(pipeline);
            this.driverContext.checkpoint(new DriverContext.PipelineCheckpointFunction());
            IPipelineExecutor pipelineExecutor = PipelineExecutorFactory.createPipelineExecutor();
            PipelineExecutorContext executorContext = new PipelineExecutorContext(this.name, this.driverContext.getIndex(), this.eventDispatcher, this.configuration, pipelineTaskIdGenerator);
            pipelineExecutor.init(executorContext);
            pipelineExecutor.register(pipeline.getViewDescMap());
            List<PipelineTask> pipelineTaskList = pipeline.getPipelineTaskList();
            List<TaskCallBack> taskCallBackList = pipeline.getPipelineTaskCallbacks();
            int size = pipelineTaskList.size();
            for (int i = 0; i < size; ++i) {
                if (this.isPipelineTaskFinished(i)) {
                    // Already completed according to the driver context; do not run it again.
                    continue;
                }
                // Tasks and callbacks are paired by index.
                pipelineExecutor.runPipelineTask(pipelineTaskList.get(i), taskCallBackList.get(i));
                this.driverContext.addFinishedPipelineTask(i);
                this.driverContext.checkpoint(new DriverContext.PipelineTaskCheckpointFunction());
            }
            List<PipelineService> pipelineServices = pipeline.getPipelineServices();
            for (PipelineService pipelineService : pipelineServices) {
                LOGGER.info("execute service");
                // Register before starting so close() can stop the service later.
                this.pipelineExecutorMap.put(pipelineService, pipelineExecutor);
                pipelineExecutor.startPipelineService(pipelineService);
            }
            this.jobOperatorCallback.onFinish();
            LOGGER.info("finish execute pipeline {}", pipeline);
            return true;
        }
        catch (Throwable e) {
            LOGGER.error("driver exception", e);
            throw e;
        }
    }

    /** Returns whether the driver context already lists the task at the given index as finished. */
    private boolean isPipelineTaskFinished(int taskIndex) {
        return this.driverContext.getFinishedPipelineTasks() != null
            && this.driverContext.getFinishedPipelineTasks().contains(taskIndex);
    }

    /**
     * Hands the event to the driver's event dispatcher.
     *
     * @param input event to dispatch
     * @return always {@code true}
     */
    @Override
    public Boolean process(IEvent input) {
        LOGGER.info("{} process event {}", this.name, input);
        this.eventDispatcher.dispatch(input);
        return true;
    }

    /**
     * Shuts down the pipeline executor, stops every registered pipeline service and closes the
     * parent container. Safe to call even if {@link #init(DriverContext)} never ran.
     */
    @Override
    public void close() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
        if (this.pipelineExecutorMap != null) {
            for (Map.Entry<PipelineService, IPipelineExecutor> entry : this.pipelineExecutorMap.entrySet()) {
                entry.getValue().stopPipelineService(entry.getKey());
            }
            this.pipelineExecutorMap.clear();
        }
        super.close();
        LOGGER.info("driver {} closed", this.name);
    }

    /**
     * Builds the component info describing this driver.
     *
     * @return a new {@link DriverInfo} populated through {@code fillComponentInfo}
     */
    @Override
    protected DriverInfo buildComponentInfo() {
        DriverInfo driverInfo = new DriverInfo();
        this.fillComponentInfo(driverInfo);
        return driverInfo;
    }
}

