package org.apache.tajo.querymaster;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.ResourceProtos;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.master.event.QueryEvent;
import org.apache.tajo.master.event.QueryEventType;
import org.apache.tajo.master.event.QueryStartEvent;
import org.apache.tajo.master.event.StageShuffleReportEvent;
import org.apache.tajo.master.event.TaskAttemptEvent;
import org.apache.tajo.master.event.TaskAttemptEventType;
import org.apache.tajo.master.event.TaskAttemptStatusUpdateEvent;
import org.apache.tajo.master.event.TaskCompletionEvent;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.resource.NodeResource;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.session.Session;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TajoWorker;
import org.apache.tajo.worker.event.QMResourceAllocateEvent;

/* loaded from: input_file:org/apache/tajo/querymaster/QueryMasterManagerService.class */
/**
 * RPC endpoint of the query master running inside a Tajo worker.
 *
 * <p>The service starts an {@link AsyncRpcServer} that exposes {@link QueryMasterProtocol},
 * creates the {@link QueryMaster} as a child service, and translates each incoming RPC into an
 * event (or direct call) on the {@link QueryMasterTask} that owns the referenced query.
 */
public class QueryMasterManagerService extends CompositeService implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
    private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName());

    private AsyncRpcServer rpcServer;
    private InetSocketAddress bindAddr;
    private String addr;
    /** Requested port until {@link #serviceInit} runs; afterwards the port of the connect address. */
    private int port;
    private QueryMaster queryMaster;
    private final TajoWorker.WorkerContext workerContext;

    /**
     * @param workerContext context of the hosting worker; handed to the {@link QueryMaster}
     * @param port          port the RPC server is asked to listen on
     */
    public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) {
        super(QueryMasterManagerService.class.getName());
        this.workerContext = workerContext;
        this.port = port;
    }

    /** @return the query master created in {@link #serviceInit}, or {@code null} before initialization */
    public QueryMaster getQueryMaster() {
        return this.queryMaster;
    }

    /**
     * Starts the RPC server on {@code 0.0.0.0:<port>}, records the resulting connect address,
     * registers the {@link QueryMaster} as a child service and publishes the address through
     * {@code WORKER_QM_RPC_ADDRESS} before delegating to the superclass.
     *
     * @param configuration must be a {@link TajoConf}; checked through {@code TUtil.checkTypeAndGet}
     * @throws IllegalArgumentException if the wildcard listen address has no resolved address
     */
    @Override
    public void serviceInit(Configuration configuration) throws Exception {
        TajoConf tajoConf = (TajoConf) TUtil.checkTypeAndGet(configuration, TajoConf.class);

        InetSocketAddress listenAddress = new InetSocketAddress("0.0.0.0", this.port);
        if (listenAddress.getAddress() == null) {
            throw new IllegalArgumentException("Failed resolve of " + listenAddress);
        }

        int workerThreads = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
        this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, listenAddress, workerThreads);
        this.rpcServer.start();

        // The address and port are re-read from the started server rather than taken from the request.
        this.bindAddr = NetUtils.getConnectAddress(this.rpcServer.getListenAddress());
        this.addr = this.bindAddr.getHostName() + ":" + this.bindAddr.getPort();
        this.port = this.bindAddr.getPort();

        this.queryMaster = new QueryMaster(this.workerContext);
        addService(this.queryMaster);

        LOG.info("QueryMasterManagerService is bind to " + this.addr);
        tajoConf.setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, this.addr);
        super.serviceInit(configuration);
    }

    /** Shuts down the RPC server (if it was created) and then stops the child services. */
    @Override
    public void serviceStop() throws Exception {
        if (this.rpcServer != null) {
            this.rpcServer.shutdown();
        }
        LOG.info("QueryMasterManagerService stopped");
        super.serviceStop();
    }

    /** @return the connect address recorded in {@link #serviceInit}, or {@code null} before initialization */
    public InetSocketAddress getBindAddr() {
        return this.bindAddr;
    }

    /**
     * Handles a task status report. A {@code TA_KILLED} report is delivered straight to the task
     * attempt as {@code TA_LOCAL_KILLED}; every other state is posted to the query's event handler.
     * Reports for unknown queries are ignored. The callback is always answered with the null proto.
     *
     * <p>The stage, task and attempt lookups are dereferenced without null checks.
     */
    @Override // org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface
    public void statusUpdate(RpcController rpcController, ResourceProtos.TaskStatusProto taskStatusProto, RpcCallback<PrimitiveProtos.NullProto> rpcCallback) {
        QueryId queryId = new QueryId(taskStatusProto.getId().getTaskId().getExecutionBlockId().getQueryId());
        TaskAttemptId taskAttemptId = new TaskAttemptId(taskStatusProto.getId());
        QueryMasterTask queryMasterTask = this.queryMaster.getQueryMasterTask(queryId);

        if (queryMasterTask != null) {
            Stage stage = queryMasterTask.getQuery().getStage(taskAttemptId.getTaskId().getExecutionBlockId());
            Task task = stage.getTask(taskAttemptId.getTaskId());
            TaskAttempt attempt = task.getAttempt(taskAttemptId.getId());

            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
            }

            if (taskStatusProto.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
                LOG.warn(taskAttemptId + " Killed");
                attempt.handle(new TaskAttemptEvent(new TaskAttemptId(taskStatusProto.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
            } else {
                queryMasterTask.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(new TaskAttemptId(taskStatusProto.getId()), taskStatusProto));
            }
        }
        rpcCallback.run(TajoWorker.NULL_PROTO);
    }

    /** Liveness probe: answers immediately with the null proto and ignores the request payload. */
    @Override // org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface
    public void ping(RpcController rpcController, TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto, RpcCallback<PrimitiveProtos.NullProto> rpcCallback) {
        rpcCallback.run(TajoWorker.NULL_PROTO);
    }

    /**
     * Forwards a fatal task error to the owning {@link QueryMasterTask}; logs a warning when the
     * query is unknown. The callback is always answered with the null proto.
     */
    @Override // org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface
    public void fatalError(RpcController rpcController, ResourceProtos.TaskFatalErrorReport taskFatalErrorReport, RpcCallback<PrimitiveProtos.NullProto> rpcCallback) {
        QueryId queryId = new QueryId(taskFatalErrorReport.getId().getTaskId().getExecutionBlockId().getQueryId());
        QueryMasterTask queryMasterTask = this.queryMaster.getQueryMasterTask(queryId);
        if (queryMasterTask != null) {
            queryMasterTask.handleTaskFailed(taskFatalErrorReport);
        } else {
            LOG.warn("No QueryMasterTask: " + new TaskAttemptId(taskFatalErrorReport.getId()));
        }
        rpcCallback.run(TajoWorker.NULL_PROTO);
    }

    /**
     * Posts a {@link TaskCompletionEvent} for the reported task to the owning query's event
     * handler. Reports for unknown queries are ignored. The callback is always answered.
     */
    @Override // org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface
    public void done(RpcController rpcController, ResourceProtos.TaskCompletionReport taskCompletionReport, RpcCallback<PrimitiveProtos.NullProto> rpcCallback) {
        QueryId queryId = new QueryId(taskCompletionReport.getId().getTaskId().getExecutionBlockId().getQueryId());
        QueryMasterTask queryMasterTask = this.queryMaster.getQueryMasterTask(queryId);
        if (queryMasterTask != null) {
            queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(taskCompletionReport));
        }
        rpcCallback.run(TajoWorker.NULL_PROTO);
    }

    /**
     * Posts a {@link StageShuffleReportEvent} for the reported execution block to the owning
     * query's event handler. Reports for unknown queries are ignored. The callback is always answered.
     */
    @Override // org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface
    public void doneExecutionBlock(RpcController rpcController, ResourceProtos.ExecutionBlockReport executionBlockReport, RpcCallback<PrimitiveProtos.NullProto> rpcCallback) {
        QueryMasterTask queryMasterTask = this.queryMaster.getQueryMasterTask(new QueryId(executionBlockReport.getEbId().getQueryId()));
        if (queryMasterTask != null) {
            ExecutionBlockId executionBlockId = new ExecutionBlockId(executionBlockReport.getEbId());
            queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(executionBlockId, executionBlockReport));
        }
        rpcCallback.run(TajoWorker.NULL_PROTO);
    }

    /**
     * Builds the execution-block context (query context, staging directory, plan JSON and shuffle
     * type) for the requested stage and records the requesting worker's peer RPC address in the
     * stage's assigned-worker map if that worker id is not present yet.
     *
     * <p>When the query is unknown the controller is marked as failed and the method returns.
     */
    @Override // org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface
    public void getExecutionBlockContext(RpcController rpcController, ResourceProtos.ExecutionBlockContextRequest executionBlockContextRequest, RpcCallback<ResourceProtos.ExecutionBlockContextResponse> rpcCallback) {
        QueryMasterTask queryMasterTask = this.queryMaster.getQueryMasterTask(new QueryId(executionBlockContextRequest.getExecutionBlockId().getQueryId()));
        if (queryMasterTask == null) {
            // NOTE(review): rpcCallback is never invoked on this path, unlike the failure path of
            // allocateQueryMaster. Confirm the RPC layer delivers the controller failure to the
            // caller without a callback; otherwise the caller only sees a timeout.
            rpcController.setFailed("Can't find query. request: " + executionBlockContextRequest);
            return;
        }

        Stage stage = queryMasterTask.getQuery().getStage(new ExecutionBlockId(executionBlockContextRequest.getExecutionBlockId()));
        PlanProto.ShuffleType shuffleType = stage.getDataChannel().getShuffleType();

        ResourceProtos.ExecutionBlockContextResponse.Builder responseBuilder = ResourceProtos.ExecutionBlockContextResponse.newBuilder();
        responseBuilder.setExecutionBlockId(executionBlockContextRequest.getExecutionBlockId())
                .setQueryContext(stage.getContext().getQueryContext().getProto())
                .setQueryOutputPath(stage.getContext().getStagingDir().toString())
                .setPlanJson(CoreGsonHelper.toJson(stage.getBlock().getPlan(), LogicalNode.class))
                .setShuffleType(shuffleType);

        // Only the first request from a given worker id registers its address.
        Integer workerId = Integer.valueOf(executionBlockContextRequest.getWorker().getId());
        if (!stage.getAssignedWorkerMap().containsKey(workerId)) {
            stage.getAssignedWorkerMap().put(workerId, NetUtils.createSocketAddr(executionBlockContextRequest.getWorker().getHost(), executionBlockContextRequest.getWorker().getPeerRpcPort()));
        }
        rpcCallback.run(responseBuilder.build());
    }

    /**
     * Posts a {@code KILL} query event to the owning query's event handler. Requests for unknown
     * queries are ignored. The callback is always answered with the null proto.
     */
    @Override // org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface
    public void killQuery(RpcController rpcController, TajoIdProtos.QueryIdProto queryIdProto, RpcCallback<PrimitiveProtos.NullProto> rpcCallback) {
        QueryId queryId = new QueryId(queryIdProto);
        QueryMasterTask queryMasterTask = this.queryMaster.getQueryMasterTask(queryId);
        if (queryMasterTask != null) {
            queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
        }
        rpcCallback.run(TajoWorker.NULL_PROTO);
    }

    /**
     * Hands a {@link QueryStartEvent} built from the request (session, query context, expression
     * JSON, logical plan JSON and allocated resource) to the query master, then answers the callback.
     */
    @Override // org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface
    public void executeQuery(RpcController rpcController, ResourceProtos.QueryExecutionRequest queryExecutionRequest, RpcCallback<PrimitiveProtos.NullProto> rpcCallback) {
        QueryId queryId = new QueryId(queryExecutionRequest.getQueryId());
        LOG.info("Receive executeQuery request:" + queryId);

        Session session = new Session(queryExecutionRequest.getSession());
        QueryContext queryContext = new QueryContext(this.workerContext.getQueryMaster().getContext().getConf(), queryExecutionRequest.getQueryContext());
        NodeResource allocatedResource = new NodeResource(queryExecutionRequest.getAllocation().getResource());

        this.queryMaster.handle(new QueryStartEvent(queryId, session, queryContext, queryExecutionRequest.getExprInJson().getValue(), queryExecutionRequest.getLogicalPlanJson().getValue(), allocatedResource));
        rpcCallback.run(TajoWorker.NULL_PROTO);
    }

    /**
     * Asks the node resource manager to allocate resources for a query master and waits on the
     * future for its answer. On any exception the controller is marked as failed and the
     * callback receives {@code FALSE_PROTO}.
     */
    @Override // org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface
    public void allocateQueryMaster(RpcController rpcController, ResourceProtos.AllocationResourceProto allocationResourceProto, RpcCallback<PrimitiveProtos.BoolProto> rpcCallback) {
        CallFuture<PrimitiveProtos.BoolProto> callFuture = new CallFuture<>();
        this.workerContext.getNodeResourceManager().handle(new QMResourceAllocateEvent(allocationResourceProto, callFuture));
        try {
            rpcCallback.run(callFuture.get());
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            rpcController.setFailed(e.getMessage());
            rpcCallback.run(TajoWorker.FALSE_PROTO);
        }
    }
}
