package org.apache.iotdb.confignode.manager;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
import org.apache.iotdb.confignode.consensus.response.PipeResp;
import org.apache.iotdb.confignode.consensus.response.PipeSinkResp;
import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/SyncManager.class */
public class SyncManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(SyncManager.class);
    private final IManager configManager;
    private final ClusterSyncInfo clusterSyncInfo;

    public SyncManager(IManager iManager, ClusterSyncInfo clusterSyncInfo) {
        this.configManager = iManager;
        this.clusterSyncInfo = clusterSyncInfo;
    }

    public TSStatus createPipeSink(CreatePipeSinkPlan createPipeSinkPlan) {
        try {
            this.clusterSyncInfo.checkAddPipeSink(createPipeSinkPlan.getPipeSinkInfo().getPipeSinkName());
            return getConsensusManager().write(createPipeSinkPlan).getStatus();
        } catch (PipeSinkException e) {
            LOGGER.error(e.getMessage());
            return new TSStatus(TSStatusCode.PIPESINK_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
    }

    public TSStatus dropPipeSink(DropPipeSinkPlan dropPipeSinkPlan) {
        try {
            this.clusterSyncInfo.checkDropPipeSink(dropPipeSinkPlan.getPipeSinkName());
            return getConsensusManager().write(dropPipeSinkPlan).getStatus();
        } catch (PipeSinkException e) {
            LOGGER.error(e.getMessage());
            return new TSStatus(TSStatusCode.PIPESINK_ERROR.getStatusCode()).setMessage(e.getMessage());
        }
    }

    public TGetPipeSinkResp getPipeSink(String str) {
        PipeSinkResp pipeSinkResp = (PipeSinkResp) getConsensusManager().read(new GetPipeSinkPlan(str)).getDataset();
        TGetPipeSinkResp tGetPipeSinkResp = new TGetPipeSinkResp();
        tGetPipeSinkResp.setStatus(pipeSinkResp.getStatus());
        if (pipeSinkResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            tGetPipeSinkResp.setPipeSinkInfoList((List) pipeSinkResp.getPipeSinkList().stream().map((v0) -> {
                return v0.getTPipeSinkInfo();
            }).collect(Collectors.toList()));
        }
        return tGetPipeSinkResp;
    }

    public void checkAddPipe(PipeInfo pipeInfo) throws PipeException {
        this.clusterSyncInfo.checkAddPipe(pipeInfo);
    }

    public TSStatus preCreatePipe(PipeInfo pipeInfo) {
        pipeInfo.setStatus(PipeStatus.PREPARE_CREATE);
        return getConsensusManager().write(new PreCreatePipePlan(pipeInfo)).getStatus();
    }

    public TSStatus setPipeStatus(String str, PipeStatus pipeStatus) {
        return getConsensusManager().write(new SetPipeStatusPlan(str, pipeStatus)).getStatus();
    }

    public TShowPipeResp showPipe(String str) {
        PipeResp pipeResp = (PipeResp) getConsensusManager().read(new ShowPipePlan(str)).getDataset();
        TShowPipeResp tShowPipeResp = new TShowPipeResp();
        tShowPipeResp.setStatus(pipeResp.getStatus());
        if (pipeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            tShowPipeResp.setPipeInfoList((List) pipeResp.getPipeInfoList().stream().map((v0) -> {
                return v0.getTShowPipeInfo();
            }).collect(Collectors.toList()));
        }
        return tShowPipeResp;
    }

    public PipeInfo getPipeInfo(String str) throws PipeException {
        return this.clusterSyncInfo.getPipeInfo(str);
    }

    public List<TSStatus> operatePipeOnDataNodes(String str, SyncOperation syncOperation) {
        Map<Integer, TDataNodeLocation> registeredDataNodeLocations = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        List<TSStatus> synchronizedList = Collections.synchronizedList(new ArrayList(registeredDataNodeLocations.size()));
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(new AsyncClientHandler<>(DataNodeRequestType.OPERATE_PIPE, new TOperatePipeOnDataNodeReq(str, (byte) syncOperation.ordinal()), registeredDataNodeLocations));
        return synchronizedList;
    }

    public List<TSStatus> preCreatePipeOnDataNodes(PipeInfo pipeInfo) {
        Map<Integer, TDataNodeLocation> registeredDataNodeLocations = this.configManager.getNodeManager().getRegisteredDataNodeLocations();
        List<TSStatus> synchronizedList = Collections.synchronizedList(new ArrayList(registeredDataNodeLocations.size()));
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(new AsyncClientHandler<>(DataNodeRequestType.PRE_CREATE_PIPE, new TCreatePipeOnDataNodeReq(pipeInfo.serializeToByteBuffer()), registeredDataNodeLocations));
        return synchronizedList;
    }

    private ConsensusManager getConsensusManager() {
        return this.configManager.getConsensusManager();
    }
}
