package org.apache.iotdb.commons.pipe.agent.task;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.PipeTaskManager;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.class */
public abstract class PipeTaskAgent {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
    /** Format template taking (status, pipe name); this class fills it in with {@code String.format}. */
    protected static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s";
    /**
     * Prefix used by the {@code checkBefore*} methods when they meet a status they do not handle.
     * NOTE(review): the text contains a {@code %s} placeholder, yet this class appends the status name by plain
     * string concatenation, so the literal "%s" ends up in the exception message. Confirm the intended wording.
     */
    protected static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: ";
    /** Keeps the pipe metadata known to this agent; the lock helper methods of this class delegate to its read/write lock. */
    protected final PipeMetaKeeper pipeMetaKeeper = new PipeMetaKeeper();
    /** Keeps the {@link PipeTask} instances; looked up by pipe static meta (plus consensus group id for single tasks). */
    protected final PipeTaskManager pipeTaskManager = new PipeTaskManager();

    /**
     * Decompiler rendering of the compiler-generated lookup table used by {@code switch} statements over
     * {@link PipeStatus} (binary name {@code PipeTaskAgent$1}; originally package-private).
     *
     * <p>The table is indexed by enum ordinal and yields the case number: RUNNING -> 1, STOPPED -> 2,
     * DROPPED -> 3. Ordinals that are not registered keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$commons$pipe$task$meta$PipeStatus;

        static {
            final int[] caseNumbers = new int[PipeStatus.values().length];
            try {
                caseNumbers[PipeStatus.RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at run time: its slot simply stays 0.
            }
            try {
                caseNumbers[PipeStatus.STOPPED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at run time: its slot simply stays 0.
            }
            try {
                caseNumbers[PipeStatus.DROPPED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at run time: its slot simply stays 0.
            }
            $SwitchMap$org$apache$iotdb$commons$pipe$task$meta$PipeStatus = caseNumbers;
        }
    }

    /** Acquires the read lock of {@link #pipeMetaKeeper} by delegating to it. */
    protected void acquireReadLock() {
        pipeMetaKeeper.acquireReadLock();
    }

    /**
     * Tries to take the read lock of {@link #pipeMetaKeeper}.
     *
     * @param j timeout value forwarded unchanged to {@code PipeMetaKeeper.tryReadLock} (its unit is defined there)
     * @return whatever {@code tryReadLock} returns, or {@code false} if the thread was interrupted while waiting;
     *     in that case the interrupt flag is restored and a warning is logged
     */
    protected boolean tryReadLockWithTimeOut(long j) {
        boolean acquired = false;
        try {
            acquired = pipeMetaKeeper.tryReadLock(j);
        } catch (InterruptedException interrupted) {
            // Re-assert the interrupt so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOGGER.warn("Interruption during requiring pipeMetaKeeper read lock.", interrupted);
        }
        return acquired;
    }

    /** Releases the read lock of {@link #pipeMetaKeeper} by delegating to it. */
    protected void releaseReadLock() {
        pipeMetaKeeper.releaseReadLock();
    }

    /** Acquires the write lock of {@link #pipeMetaKeeper} by delegating to it. */
    protected void acquireWriteLock() {
        pipeMetaKeeper.acquireWriteLock();
    }

    /**
     * Tries to take the write lock of {@link #pipeMetaKeeper}.
     *
     * @param j timeout value forwarded unchanged to {@code PipeMetaKeeper.tryWriteLock} (its unit is defined there)
     * @return whatever {@code tryWriteLock} returns, or {@code false} if the thread was interrupted while waiting;
     *     in that case the interrupt flag is restored and a warning is logged
     */
    protected boolean tryWriteLockWithTimeOut(long j) {
        boolean acquired = false;
        try {
            acquired = pipeMetaKeeper.tryWriteLock(j);
        } catch (InterruptedException interrupted) {
            // Re-assert the interrupt so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOGGER.warn("Interruption during requiring pipeMetaKeeper write lock.", interrupted);
        }
        return acquired;
    }

    /** Releases the write lock of {@link #pipeMetaKeeper} by delegating to it. */
    protected void releaseWriteLock() {
        pipeMetaKeeper.releaseWriteLock();
    }

