/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.sync.common;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
import org.apache.iotdb.db.sync.common.persistence.SyncLogWriter;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SyncInfo {
    protected static final Logger LOGGER = LoggerFactory.getLogger(SyncInfo.class);
    protected Map<String, List<PipeMessage>> pipeMessageMap;
    private Map<String, PipeSink> pipeSinks;
    private PipeInfo runningPipe;
    private List<PipeInfo> pipes;
    protected SyncLogWriter syncLogWriter = SyncLogWriter.getInstance();

    public SyncInfo() {
        SyncLogReader logReader = new SyncLogReader();
        try {
            logReader.recover();
            this.pipeSinks = logReader.getAllPipeSinks();
            this.pipes = logReader.getAllPipeInfos();
            this.runningPipe = logReader.getRunningPipeInfo();
            this.pipeMessageMap = logReader.getPipeMessageMap();
        }
        catch (StartupException e) {
            LOGGER.error("Cannot recover ReceiverInfo because {}. Use default info values.", (Object)e.getMessage());
            this.pipeSinks = new ConcurrentHashMap<String, PipeSink>();
            this.pipes = new ArrayList<PipeInfo>();
            this.pipeMessageMap = new ConcurrentHashMap<String, List<PipeMessage>>();
        }
    }

    public void close() throws IOException {
        this.syncLogWriter.close();
    }

    private boolean isPipeSinkExist(String name) {
        return this.pipeSinks.containsKey(name);
    }

    public void addPipeSink(CreatePipeSinkPlan plan) throws PipeSinkException, IOException {
        if (this.isPipeSinkExist(plan.getPipeSinkName())) {
            throw new PipeSinkException("There is a pipeSink named " + plan.getPipeSinkName() + " in IoTDB, please drop it.");
        }
        PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkPlan(plan);
        this.pipeSinks.put(pipeSink.getPipeSinkName(), pipeSink);
        this.syncLogWriter.addPipeSink(plan);
    }

    public void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement) throws PipeSinkException, IOException {
        if (this.isPipeSinkExist(createPipeSinkStatement.getPipeSinkName())) {
            throw new PipeSinkException("There is a pipeSink named " + createPipeSinkStatement.getPipeSinkName() + " in IoTDB, please drop it.");
        }
        PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkStatement(createPipeSinkStatement);
        this.pipeSinks.put(pipeSink.getPipeSinkName(), pipeSink);
        this.syncLogWriter.addPipeSink(createPipeSinkStatement);
    }

    public void dropPipeSink(String name) throws PipeSinkException, IOException {
        if (!this.isPipeSinkExist(name)) {
            throw new PipeSinkException("PipeSink " + name + " is not exist.");
        }
        if (this.runningPipe != null && this.runningPipe.getStatus() != Pipe.PipeStatus.DROP && this.runningPipe.getPipeSinkName().equals(name)) {
            throw new PipeSinkException(String.format("Can not drop pipeSink %s, because pipe %s is using it.", name, this.runningPipe.getPipeName()));
        }
        this.pipeSinks.remove(name);
        this.syncLogWriter.dropPipeSink(name);
    }

    public PipeSink getPipeSink(String name) {
        return this.pipeSinks.getOrDefault(name, null);
    }

    public List<PipeSink> getAllPipeSink() {
        ArrayList<PipeSink> allPipeSinks = new ArrayList<PipeSink>();
        for (Map.Entry<String, PipeSink> entry : this.pipeSinks.entrySet()) {
            allPipeSinks.add(entry.getValue());
        }
        return allPipeSinks;
    }

    public void addPipe(CreatePipePlan plan, long createTime) throws PipeException, IOException {
        if (this.runningPipe != null && this.runningPipe.getStatus() != Pipe.PipeStatus.DROP) {
            throw new PipeException(String.format("Pipe %s is %s, please retry after drop it.", this.runningPipe.getPipeName(), this.runningPipe.getStatus().name()));
        }
        if (!this.isPipeSinkExist(plan.getPipeSinkName())) {
            throw new PipeException(String.format("Can not find pipeSink %s.", plan.getPipeSinkName()));
        }
        PipeSink runningPipeSink = this.getPipeSink(plan.getPipeSinkName());
        this.runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipeInfo(plan, runningPipeSink, createTime);
        this.pipes.add(this.runningPipe);
        this.syncLogWriter.addPipe(plan, createTime);
    }

    public void addPipe(CreatePipeStatement createPipeStatement, long createTime) throws PipeException, IOException {
        if (this.runningPipe != null && this.runningPipe.getStatus() != Pipe.PipeStatus.DROP) {
            throw new PipeException(String.format("Pipe %s is %s, please retry after drop it.", this.runningPipe.getPipeName(), this.runningPipe.getStatus().name()));
        }
        if (!this.isPipeSinkExist(createPipeStatement.getPipeSinkName())) {
            throw new PipeException(String.format("Can not find pipeSink %s.", createPipeStatement.getPipeSinkName()));
        }
        PipeSink runningPipeSink = this.getPipeSink(createPipeStatement.getPipeSinkName());
        this.runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipeInfo(createPipeStatement, runningPipeSink, createTime);
        this.pipes.add(this.runningPipe);
        this.syncLogWriter.addPipe(createPipeStatement, createTime);
    }

    public void operatePipe(String pipeName, StatementType statementType) throws PipeException, IOException {
        this.checkIfPipeExistAndRunning(pipeName);
        switch (statementType) {
            case START_PIPE: {
                this.runningPipe.start();
                break;
            }
            case STOP_PIPE: {
                this.runningPipe.stop();
                break;
            }
            case DROP_PIPE: {
                this.runningPipe.drop();
                break;
            }
            default: {
                throw new PipeException("Unknown operatorType " + (Object)((Object)statementType));
            }
        }
        this.syncLogWriter.operatePipe(pipeName, statementType);
    }

    public List<PipeInfo> getAllPipeInfos() {
        return this.pipes;
    }

    public PipeInfo getRunningPipeInfo() {
        return this.runningPipe;
    }

    private void checkIfPipeExistAndRunning(String pipeName) throws PipeException {
        if (this.runningPipe == null || this.runningPipe.getStatus() == Pipe.PipeStatus.DROP) {
            throw new PipeException("There is no existing pipe.");
        }
        if (!this.runningPipe.getPipeName().equals(pipeName)) {
            throw new PipeException(String.format("Pipe %s is %s, please retry after drop it.", new Object[]{this.runningPipe.getPipeName(), this.runningPipe.getStatus()}));
        }
    }

    public synchronized void writePipeMessage(String pipeName, long createTime, PipeMessage message) {
        String pipeIdentifier = SyncPathUtil.getSenderPipeDir((String)pipeName, (long)createTime);
        try {
            this.syncLogWriter.writePipeMsg(pipeIdentifier, message);
        }
        catch (IOException e) {
            LOGGER.error("Can not write pipe message {} from {} to disk because {}", new Object[]{message, pipeIdentifier, e.getMessage()});
        }
        this.pipeMessageMap.computeIfAbsent(pipeIdentifier, i -> new ArrayList()).add(message);
    }

    public synchronized List<PipeMessage> getPipeMessages(String pipeName, long createTime, boolean consume) {
        List<PipeMessage> pipeMessageList = new ArrayList<PipeMessage>();
        String pipeIdentifier = SyncPathUtil.getSenderPipeDir((String)pipeName, (long)createTime);
        if (consume) {
            try {
                this.syncLogWriter.comsumePipeMsg(pipeIdentifier);
            }
            catch (IOException e) {
                LOGGER.error("Can not read pipe message about {} from disk because {}", (Object)pipeIdentifier, (Object)e.getMessage());
            }
        }
        if (this.pipeMessageMap.containsKey(pipeIdentifier)) {
            pipeMessageList = this.pipeMessageMap.get(pipeIdentifier);
            if (consume) {
                this.pipeMessageMap.remove(pipeIdentifier);
            }
        }
        return pipeMessageList;
    }

    public PipeMessage getPipeMessage(String pipeName, long createTime, boolean consume) {
        List<PipeMessage> pipeMessageList = this.getPipeMessages(pipeName, createTime, consume);
        PipeMessage message = new PipeMessage(PipeMessage.MsgType.INFO, "");
        if (!pipeMessageList.isEmpty()) {
            for (PipeMessage pipeMessage : pipeMessageList) {
                if (pipeMessage.getType().getValue() <= message.getType().getValue()) continue;
                message = pipeMessage;
            }
        }
        return message;
    }

    private void createDir(String pipeName, String remoteIp, long createTime) {
        File f = new File(SyncPathUtil.getReceiverFileDataDir((String)pipeName, (String)remoteIp, (long)createTime));
        if (!f.exists()) {
            f.mkdirs();
        }
        if (!(f = new File(SyncPathUtil.getReceiverPipeLogDir((String)pipeName, (String)remoteIp, (long)createTime))).exists()) {
            f.mkdirs();
        }
    }
}

