package org.apache.iotdb.commons.pipe.execution.executor;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.task.subtask.PipeSubtask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/commons/pipe/execution/executor/PipeSubtaskExecutor.class */
/**
 * Base class for executors that own a fixed-size worker pool and a registry of
 * {@link PipeSubtask}s keyed by task ID.
 *
 * <p>A subtask is first {@link #register(PipeSubtask) registered} (which binds it to the worker
 * pool, the shared callback executor and a new {@link PipeSubtaskScheduler}), then
 * {@link #start(String) started}, {@link #stop(String) stopped} and finally
 * {@link #deregister(String) deregistered}.
 *
 * <p>All state-changing methods are {@code synchronized} on this instance. The read-only accessors
 * are not synchronized; they rely on the concurrent registry map and on the {@code volatile}
 * running-subtask counter for visibility.
 */
public abstract class PipeSubtaskExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutor.class);

    /** Single-threaded executor shared by every executor instance; handed to subtasks on bind. */
    private static final ExecutorService subtaskCallbackListeningExecutor = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName());

    /** Fixed-size worker pool owned by this executor instance. */
    private final ListeningExecutorService subtaskWorkerThreadPoolExecutor;

    /** Size the worker pool was created with. */
    private final int corePoolSize;

    /** Registered subtasks, keyed by {@link PipeSubtask#getTaskID()}. */
    private final Map<String, PipeSubtask> registeredIdSubtaskMapper = new ConcurrentHashMap<>();

    /**
     * Number of subtasks started and not yet stopped. Only mutated while holding this instance's
     * monitor; declared {@code volatile} so that the unsynchronized
     * {@link #getRunningSubtaskNumber()} always observes the latest value.
     */
    private volatile int runningSubtaskNumber = 0;

    /**
     * Creates the executor together with its fixed-size worker pool.
     *
     * @param i number of worker threads in the pool
     * @param threadName supplies the name used for the pool's threads
     */
    protected PipeSubtaskExecutor(int i, ThreadName threadName) {
        this.subtaskWorkerThreadPoolExecutor = MoreExecutors.listeningDecorator(IoTDBThreadPoolFactory.newFixedThreadPool(i, threadName.getName()));
        this.corePoolSize = i;
    }

    /**
     * Registers a subtask and binds it to this executor's worker pool, the shared callback
     * executor and a new scheduler. Registering an already-known task ID only logs a warning.
     *
     * @param pipeSubtask the subtask to register
     */
    public final synchronized void register(PipeSubtask pipeSubtask) {
        final String taskId = pipeSubtask.getTaskID();
        if (this.registeredIdSubtaskMapper.containsKey(taskId)) {
            LOGGER.warn("The subtask {} is already registered.", taskId);
            return;
        }
        this.registeredIdSubtaskMapper.put(taskId, pipeSubtask);
        pipeSubtask.bindExecutors(this.subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor, new PipeSubtaskScheduler(this));
    }

    /**
     * Starts a registered subtask by allowing it to submit itself and submitting it once.
     * Unknown task IDs only log a warning; a subtask that is already submitting itself is left
     * untouched.
     *
     * @param str ID of the subtask to start
     */
    public final synchronized void start(String str) {
        // The map never stores null values, so a null lookup means "not registered".
        final PipeSubtask pipeSubtask = this.registeredIdSubtaskMapper.get(str);
        if (pipeSubtask == null) {
            LOGGER.warn("The subtask {} is not registered.", str);
            return;
        }
        if (pipeSubtask.isSubmittingSelf()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("The subtask {} is already running.", str);
            }
            return;
        }
        pipeSubtask.allowSubmittingSelf();
        pipeSubtask.submitSelf();
        this.runningSubtaskNumber++;
        LOGGER.info("The subtask {} is started to submit self.", str);
    }

    /**
     * Stops a registered subtask from submitting itself. The running counter is decremented only
     * when {@code disallowSubmittingSelf()} returns {@code true}. Unknown task IDs only log a
     * warning.
     *
     * @param str ID of the subtask to stop
     */
    public final synchronized void stop(String str) {
        final PipeSubtask pipeSubtask = this.registeredIdSubtaskMapper.get(str);
        if (pipeSubtask == null) {
            LOGGER.warn("The subtask {} is not registered.", str);
            return;
        }
        if (pipeSubtask.disallowSubmittingSelf()) {
            this.runningSubtaskNumber--;
        }
    }

    /**
     * Stops the subtask, removes it from the registry and closes it. A failure while closing is
     * logged and not rethrown.
     *
     * @param str ID of the subtask to deregister
     */
    public final synchronized void deregister(String str) {
        stop(str);
        final PipeSubtask removed = this.registeredIdSubtaskMapper.remove(str);
        if (removed == null) {
            return;
        }
        try {
            removed.close();
            LOGGER.info("The subtask {} is closed successfully.", str);
        } catch (Exception e) {
            // Best effort: the subtask is already out of the registry, so only report the failure.
            LOGGER.error("Failed to close the subtask {}.", str, e);
        }
    }

    /**
     * @param str task ID to look up
     * @return {@code true} if a subtask with this ID is currently registered
     */
    public final boolean isRegistered(String str) {
        return this.registeredIdSubtaskMapper.containsKey(str);
    }

    /** @return the number of currently registered subtasks */
    public final int getRegisteredSubtaskNumber() {
        return this.registeredIdSubtaskMapper.size();
    }

    /**
     * Disallows self-submission for every registered subtask and then shuts the worker pool down.
     * Does nothing if the pool is already shut down. Subtasks stay in the registry.
     */
    public final synchronized void shutdown() {
        if (isShutdown()) {
            return;
        }
        for (PipeSubtask pipeSubtask : this.registeredIdSubtaskMapper.values()) {
            // Same accounting as stop(): without it the running counter stays stale forever,
            // because a later stop()/deregister() sees the subtask as already disallowed.
            if (pipeSubtask.disallowSubmittingSelf()) {
                this.runningSubtaskNumber--;
            }
        }
        this.subtaskWorkerThreadPoolExecutor.shutdown();
    }

    /** @return {@code true} if the worker pool has been shut down */
    public final boolean isShutdown() {
        return this.subtaskWorkerThreadPoolExecutor.isShutdown();
    }

    /** @return the thread count the worker pool was created with */
    public final int getCorePoolSize() {
        return this.corePoolSize;
    }

    /** @return the number of subtasks that have been started and not yet stopped */
    public final int getRunningSubtaskNumber() {
        return this.runningSubtaskNumber;
    }
}