    /**
     * Applies the changes described by one pipe meta while holding the write lock.
     *
     * @param pipeMeta the incoming pipe meta to reconcile with the locally kept state
     * @return the result of {@link #handleSinglePipeMetaChangesInternal(PipeMeta)}: {@code null} when nothing
     *     failed, otherwise a message describing the failure
     */
    public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(PipeMeta pipeMeta) {
        final TPushPipeMetaRespExceptionMessage outcome;
        acquireWriteLock();
        try {
            outcome = handleSinglePipeMetaChangesInternal(pipeMeta);
        } finally {
            // Always give the lock back, even if the internal handler throws.
            releaseWriteLock();
        }
        return outcome;
    }

    /**
     * Applies the changes described by one pipe meta; the caller is expected to hold the write lock
     * (see {@link #handleSinglePipeMetaChanges(PipeMeta)}).
     *
     * @param pipeMeta the incoming pipe meta
     * @return {@code null} if the agent is shut down or the change was applied without an exception; otherwise a
     *     message carrying the pipe name, a description built from the exception message, and the current time
     */
    protected TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(PipeMeta pipeMeta) {
        if (!isShutdown()) {
            try {
                executeSinglePipeMetaChanges(pipeMeta);
            } catch (Exception failure) {
                final String name = pipeMeta.getStaticMeta().getPipeName();
                final String description =
                        String.format("Failed to handle single pipe meta changes for %s, because %s", name, failure.getMessage());
                LOGGER.warn("Failed to handle single pipe meta changes for {}", name, failure);
                return new TPushPipeMetaRespExceptionMessage(name, description, System.currentTimeMillis());
            }
        }
        return null;
    }

    /**
     * Tells whether this agent is shut down. The {@code handle*Internal} methods of this class skip all work
     * (returning {@code null} or an empty list) while this returns {@code true}.
     *
     * @return {@code true} if meta changes must be ignored
     */
    protected abstract boolean isShutdown();

    /**
     * Reconciles one incoming pipe meta with the meta kept locally under the same pipe name.
     *
     * <ul>
     *   <li>No local meta: create the pipe, and start it if {@link #createPipe(PipeMeta)} says so.</li>
     *   <li>Local static meta equals the incoming one: only reconcile the runtime metas.</li>
     *   <li>Static metas differ: drop the local pipe, then create (and possibly start) the incoming one.</li>
     * </ul>
     *
     * @param pipeMeta the incoming pipe meta
     */
    private void executeSinglePipeMetaChanges(PipeMeta pipeMeta) {
        final String name = pipeMeta.getStaticMeta().getPipeName();
        final PipeMeta keptMeta = pipeMetaKeeper.getPipeMeta(name);

        if (keptMeta != null) {
            final PipeStaticMeta keptStaticMeta = keptMeta.getStaticMeta();
            final PipeStaticMeta incomingStaticMeta = pipeMeta.getStaticMeta();
            if (keptStaticMeta.equals(incomingStaticMeta)) {
                executeSinglePipeRuntimeMetaChanges(incomingStaticMeta, pipeMeta.getRuntimeMeta(), keptMeta.getRuntimeMeta());
                return;
            }
            // The definition changed: the old pipe has to go before the new one is built.
            dropPipe(name);
        }

        final boolean shouldStart = createPipe(pipeMeta);
        if (shouldStart) {
            startPipe(name, pipeMeta.getStaticMeta().getCreationTime());
        }
    }

