package org.apache.iotdb.db.sync;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.exception.sync.PipeNotExistException;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.PipeStatus;
import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.sync.transport.SyncIdentityInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
import org.apache.iotdb.db.sync.sender.pipe.ExternalPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.sync.transport.client.SenderManager;
import org.apache.iotdb.db.sync.transport.server.ReceiverManager;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/SyncService.class */
public class SyncService implements IService {
    /** Class-wide logger. */
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) SyncService.class);
    /** Configuration obtained from the {@link IoTDBDescriptor} singleton; read for the cluster-mode flag. */
    private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    /** Pipes known to this service, keyed by pipe name. */
    private final Map<String, Pipe> pipes;
    /** External-pipe plugin managers, keyed by the name of the pipe they were created for. */
    private final Map<String, ExtPipePluginManager> extPipePluginManagers;
    /** Source of pipe / pipe-sink metadata; the implementation is chosen in the constructor. */
    private final ISyncInfoFetcher syncInfoFetcher;
    /** Delegate for handshake / transport requests and receiver-side pipe listing. */
    private final ReceiverManager receiverManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/sync/SyncService$SyncServiceHolder.class */
    /** Holder class whose static field owns the single {@link SyncService} instance. */
    public static class SyncServiceHolder {
        // Created once, when this holder class is initialized.
        private static final SyncService INSTANCE = new SyncService();

        // Private constructor: the holder is never instantiated, only its static field is read.
        private SyncServiceHolder() {
        }
    }

    /**
     * Creates the receiver manager and the (initially empty) pipe and plugin-manager registries,
     * then selects the sync-info fetcher according to the configured cluster mode.
     */
    private SyncService() {
        receiverManager = new ReceiverManager();
        pipes = new ConcurrentHashMap<>();
        extPipePluginManagers = new ConcurrentHashMap<>();
        // Cluster mode uses the cluster fetcher; otherwise the local one.
        syncInfoFetcher =
                config.isClusterMode()
                        ? ClusterSyncInfoFetcher.getInstance()
                        : LocalSyncInfoFetcher.getInstance();
    }

    /**
     * Returns the shared {@link SyncService} instance held by {@link SyncServiceHolder}.
     *
     * @return the singleton service
     */
    public static SyncService getInstance() {
        return SyncServiceHolder.INSTANCE;
    }

    /**
     * Forwards a handshake request to the receiver manager.
     *
     * @param tSyncIdentityInfo identity information sent with the handshake
     * @param str forwarded unchanged to the receiver manager
     * @param iPartitionFetcher partition fetcher handed to the receiver manager
     * @param iSchemaFetcher schema fetcher handed to the receiver manager
     * @return the status produced by the receiver manager
     */
    public TSStatus handshake(TSyncIdentityInfo tSyncIdentityInfo, String str, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        final TSStatus handshakeStatus =
                receiverManager.handshake(tSyncIdentityInfo, str, iPartitionFetcher, iSchemaFetcher);
        return handshakeStatus;
    }

    /**
     * Forwards a file-transport request to the receiver manager.
     *
     * @param tSyncTransportMetaInfo meta information accompanying the payload
     * @param byteBuffer payload buffer
     * @return the status produced by the receiver manager
     * @throws TException propagated from the receiver manager
     */
    public TSStatus transportFile(TSyncTransportMetaInfo tSyncTransportMetaInfo, ByteBuffer byteBuffer) throws TException {
        final TSStatus transportStatus = receiverManager.transportFile(tSyncTransportMetaInfo, byteBuffer);
        return transportStatus;
    }

    /**
     * Forwards a pipe-data transport request to the receiver manager.
     *
     * @param byteBuffer payload buffer
     * @return the status produced by the receiver manager
     * @throws TException propagated from the receiver manager
     */
    public TSStatus transportPipeData(ByteBuffer byteBuffer) throws TException {
        final TSStatus transportStatus = receiverManager.transportPipeData(byteBuffer);
        return transportStatus;
    }

    /** Notifies the receiver manager that a client has exited. */
    public void handleClientExit() {
        receiverManager.handleClientExit();
    }

    /**
     * Looks up a pipe sink through the sync-info fetcher.
     *
     * @param str name of the pipe sink
     * @return the pipe sink returned by the fetcher
     * @throws PipeSinkException propagated from the fetcher
     */
    public PipeSink getPipeSink(String str) throws PipeSinkException {
        final PipeSink sink = syncInfoFetcher.getPipeSink(str);
        return sink;
    }

    /**
     * Registers a pipe sink through the sync-info fetcher.
     *
     * @param createPipeSinkStatement statement describing the pipe sink to create
     * @throws PipeSinkException if the fetcher reports a non-success status; the exception carries
     *     the status message
     */
    public void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement) throws PipeSinkException {
        logger.info("Add PIPESINK {}", createPipeSinkStatement);
        final TSStatus status = syncInfoFetcher.addPipeSink(createPipeSinkStatement);
        final boolean succeeded = status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        if (succeeded) {
            return;
        }
        throw new PipeSinkException(status.message);
    }

    /**
     * Drops a pipe sink through the sync-info fetcher.
     *
     * @param str name of the pipe sink to drop
     * @throws PipeSinkException if the fetcher reports a non-success status; the exception carries
     *     the status message
     */
    public void dropPipeSink(String str) throws PipeSinkException {
        logger.info("Execute DROP PIPESINK {}", str);
        final TSStatus status = syncInfoFetcher.dropPipeSink(str);
        final boolean succeeded = status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        if (succeeded) {
            return;
        }
        throw new PipeSinkException(status.message);
    }

    /**
     * Lists every pipe sink known to the sync-info fetcher.
     *
     * @return the list returned by the fetcher
     */
    public List<PipeSink> getAllPipeSink() {
        final List<PipeSink> sinks = syncInfoFetcher.getAllPipeSinks();
        return sinks;
    }

    /**
     * Creates a pipe: validates it, persists it through the sync-info fetcher, builds the runtime
     * {@link Pipe} and registers it under its name.
     *
     * <p>For an external pipe sink the plugin manager is registered but not started.
     *
     * @param pipeInfo description of the pipe to create
     * @throws PipeException if a TsFile pipe's data start time lies after the current time, if the
     *     fetcher reports a non-success status, or if the pipe sink cannot be resolved
     */
    public synchronized void addPipe(PipeInfo pipeInfo) throws PipeException {
        logger.info("Execute CREATE PIPE {}", pipeInfo);

        // A TsFile pipe may not start collecting from a point in the future.
        final long now = DateTimeUtils.currentTime();
        if (pipeInfo instanceof TsFilePipeInfo) {
            final long dataStart = ((TsFilePipeInfo) pipeInfo).getDataStartTimestamp();
            if (dataStart > now) {
                throw new PipeException(
                        String.format(
                                "Start time %s is later than current time %s, this is not supported yet.",
                                DateTimeUtils.convertLongToDate(dataStart),
                                DateTimeUtils.convertLongToDate(now)));
            }
        }

        final TSStatus status = syncInfoFetcher.addPipe(pipeInfo);
        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new PipeException(status.message);
        }

        try {
            final PipeSink sink = getPipeSink(pipeInfo.getPipeSinkName());
            final Pipe created = SyncPipeUtil.parsePipeInfoAsPipe(pipeInfo, sink);
            pipes.put(pipeInfo.getPipeName(), created);
            final boolean external = created.getPipeSink().getType() == PipeSink.PipeSinkType.ExternalPipe;
            if (external) {
                startExternalPipeManager(pipeInfo.getPipeName(), false);
            }
        } catch (PipeSinkException e) {
            logger.error("failed to add PIPE because {}", e.getMessage(), e);
            throw new PipeException(String.format("failed to add PIPE because %s", e.getMessage()));
        }
    }

    /**
     * Stops a pipe. If it is currently running, a registered external plugin manager (non-IoTDB
     * sinks only) is stopped first and then the pipe itself; the stop is recorded through the
     * sync-info fetcher in every case.
     *
     * @param str name of the pipe
     * @throws PipeException if the pipe is not registered, the external plugin cannot be stopped, or
     *     the fetcher reports a non-success status
     */
    public synchronized void stopPipe(String str) throws PipeException {
        logger.info("Execute stop PIPE {}", str);
        final Pipe target = getPipe(str);

        if (target.getStatus() == PipeStatus.RUNNING) {
            final boolean nonIoTDBSink = target.getPipeSink().getType() != PipeSink.PipeSinkType.IoTDB;
            if (nonIoTDBSink && extPipePluginManagers.containsKey(str)) {
                try {
                    final ExtPipePluginManager manager = extPipePluginManagers.get(str);
                    final ExternalPipeSink externalSink = (ExternalPipeSink) target.getPipeSink();
                    manager.stopExtPipe(externalSink.getExtPipeSinkTypeName());
                } catch (Exception e) {
                    throw new PipeException("Failed to stop externalPipeProcessor. " + e.getMessage());
                }
            }
            target.stop();
        }

        final TSStatus status = syncInfoFetcher.stopPipe(str);
        final boolean succeeded = status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        if (!succeeded) {
            throw new PipeException(status.message);
        }
    }

    /**
     * Stops a pipe only when its create time equals {@code j}; any mismatch or failure is logged as
     * a warning instead of being thrown.
     *
     * @param str name of the pipe
     * @param j expected create time of the pipe
     */
    public synchronized void stopPipe(String str, long j) {
        try {
            final long registeredCreateTime = getPipe(str).getCreateTime();
            if (registeredCreateTime != j) {
                logger.warn("Skip execute stop PIPE {} with createTime {} because of createTime mismatch.", str, j);
                return;
            }
            stopPipe(str);
        } catch (PipeException e) {
            logger.warn("Skip execute stop PIPE {} with createTime {} because {}", str, j, e.getMessage());
        }
    }

    /**
     * Starts a pipe. If it is currently stopped, the pipe is started and, for non-IoTDB sinks, the
     * external plugin manager is started as well; the start is recorded through the sync-info
     * fetcher in every case.
     *
     * @param str name of the pipe
     * @throws PipeException if the pipe is not registered, the external plugin manager cannot be
     *     started, or the fetcher reports a non-success status
     */
    public synchronized void startPipe(String str) throws PipeException {
        logger.info("Execute start PIPE {}", str);
        final Pipe target = getPipe(str);

        if (target.getStatus() == PipeStatus.STOP) {
            // Decide on the sink type before starting, then start the pipe exactly once.
            final boolean iotdbSink = target.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB;
            target.start();
            if (!iotdbSink) {
                startExternalPipeManager(str, true);
            }
        }

        final TSStatus status = syncInfoFetcher.startPipe(str);
        final boolean succeeded = status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        if (!succeeded) {
            throw new PipeException(status.message);
        }
    }

    /**
     * Starts a pipe only when its create time equals {@code j}; any mismatch or failure is logged
     * as a warning instead of being thrown.
     *
     * @param str name of the pipe
     * @param j expected create time of the pipe
     */
    public synchronized void startPipe(String str, long j) {
        try {
            final long registeredCreateTime = getPipe(str).getCreateTime();
            if (registeredCreateTime != j) {
                logger.warn("Skip execute start PIPE {} with createTime {} because of createTime mismatch.", str, j);
                return;
            }
            startPipe(str);
        } catch (PipeException e) {
            logger.warn("Skip execute start PIPE {} with createTime {} because {}", str, j, e.getMessage());
        }
    }

    /**
     * Drops a pipe: drops and unregisters its external plugin manager (non-IoTDB sinks only), drops
     * the pipe itself, records the drop through the sync-info fetcher and removes the pipe from the
     * local registry.
     *
     * <p>A missing pipe is not an error for this method: the request is skipped, and the skip is
     * logged as a warning so that it is visible to operators.
     *
     * @param str name of the pipe
     * @throws PipeException if the fetcher reports a non-success status, or if another step fails
     *     with a {@link PipeException} other than {@link PipeNotExistException}
     */
    public synchronized void dropPipe(String str) throws PipeException {
        logger.info("Execute drop PIPE {}", str);
        try {
            Pipe pipe = getPipe(str);
            if (pipe.getPipeSink().getType() != PipeSink.PipeSinkType.IoTDB && this.extPipePluginManagers.containsKey(str)) {
                this.extPipePluginManagers.get(str).dropExtPipe(((ExternalPipeSink) pipe.getPipeSink()).getExtPipeSinkTypeName());
                this.extPipePluginManagers.remove(str);
            }
            pipe.drop();
            TSStatus dropPipe = this.syncInfoFetcher.dropPipe(str);
            // The local entry is removed before the status check, so it is gone even when the
            // fetcher reports a failure.
            this.pipes.remove(str);
            if (dropPipe.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                throw new PipeException(dropPipe.message);
            }
        } catch (PipeNotExistException e) {
            // Still a no-op for callers, but no longer silent.
            logger.warn("Skip execute drop PIPE {} because {}", str, e.getMessage());
        }
    }

    /**
     * Drops a pipe only when its create time equals {@code j}; any mismatch or failure is logged as
     * a warning instead of being thrown.
     *
     * @param str name of the pipe
     * @param j expected create time of the pipe
     */
    public synchronized void dropPipe(String str, long j) {
        try {
            final long registeredCreateTime = getPipe(str).getCreateTime();
            if (registeredCreateTime != j) {
                logger.warn("Skip execute drop PIPE {} with createTime {} because of createTime mismatch.", str, j);
                return;
            }
            dropPipe(str);
        } catch (PipeException e) {
            logger.warn("Skip execute drop PIPE {} with createTime {} because {}", str, j, e.getMessage());
        }
    }

    /**
     * Lists every pipe description known to the sync-info fetcher.
     *
     * @return the list returned by the fetcher
     */
    public List<PipeInfo> getAllPipeInfos() {
        final List<PipeInfo> infos = syncInfoFetcher.getAllPipeInfos();
        return infos;
    }

    /**
     * Returns the registered pipe with the given name.
     *
     * @param str name of the pipe
     * @return the registered pipe
     * @throws PipeNotExistException if no pipe is registered under that name
     */
    private Pipe getPipe(String str) throws PipeNotExistException {
        // The registry is a ConcurrentHashMap, which never stores null values, so a null result
        // means "not registered".
        final Pipe registered = pipes.get(str);
        if (registered == null) {
            throw new PipeNotExistException(str);
        }
        return registered;
    }

    /**
     * Logs a message reported for a pipe and, for ERROR and WARN messages, records it through the
     * sync-info fetcher.
     *
     * <p>Messages for pipes that are not registered here are only logged as a warning. ERROR
     * messages are logged at error level and WARN messages at warn level; any other type is
     * reported as unknown and not recorded.
     *
     * @param str name of the pipe the message belongs to
     * @param pipeMessage the message to log and record
     */
    public void recordMessage(String str, PipeMessage pipeMessage) {
        if (!this.pipes.containsKey(str)) {
            logger.warn("No running PIPE for message {}.", pipeMessage);
            return;
        }
        final TSStatus recordStatus;
        switch (pipeMessage.getType()) {
            case ERROR:
                logger.error("Error occurred when executing PIPE [{}] because {}.", str, pipeMessage.getMessage());
                recordStatus = this.syncInfoFetcher.recordMsg(str, pipeMessage);
                break;
            case WARN:
                // A warning is logged at warn level; it used to be reported as an error.
                logger.warn("Warn occurred when executing PIPE [{}] because {}.", str, pipeMessage.getMessage());
                recordStatus = this.syncInfoFetcher.recordMsg(str, pipeMessage);
                break;
            default:
                logger.error("Unknown message type: {}", pipeMessage);
                return;
        }
        if (recordStatus != null && recordStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            logger.error("Failed to record message: {}", pipeMessage);
        }
    }

    /**
     * Collects display information for pipes: first the entries from {@link #getAllPipeInfos()},
     * then the receiver-side entries from {@link #showPipeForReceiver(String)}.
     *
     * @param str pipe name to filter by; a {@code null} or empty value selects every pipe
     * @return the matching entries
     */
    public List<TShowPipeInfo> showPipe(String str) {
        final boolean selectAll = StringUtils.isEmpty(str);
        final List<TShowPipeInfo> rows = new ArrayList<>();
        for (PipeInfo info : getInstance().getAllPipeInfos()) {
            if (!selectAll && !str.equals(info.getPipeName())) {
                continue;
            }
            rows.add(info.getTShowPipeInfo());
        }
        rows.addAll(showPipeForReceiver(str));
        return rows;
    }

    /**
     * Collects display information for the identities known to the receiver manager. Each entry is
     * reported with role {@code "receiver"}, status RUNNING and message type NORMAL.
     *
     * @param str pipe name to filter by; a {@code null} or empty value selects every entry
     * @return the matching entries
     */
    public List<TShowPipeInfo> showPipeForReceiver(String str) {
        final boolean selectAll = StringUtils.isEmpty(str);
        final List<TShowPipeInfo> rows = new ArrayList<>();
        for (SyncIdentityInfo identity : receiverManager.getAllTSyncIdentityInfos()) {
            if (!selectAll && !str.equals(identity.getPipeName())) {
                continue;
            }
            final TShowPipeInfo row =
                    new TShowPipeInfo(
                            identity.getCreateTime(),
                            identity.getPipeName(),
                            "receiver",
                            identity.getRemoteAddress(),
                            PipeStatus.RUNNING.name(),
                            String.format("Database='%s'", identity.getDatabase()),
                            PipeMessage.PipeMessageType.NORMAL.name());
            rows.add(row);
        }
        return rows;
    }

    /**
     * Ensures an {@link ExtPipePluginManager} is registered for the given pipe and optionally
     * starts it.
     *
     * <p>Nothing happens (beyond an error log) when the pipe is not a {@link TsFilePipe} or its
     * sink is not an {@link ExternalPipeSink}.
     *
     * <p>The manager that is started is always the one stored in the registry. Previously a new
     * manager was built on every call but only registered when none existed, so a later start
     * could act on an unregistered instance that the stop and drop paths never saw.
     *
     * @param str name of the pipe
     * @param z whether the external pipe should be started after the manager is registered
     * @throws PipeException if no plugin is registered for the sink's type name, or if starting
     *     the external pipe fails with an {@link IOException}
     */
    private void startExternalPipeManager(String str, boolean z) throws PipeException {
        final Pipe pipe = this.pipes.get(str);
        if (!(pipe instanceof TsFilePipe)) {
            logger.error("startExternalPipeManager(), runningPipe is not TsFilePipe. {}", str);
            return;
        }
        final PipeSink pipeSink = pipe.getPipeSink();
        if (!(pipeSink instanceof ExternalPipeSink)) {
            logger.error("startExternalPipeManager(), pipeSink is not ExternalPipeSink. {}", pipeSink);
            return;
        }
        final ExternalPipeSink externalSink = (ExternalPipeSink) pipeSink;
        final String extPipeSinkTypeName = externalSink.getExtPipeSinkTypeName();
        if (ExtPipePluginRegister.getInstance().getWriteFactory(extPipeSinkTypeName) == null) {
            logger.error("startExternalPipeManager(), can not found ExternalPipe plugin for {}.", extPipeSinkTypeName);
            throw new PipeException("Can not found ExternalPipe plugin for " + extPipeSinkTypeName + TsFileConstant.PATH_SEPARATOR);
        }

        // Reuse the registered manager; create and register one only when none exists yet.
        ExtPipePluginManager extPipePluginManager = this.extPipePluginManagers.get(str);
        if (extPipePluginManager == null) {
            extPipePluginManager = new ExtPipePluginManager((TsFilePipe) pipe);
            this.extPipePluginManagers.put(str, extPipePluginManager);
        }

        if (z) {
            try {
                extPipePluginManager.startExtPipe(extPipeSinkTypeName, externalSink.getSinkParams());
            } catch (IOException e) {
                logger.error("Failed to start External Pipe: {}.", extPipeSinkTypeName, e);
                throw new PipeException("Failed to start External Pipe: " + extPipeSinkTypeName + ". " + e.getMessage());
            }
        }
    }

    /**
     * Returns the external plugin manager registered for a pipe.
     *
     * @param str name of the pipe
     * @return the registered manager, or {@code null} when none is registered under that name
     */
    public ExtPipePluginManager getExternalPipeManager(String str) {
        final ExtPipePluginManager manager = extPipePluginManagers.get(str);
        return manager;
    }

    /**
     * Starts the service: checks that the external-pipe plugin register is available, logs the
     * loaded plugin names and then recovers the pipes.
     *
     * @throws StartupException if the plugin register is {@code null} or recovery fails
     */
    @Override // org.apache.iotdb.commons.service.IService
    public void start() throws StartupException {
        final ExtPipePluginRegister pluginRegister = ExtPipePluginRegister.getInstance();
        if (pluginRegister == null) {
            throw new StartupException("Load ExternalPipe Plugin error.");
        }
        logger.info(
                "Load {} ExternalPipe Plugin: {}",
                pluginRegister.getAllPluginName().size(),
                pluginRegister.getAllPluginName());

        try {
            recover();
        } catch (Exception e) {
            logger.error("Recovery error.", e);
            throw new StartupException(e);
        }
    }

    /**
     * Closes every registered pipe. A failure to close one pipe is logged as a warning and does
     * not prevent the remaining pipes from being closed.
     */
    @Override // org.apache.iotdb.commons.service.IService
    public void stop() {
        for (Pipe current : pipes.values()) {
            try {
                current.close();
            } catch (PipeException e) {
                final String warning =
                        String.format("Stop PIPE %s error when stop Sender Service.", current.getName());
                logger.warn(warning, e);
            }
        }
    }

    /**
     * Closes every registered pipe as part of shutdown.
     *
     * <p>All pipes are attempted even when one of them fails to close, so a single failing pipe
     * does not leave the others open. Each failure is logged; the first one is rethrown once all
     * pipes have been attempted.
     *
     * @param j not used by this implementation
     * @throws ShutdownException wrapping the first {@link PipeException} raised while closing
     */
    @Override // org.apache.iotdb.commons.service.IService
    public void shutdown(long j) throws ShutdownException {
        PipeException firstFailure = null;
        for (Pipe pipe : this.pipes.values()) {
            try {
                pipe.close();
            } catch (PipeException e) {
                logger.warn(String.format("Stop PIPE %s error when shutdown Sender Service.", pipe.getName()), (Throwable) e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw new ShutdownException(firstFailure);
        }
    }

    /**
     * Identifies this service.
     *
     * @return {@link ServiceType#SYNC_SERVICE}
     */
    @Override // org.apache.iotdb.commons.service.IService
    public ServiceType getID() {
        return ServiceType.SYNC_SERVICE;
    }

    /**
     * Rebuilds the runtime pipes from the descriptions held by the sync-info fetcher.
     *
     * <p>Pipes whose status is PARTIAL_CREATE or DROP are skipped. Every other pipe is rebuilt and
     * registered, then started when its status is RUNNING or stopped when it is STOP,
     * PARTIAL_START or PARTIAL_STOP. For external pipe sinks the plugin manager is registered too,
     * and started only if the rebuilt pipe reports RUNNING.
     *
     * @throws IOException if a status is not handled by the switch
     * @throws PipeException if a skipped status reaches the switch, or a pipe operation fails
     * @throws PipeSinkException if a pipe sink cannot be resolved
     */
    private void recover() throws IOException, PipeException, PipeSinkException {
        for (PipeInfo pipeInfo : syncInfoFetcher.getAllPipeInfos()) {
            final String pipeName = pipeInfo.getPipeName();
            final PipeStatus recordedStatus = pipeInfo.getStatus();
            logger.info("Recover PIPE [{}] whose status is {}", pipeName, recordedStatus.name());

            final boolean skipped =
                    recordedStatus == PipeStatus.PARTIAL_CREATE || recordedStatus == PipeStatus.DROP;
            if (skipped) {
                logger.info("Skip PIPE [{}] because its status is {}", pipeName, recordedStatus.name());
                continue;
            }

            final PipeSink sink = syncInfoFetcher.getPipeSink(pipeInfo.getPipeSinkName());
            final Pipe recovered = SyncPipeUtil.parsePipeInfoAsPipe(pipeInfo, sink);
            pipes.put(pipeName, recovered);

            switch (recordedStatus) {
                case RUNNING:
                    recovered.start();
                    break;
                case STOP:
                case PARTIAL_START:
                case PARTIAL_STOP:
                    recovered.stop();
                    break;
                case PARTIAL_CREATE:
                case DROP:
                    // Already filtered out above; kept as a safeguard.
                    throw new PipeException("Unexpected status " + recordedStatus.name());
                default:
                    throw new IOException(String.format("Can not recognize running pipe status %s.", recovered.getStatus()));
            }

            if (recovered.getPipeSink().getType() == PipeSink.PipeSinkType.ExternalPipe) {
                final boolean running = recovered.getStatus() == PipeStatus.RUNNING;
                startExternalPipeManager(pipeName, running);
            }
        }
    }

    /**
     * Collects one sync manager from each registered pipe that reports its history collection as
     * finished.
     *
     * @param str argument passed to each pipe's {@code getOrCreateSyncManager}
     * @return the managers obtained from the qualifying pipes; empty if none qualify
     */
    public List<ISyncManager> getOrCreateSyncManager(String str) {
        final List<ISyncManager> managers = new ArrayList<>();
        for (Pipe candidate : pipes.values()) {
            if (!candidate.isHistoryCollectFinished()) {
                continue;
            }
            managers.add(candidate.getOrCreateSyncManager(str));
        }
        return managers;
    }

    /**
     * Asks every registered pipe to unregister the given data region.
     *
     * @param str argument passed to each pipe's {@code unregisterDataRegion}
     */
    public synchronized void unregisterDataRegion(String str) {
        for (Pipe registered : pipes.values()) {
            registered.unregisterDataRegion(str);
        }
    }

    /**
     * Always returns {@code null} in this implementation.
     *
     * @return {@code null}
     */
    // NOTE(review): callers must be prepared for null; confirm whether this is an intentional stub.
    public SenderManager getSenderManager() {
        return null;
    }
}