    /**
     * Reconciles the runtime part of a pipe whose static meta is unchanged.
     *
     * <p>Step 1 walks the incoming task metas: a consensus group unknown locally gets a new task; a group whose
     * leader data node id differs gets its task dropped and recreated. A (re)created task is started when the
     * local pipe status is RUNNING. Step 2 drops local tasks whose consensus group no longer appears in the
     * incoming meta. Step 3 aligns the pipe status with the incoming status.
     *
     * @param pipeStaticMeta the (unchanged) static meta of the pipe
     * @param pipeRuntimeMeta the incoming runtime meta
     * @param pipeRuntimeMeta2 the locally kept runtime meta
     * @throws IllegalStateException if the requested status transition is not RUNNING&lt;-&gt;STOPPED (or to DROPPED),
     *     or the incoming status is not one of the handled constants
     * @throws NullPointerException if the two statuses differ and either of them is {@code null}
     */
    private void executeSinglePipeRuntimeMetaChanges(PipeStaticMeta pipeStaticMeta, PipeRuntimeMeta pipeRuntimeMeta, PipeRuntimeMeta pipeRuntimeMeta2) {
        final Map<TConsensusGroupId, PipeTaskMeta> incomingTaskMetas = pipeRuntimeMeta.getConsensusGroupId2TaskMetaMap();
        final Map<TConsensusGroupId, PipeTaskMeta> localTaskMetas = pipeRuntimeMeta2.getConsensusGroupId2TaskMetaMap();

        // 1. Create tasks for new consensus groups and rebuild tasks whose leader changed.
        for (Map.Entry<TConsensusGroupId, PipeTaskMeta> incomingEntry : incomingTaskMetas.entrySet()) {
            final TConsensusGroupId groupId = incomingEntry.getKey();
            final PipeTaskMeta incomingTaskMeta = incomingEntry.getValue();
            final PipeTaskMeta localTaskMeta = localTaskMetas.get(groupId);

            if (localTaskMeta != null) {
                if (incomingTaskMeta.getLeaderDataNodeId() == localTaskMeta.getLeaderDataNodeId()) {
                    continue; // same leader: nothing to do for this group
                }
                dropPipeTask(groupId, pipeStaticMeta);
            }
            createPipeTask(groupId, pipeStaticMeta, incomingTaskMeta);
            // A freshly created task follows the status currently recorded for the local pipe.
            if (pipeRuntimeMeta2.getStatus().get() == PipeStatus.RUNNING) {
                startPipeTask(groupId, pipeStaticMeta);
            }
        }

        // 2. Drop local tasks that the incoming meta no longer lists. dropPipeTask removes the entry from the kept
        // pipe's task-meta map, which may be the very map inspected here, so walk a snapshot of the keys instead
        // of a live iterator.
        for (TConsensusGroupId localGroupId : new ArrayList<>(localTaskMetas.keySet())) {
            if (incomingTaskMetas.get(localGroupId) == null) {
                dropPipeTask(localGroupId, pipeStaticMeta);
            }
        }

        // 3. Align the pipe status.
        final PipeStatus incomingStatus = pipeRuntimeMeta.getStatus().get();
        final PipeStatus localStatus = pipeRuntimeMeta2.getStatus().get();
        if (incomingStatus == localStatus) {
            return;
        }
        // Switch on the enum itself rather than on a synthetic ordinal table with unrelated integer constants.
        switch (incomingStatus) {
            case RUNNING:
                if (Objects.requireNonNull(localStatus) != PipeStatus.STOPPED) {
                    throw new IllegalStateException(String.format(MESSAGE_UNKNOWN_PIPE_STATUS, localStatus, pipeStaticMeta.getPipeName()));
                }
                startPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
                return;
            case STOPPED:
                if (Objects.requireNonNull(localStatus) != PipeStatus.RUNNING) {
                    throw new IllegalStateException(String.format(MESSAGE_UNKNOWN_PIPE_STATUS, localStatus, pipeStaticMeta.getPipeName()));
                }
                stopPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
                return;
            case DROPPED:
                dropPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
                return;
            default:
                throw new IllegalStateException(String.format(MESSAGE_UNKNOWN_PIPE_STATUS, incomingStatus, pipeStaticMeta.getPipeName()));
        }
    }

    /**
     * Drops the named pipe while holding the write lock.
     *
     * @param str the pipe name
     * @return the result of {@link #handleDropPipeInternal(String)}: {@code null} when nothing failed, otherwise a
     *     message describing the failure
     */
    public TPushPipeMetaRespExceptionMessage handleDropPipe(String str) {
        final TPushPipeMetaRespExceptionMessage outcome;
        acquireWriteLock();
        try {
            outcome = handleDropPipeInternal(str);
        } finally {
            // Always give the lock back, even if the internal handler throws.
            releaseWriteLock();
        }
        return outcome;
    }

    /**
     * Drops the named pipe; the caller is expected to hold the write lock (see {@link #handleDropPipe(String)}).
     *
     * @param str the pipe name
     * @return {@code null} if the agent is shut down or the drop finished without an exception; otherwise a message
     *     carrying the pipe name, a description built from the exception message, and the current time
     */
    protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String str) {
        if (!isShutdown()) {
            try {
                dropPipe(str);
            } catch (Exception failure) {
                final String description = String.format("Failed to drop pipe %s, because %s", str, failure.getMessage());
                LOGGER.warn("Failed to drop pipe {}", str, failure);
                return new TPushPipeMetaRespExceptionMessage(str, description, System.currentTimeMillis());
            }
        }
        return null;
    }

    /**
     * Reconciles the agent with a complete list of pipe metas while holding the write lock.
     *
     * @param list the incoming pipe metas
     * @return the failure messages collected by the internal handler (empty when nothing failed)
     */
    public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(List<PipeMeta> list) {
        final List<TPushPipeMetaRespExceptionMessage> failures;
        acquireWriteLock();
        try {
            failures = handlePipeMetaChangesInternal(list);
        } finally {
            // Always give the lock back, even if the internal handler throws.
            releaseWriteLock();
        }
        return failures;
    }

    /**
     * Reconciles the agent with a complete list of pipe metas; the caller is expected to hold the write lock.
     *
     * <p>First every incoming meta is applied individually. Then every locally kept pipe whose name is absent
     * from the incoming list is dropped. A failure on one pipe is logged and recorded, and processing continues
     * with the next pipe.
     *
     * @param list the incoming pipe metas
     * @return an immutable empty list if the agent is shut down; otherwise a new list with one message per pipe
     *     that failed (possibly empty)
     */
    private List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(List<PipeMeta> list) {
        if (isShutdown()) {
            return Collections.emptyList();
        }

        final List<TPushPipeMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();

        // 1. Apply every incoming meta.
        for (PipeMeta incomingMeta : list) {
            try {
                executeSinglePipeMetaChanges(incomingMeta);
            } catch (Exception e) {
                final String pipeName = incomingMeta.getStaticMeta().getPipeName();
                final String message = String.format("Failed to handle pipe meta changes for %s, because %s", pipeName, e.getMessage());
                LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
                exceptionMessages.add(new TPushPipeMetaRespExceptionMessage(pipeName, message, System.currentTimeMillis()));
            }
        }

        // 2. Drop every kept pipe that the incoming list no longer mentions.
        final Set<String> incomingPipeNames =
                list.stream().map(meta -> meta.getStaticMeta().getPipeName()).collect(Collectors.toSet());

        // dropPipe removes entries from the keeper, so iterate over a snapshot rather than the keeper's own view.
        final List<PipeMeta> keptMetas = new ArrayList<>();
        for (PipeMeta keptMeta : this.pipeMetaKeeper.getPipeMetaList()) {
            keptMetas.add(keptMeta);
        }
        for (PipeMeta keptMeta : keptMetas) {
            final String pipeName = keptMeta.getStaticMeta().getPipeName();
            try {
                if (!incomingPipeNames.contains(pipeName)) {
                    dropPipe(keptMeta.getStaticMeta().getPipeName());
                }
            } catch (Exception e) {
                final String message = String.format("Failed to handle pipe meta changes for %s, because %s", pipeName, e.getMessage());
                LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
                exceptionMessages.add(new TPushPipeMetaRespExceptionMessage(pipeName, message, System.currentTimeMillis()));
            }
        }
        return exceptionMessages;
    }

    /** Drops every pipe kept by this agent while holding the write lock; the lock is released even on failure. */
    public void dropAllPipeTasks() {
        this.acquireWriteLock();
        try {
            this.dropAllPipeTasksInternal();
        } finally {
            this.releaseWriteLock();
        }
    }

    /**
     * Drops every pipe currently kept by {@link #pipeMetaKeeper}; the caller is expected to hold the write lock.
     * A failure to drop one pipe is logged as a warning and does not stop the remaining drops.
     */
    private void dropAllPipeTasksInternal() {
        // dropPipe removes entries from the keeper, so iterate over a snapshot rather than the keeper's own view.
        final List<PipeMeta> keptMetas = new ArrayList<>();
        for (PipeMeta keptMeta : this.pipeMetaKeeper.getPipeMetaList()) {
            keptMetas.add(keptMeta);
        }
        for (PipeMeta keptMeta : keptMetas) {
            try {
                dropPipe(keptMeta.getStaticMeta().getPipeName(), keptMeta.getStaticMeta().getCreationTime());
            } catch (Exception e) {
                LOGGER.warn(
                        "Failed to drop pipe {} with creation time {}",
                        keptMeta.getStaticMeta().getPipeName(),
                        keptMeta.getStaticMeta().getCreationTime(),
                        e);
            }
        }
    }

    /**
     * Builds and registers the tasks of a pipe and stores its meta in the keeper with status STOPPED.
     *
     * <p>If a pipe with the same name is already kept, {@link #checkBeforeCreatePipe} decides whether creation
     * is skipped or the kept pipe is dropped first.
     *
     * @param pipeMeta the meta of the pipe to create; its runtime status is overwritten with STOPPED
     * @return {@code true} if the meta carried status RUNNING before it was reset, i.e. the caller should start the
     *     pipe; {@code false} otherwise, including when creation was skipped
     */
    private boolean createPipe(PipeMeta pipeMeta) {
        final String name = pipeMeta.getStaticMeta().getPipeName();
        final long requestedCreationTime = pipeMeta.getStaticMeta().getCreationTime();

        final PipeMeta keptMeta = pipeMetaKeeper.getPipeMeta(name);
        if (keptMeta != null) {
            final boolean mayReplace = checkBeforeCreatePipe(keptMeta, name, requestedCreationTime);
            if (!mayReplace) {
                return false;
            }
            dropPipe(name, keptMeta.getStaticMeta().getCreationTime());
        }

        final Map<TConsensusGroupId, PipeTask> tasks = buildPipeTasks(pipeMeta);
        final long startedAt = System.currentTimeMillis();
        tasks.values().parallelStream().forEach(PipeTask::create);
        LOGGER.info("Create all pipe tasks on Pipe {} successfully within {} ms", name, System.currentTimeMillis() - startedAt);
        pipeTaskManager.addPipeTasks(pipeMeta.getStaticMeta(), tasks);

        // Remember whether a start was requested, then store the meta as STOPPED; starting is a separate step.
        final AtomicReference<PipeStatus> statusRef = pipeMeta.getRuntimeMeta().getStatus();
        final boolean startRequested = statusRef.get() == PipeStatus.RUNNING;
        statusRef.set(PipeStatus.STOPPED);
        pipeMetaKeeper.addPipeMeta(name, pipeMeta);
        return startRequested;
    }

    /**
     * Builds the tasks of a pipe. {@code createPipe} calls {@code create()} on every returned task and then
     * registers the map with {@link #pipeTaskManager}.
     *
     * @param pipeMeta the meta of the pipe being created
     * @return the tasks keyed by consensus group id; must not be {@code null} because the result is dereferenced
     */
    protected abstract Map<TConsensusGroupId, PipeTask> buildPipeTasks(PipeMeta pipeMeta);

    /**
     * Drops the named pipe, but only if the kept meta has the given creation time.
     *
     * <p>The kept meta is marked DROPPED first, then the tasks are removed from the task manager and dropped, and
     * finally the meta is removed from the keeper. If the task manager has no tasks for the pipe, the method logs
     * and returns right after marking the meta DROPPED, leaving the meta in the keeper.
     *
     * @param str the pipe name
     * @param j the creation time the kept pipe must have
     */
    private void dropPipe(String str, long j) {
        final PipeMeta keptMeta = pipeMetaKeeper.getPipeMeta(str);
        if (!checkBeforeDropPipe(keptMeta, str, j)) {
            return;
        }

        keptMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);

        final Map<TConsensusGroupId, PipeTask> removedTasks = pipeTaskManager.removePipeTasks(keptMeta.getStaticMeta());
        if (removedTasks == null) {
            LOGGER.info("Pipe {} (creation time = {}) has already been dropped or has not been created. Skip dropping.", str, j);
            return;
        }

        final long startedAt = System.currentTimeMillis();
        removedTasks.values().parallelStream().forEach(PipeTask::drop);
        LOGGER.info("Drop all pipe tasks on Pipe {} successfully within {} ms", str, System.currentTimeMillis() - startedAt);
        pipeMetaKeeper.removePipeMeta(str);
    }

    /**
     * Drops the named pipe regardless of its creation time.
     *
     * <p>The kept meta is marked DROPPED first, then the tasks are removed from the task manager and dropped, and
     * finally the meta is removed from the keeper. If the task manager has no tasks for the pipe, the method logs
     * and returns right after marking the meta DROPPED, leaving the meta in the keeper.
     *
     * @param str the pipe name
     */
    private void dropPipe(String str) {
        final PipeMeta keptMeta = pipeMetaKeeper.getPipeMeta(str);
        if (!checkBeforeDropPipe(keptMeta, str)) {
            return;
        }

        keptMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);

        final Map<TConsensusGroupId, PipeTask> removedTasks = pipeTaskManager.removePipeTasks(keptMeta.getStaticMeta());
        if (removedTasks == null) {
            LOGGER.info("Pipe {} has already been dropped or has not been created. Skip dropping.", str);
            return;
        }

        final long startedAt = System.currentTimeMillis();
        removedTasks.values().parallelStream().forEach(PipeTask::drop);
        LOGGER.info("Drop all pipe tasks on Pipe {} successfully within {} ms", str, System.currentTimeMillis() - startedAt);
        pipeMetaKeeper.removePipeMeta(str);
    }

    /**
     * Starts all tasks of the named pipe if {@link #checkBeforeStartPipe} allows it, then marks the kept meta
     * RUNNING and clears the exception messages of all its task metas.
     *
     * @param str the pipe name
     * @param j the creation time the kept pipe must have
     */
    private void startPipe(String str, long j) {
        final PipeMeta keptMeta = pipeMetaKeeper.getPipeMeta(str);
        if (!checkBeforeStartPipe(keptMeta, str, j)) {
            return;
        }

        final Map<TConsensusGroupId, PipeTask> tasks = pipeTaskManager.getPipeTasks(keptMeta.getStaticMeta());
        if (tasks == null) {
            LOGGER.info("Pipe {} (creation time = {}) has already been dropped or has not been created. Skip starting.", str, j);
            return;
        }

        final long startedAt = System.currentTimeMillis();
        tasks.values().parallelStream().forEach(PipeTask::start);
        LOGGER.info("Start all pipe tasks on Pipe {} successfully within {} ms", str, System.currentTimeMillis() - startedAt);

        keptMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
        for (PipeTaskMeta taskMeta : keptMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().values()) {
            taskMeta.clearExceptionMessages();
        }
    }

    /**
     * Stops all tasks of the named pipe if {@link #checkBeforeStopPipe} allows it, then marks the kept meta STOPPED.
     *
     * @param str the pipe name
     * @param j the creation time the kept pipe must have
     */
    protected void stopPipe(String str, long j) {
        final PipeMeta keptMeta = pipeMetaKeeper.getPipeMeta(str);
        if (!checkBeforeStopPipe(keptMeta, str, j)) {
            return;
        }

        final Map<TConsensusGroupId, PipeTask> tasks = pipeTaskManager.getPipeTasks(keptMeta.getStaticMeta());
        if (tasks == null) {
            LOGGER.info("Pipe {} (creation time = {}) has already been dropped or has not been created. Skip stopping.", str, j);
            return;
        }

        final long startedAt = System.currentTimeMillis();
        tasks.values().parallelStream().forEach(PipeTask::stop);
        LOGGER.info("Stop all pipe tasks on Pipe {} successfully within {} ms", str, System.currentTimeMillis() - startedAt);

        keptMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
    }

    /**
     * Decides whether a pipe may be created although a meta with the same name is already kept.
     *
     * @param pipeMeta the kept meta; must not be {@code null}
     * @param str the pipe name (used for logging)
     * @param j the creation time of the pipe to be created
     * @return {@code true} if the creation times differ or the kept pipe is DROPPED (the caller then drops the kept
     *     pipe and recreates it); {@code false} if a pipe with the same creation time is RUNNING or STOPPED
     * @throws IllegalStateException if the kept status is none of RUNNING, STOPPED, DROPPED
     */
    protected boolean checkBeforeCreatePipe(PipeMeta pipeMeta, String str, long j) throws IllegalStateException {
        if (pipeMeta.getStaticMeta().getCreationTime() != j) {
            return true;
        }
        final PipeStatus status = pipeMeta.getRuntimeMeta().getStatus().get();
        // Switch on the enum itself rather than on a synthetic ordinal table with unrelated integer constants.
        switch (status) {
            case RUNNING:
            case STOPPED:
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Pipe {} (creation time = {}) has already been created. Current status = {}. Skip creating.", str, j, status.name());
                }
                return false;
            case DROPPED:
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Pipe {} (creation time = {}) has already been dropped, but the pipe task meta has not been cleaned up. Current status = {}. Try dropping the pipe and recreating it.", str, j, status.name());
                }
                return true;
            default:
                throw new IllegalStateException(MESSAGE_UNEXPECTED_PIPE_STATUS + status.name());
        }
    }

    /**
     * Decides whether the pipe may be started.
     *
     * @param pipeMeta the kept meta, or {@code null} if none is kept under that name
     * @param str the pipe name (used for logging)
     * @param j the creation time named in the start request
     * @return {@code true} only if a meta is kept, its creation time equals {@code j}, and its status is STOPPED;
     *     {@code false} in every other handled case (each one is logged at info level)
     * @throws IllegalStateException if the kept status is none of RUNNING, STOPPED, DROPPED
     */
    protected boolean checkBeforeStartPipe(PipeMeta pipeMeta, String str, long j) throws IllegalStateException {
        if (pipeMeta == null) {
            LOGGER.info("Pipe {} (creation time = {}) has already been dropped or has not been created. Skip starting.", str, j);
            return false;
        }
        if (pipeMeta.getStaticMeta().getCreationTime() != j) {
            LOGGER.info("Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in startPipe request. Skip starting.", str, pipeMeta.getStaticMeta().getCreationTime(), j);
            return false;
        }
        final PipeStatus status = pipeMeta.getRuntimeMeta().getStatus().get();
        // Switch on the enum itself rather than on a synthetic ordinal table with unrelated integer constants.
        switch (status) {
            case RUNNING:
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Pipe {} (creation time = {}) has already been started. Current status = {}. Skip starting.", str, j, status.name());
                }
                return false;
            case STOPPED:
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Pipe {} (creation time = {}) has been created. Current status = {}. Starting.", str, j, status.name());
                }
                return true;
            case DROPPED:
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip starting.", str, j, status.name());
                }
                return false;
            default:
                throw new IllegalStateException(MESSAGE_UNEXPECTED_PIPE_STATUS + status.name());
        }
    }

    /**
     * Decides whether the pipe may be stopped.
     *
     * @param pipeMeta the kept meta, or {@code null} if none is kept under that name
     * @param str the pipe name (used for logging)
     * @param j the creation time named in the stop request
     * @return {@code true} only if a meta is kept, its creation time equals {@code j}, and its status is RUNNING;
     *     {@code false} in every other handled case (each one is logged at info level)
     * @throws IllegalStateException if the kept status is none of RUNNING, STOPPED, DROPPED
     */
    protected boolean checkBeforeStopPipe(PipeMeta pipeMeta, String str, long j) throws IllegalStateException {
        if (pipeMeta == null) {
            LOGGER.info("Pipe {} (creation time = {}) has already been dropped or has not been created. Skip stopping.", str, j);
            return false;
        }
        if (pipeMeta.getStaticMeta().getCreationTime() != j) {
            LOGGER.info("Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in stopPipe request. Skip stopping.", str, pipeMeta.getStaticMeta().getCreationTime(), j);
            return false;
        }
        final PipeStatus status = pipeMeta.getRuntimeMeta().getStatus().get();
        // Switch on the enum itself rather than on a synthetic ordinal table with unrelated integer constants.
        switch (status) {
            case RUNNING:
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.", str, j, status.name());
                }
                return true;
            case STOPPED:
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Pipe {} (creation time = {}) has already been stopped. Current status = {}. Skip stopping.", str, j, status.name());
                }
                return false;
            case DROPPED:
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip stopping.", str, j, status.name());
                }
                return false;
            default:
                throw new IllegalStateException(MESSAGE_UNEXPECTED_PIPE_STATUS + status.name());
        }
    }

    /**
     * Decides whether the pipe may be dropped by a request that names a creation time.
     *
     * @param pipeMeta the kept meta, or {@code null} if none is kept under that name
     * @param str the pipe name (used for logging)
     * @param j the creation time named in the drop request
     * @return {@code true} if a meta is kept and its creation time equals {@code j}; otherwise {@code false} after
     *     logging the reason at info level
     */
    protected boolean checkBeforeDropPipe(PipeMeta pipeMeta, String str, long j) throws IllegalStateException {
        if (pipeMeta == null) {
            LOGGER.info("Pipe {} (creation time = {}) has already been dropped or has not been created. Skip dropping.", str, j);
            return false;
        }
        final boolean sameCreationTime = pipeMeta.getStaticMeta().getCreationTime() == j;
        if (!sameCreationTime) {
            LOGGER.info("Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in dropPipe request. Skip dropping.", str, pipeMeta.getStaticMeta().getCreationTime(), j);
        }
        return sameCreationTime;
    }

    /**
     * Decides whether the pipe may be dropped by a request that names no creation time.
     *
     * @param pipeMeta the kept meta, or {@code null} if none is kept under that name
     * @param str the pipe name (used for logging)
     * @return {@code true} if a meta is kept; otherwise {@code false} after logging at info level
     */
    protected boolean checkBeforeDropPipe(PipeMeta pipeMeta, String str) throws IllegalStateException {
        if (pipeMeta == null) {
            LOGGER.info("Pipe {} has already been dropped or has not been created. Skip dropping.", str);
            return false;
        }
        return true;
    }

    /**
     * Creates the task of one consensus group for an already existing pipe. Called from the runtime-meta
     * reconciliation when a consensus group is new or its leader data node id changed.
     * NOTE(review): presumably the implementation also registers the task with the task manager and records the
     * task meta in the kept pipe meta, since this class does neither after calling it. Confirm in subclasses.
     *
     * @param tConsensusGroupId the consensus group the task belongs to
     * @param pipeStaticMeta the static meta of the pipe
     * @param pipeTaskMeta the incoming task meta for that consensus group
     */
    protected abstract void createPipeTask(TConsensusGroupId tConsensusGroupId, PipeStaticMeta pipeStaticMeta, PipeTaskMeta pipeTaskMeta);

    /**
     * Removes the task meta of one consensus group from the kept pipe meta, removes the matching task from the task
     * manager and, if there was one, drops it.
     *
     * @param tConsensusGroupId the consensus group whose task is dropped
     * @param pipeStaticMeta the static meta of the pipe; a meta with its pipe name must be kept
     */
    private void dropPipeTask(TConsensusGroupId tConsensusGroupId, PipeStaticMeta pipeStaticMeta) {
        final PipeMeta keptMeta = pipeMetaKeeper.getPipeMeta(pipeStaticMeta.getPipeName());
        keptMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().remove(tConsensusGroupId);

        final PipeTask removedTask = pipeTaskManager.removePipeTask(pipeStaticMeta, tConsensusGroupId);
        if (removedTask == null) {
            return;
        }
        removedTask.drop();
    }

    /**
     * Starts the task of one consensus group if the task manager holds one; does nothing otherwise.
     *
     * @param tConsensusGroupId the consensus group whose task is started
     * @param pipeStaticMeta the static meta of the pipe
     */
    private void startPipeTask(TConsensusGroupId tConsensusGroupId, PipeStaticMeta pipeStaticMeta) {
        final PipeTask task = pipeTaskManager.getPipeTask(pipeStaticMeta, tConsensusGroupId);
        if (task == null) {
            return;
        }
        task.start();
    }
}
