package org.apache.inlong.agent.core.task.file;

import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.core.task.ActionType;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskAction;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.RocksDbImp;
import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.file.Task;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/core/task/file/TaskManager.class */
public class TaskManager extends AbstractDaemon {
    /** Logger shared by every instance of this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class);
    /** Named capacity (1) for the queue that carries submitted task-profile lists. */
    public static final int CONFIG_QUEUE_CAPACITY = 1;
    /** 1000: the same value the core loop passes to {@code AgentUtils.silenceSleepInMs} on every iteration. */
    public static final int CORE_THREAD_SLEEP_TIME = 1000;
    /** 10000: the same value {@code printTaskDetail} uses as the minimum gap between two status log lines. */
    public static final int CORE_THREAD_PRINT_TIME = 10000;
    /** Capacity the constructor gives to {@link #actionQueue}. */
    private static final int ACTION_QUEUE_CAPACITY = 100000;
    /** Tasks currently held in memory, keyed by task id. */
    private final ConcurrentHashMap<String, Task> taskMap;
    /** Profile lists handed in through {@code submitTaskProfiles} and polled by the core thread. */
    private final BlockingQueue<List<TaskProfile>> configQueue;
    /** Executor that every in-memory task is submitted to. */
    private final ThreadPoolExecutor runningPool;
    /** Queue bounded by {@link #taskMaxLimit}; no other code visible in this class reads or writes it. */
    private final BlockingQueue<Task> pendingTasks;
    /** Value of the "job.number.limit" setting (default 15); {@code addTask} rejects new tasks once taskMap reaches it. */
    private final int taskMaxLimit;
    /** Actions offered through {@code submitAction} and drained by the core thread. */
    private final BlockingQueue<TaskAction> actionQueue;
    /** Value of {@code AgentUtils.getCurrentTime()} when the status line was last logged; 0 until the first print. */
    private long lastPrintTime = 0;
    /** Agent configuration; the initializers below and the constructor read settings from it. */
    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
    // NOTE(review): the three initDb calls below all read the same key "agent.rocks.db.path" and differ only in
    // their default value. If that key is set, all three receive the same string — confirm RocksDbImp keeps the
    // stores apart in that case.
    /** Store backing {@link #taskDb}. */
    private final Db taskBasicDb = initDb(this.agentConf.get("agent.rocks.db.path", ".localdb/task"));
    /** Task-profile view over {@link #taskBasicDb}. */
    private final TaskProfileDb taskDb = new TaskProfileDb(this.taskBasicDb);
    /** Store passed to {@code OffsetManager.init} and to each task's {@code init}. */
    private final Db instanceBasicDb = initDb(this.agentConf.get("agent.rocks.db.path", ".localdb/instance"));
    /** Store passed to {@code OffsetManager.init}. */
    private final Db offsetBasicDb = initDb(this.agentConf.get("agent.rocks.db.path", ".localdb/offset"));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.inlong.agent.core.task.file.TaskManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/inlong/agent/core/task/file/TaskManager$1.class */
    /**
     * Switch lookup tables recovered by the decompiler: each array maps an enum constant's
     * {@code ordinal()} to the case label used by a {@code switch} in the enclosing class.
     *
     * <p>The {@code ActionType} table was being written to without ever being declared or
     * allocated, which does not compile; it is now declared and sized like the
     * {@code TaskStateEnum} table.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$inlong$agent$core$task$ActionType;
        static final /* synthetic */ int[] $SwitchMap$org$apache$inlong$common$enums$TaskStateEnum;

        static {
            // A NoSuchFieldError can only occur if an enum constant disappeared between compile time
            // and run time; the affected entry then simply stays 0 and falls into the default branch.
            $SwitchMap$org$apache$inlong$agent$core$task$ActionType = new int[ActionType.values().length];
            try {
                $SwitchMap$org$apache$inlong$agent$core$task$ActionType[ActionType.FINISH.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant missing at run time: leave the slot at 0
            }
            $SwitchMap$org$apache$inlong$common$enums$TaskStateEnum = new int[TaskStateEnum.values().length];
            try {
                $SwitchMap$org$apache$inlong$common$enums$TaskStateEnum[TaskStateEnum.NEW.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant missing at run time: leave the slot at 0
            }
            try {
                $SwitchMap$org$apache$inlong$common$enums$TaskStateEnum[TaskStateEnum.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant missing at run time: leave the slot at 0
            }
            try {
                $SwitchMap$org$apache$inlong$common$enums$TaskStateEnum[TaskStateEnum.FROZEN.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant missing at run time: leave the slot at 0
            }
            try {
                $SwitchMap$org$apache$inlong$common$enums$TaskStateEnum[TaskStateEnum.RETRY_FINISH.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant missing at run time: leave the slot at 0
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/inlong/agent/core/task/file/TaskManager$TaskPrintStat.class */
    /**
     * Per-state tally of task profiles, used to build the periodic status log line.
     *
     * <p>Counters are public fields and start at zero. {@code RETRY_FINISH} profiles are counted
     * as "finished"; any state other than NEW, RUNNING, FROZEN or RETRY_FINISH is counted as "other".
     */
    public class TaskPrintStat {
        public int newCount;
        public int runningCont;
        public int frozenCount;
        public int finishedCount;
        public int otherCount;

        private TaskPrintStat() {
            // int fields default to 0, so no explicit initialisation is required
        }

        /**
         * Adds one to the counter matching the given state.
         *
         * @param taskStateEnum state of one task profile; must not be null
         */
        public void stat(TaskStateEnum taskStateEnum) {
            // Switch on the enum itself rather than on an ordinal lookup table held in another class.
            switch (taskStateEnum) {
                case NEW:
                    this.newCount++;
                    break;
                case RUNNING:
                    this.runningCont++;
                    break;
                case FROZEN:
                    this.frozenCount++;
                    break;
                case RETRY_FINISH:
                    this.finishedCount++;
                    break;
                default:
                    this.otherCount++;
                    break;
            }
        }

        /** @return the five counters formatted for the status log line */
        @Override
        public String toString() {
            return String.format("new %d running %d frozen %d finished %d other %d",
                    this.newCount, this.runningCont, this.frozenCount, this.finishedCount, this.otherCount);
        }

        /**
         * Accessor constructor kept because the enclosing class instantiates this type through it;
         * both arguments are ignored.
         */
        /* synthetic */ TaskPrintStat(TaskManager taskManager, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    public TaskManager() {
        OffsetManager.init(this.offsetBasicDb, this.instanceBasicDb);
        this.runningPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new SynchronousQueue(), (ThreadFactory) new AgentThreadFactory("task-manager-running-pool"));
        this.taskMap = new ConcurrentHashMap<>();
        this.taskMaxLimit = this.agentConf.getInt("job.number.limit", 15);
        this.pendingTasks = new LinkedBlockingQueue(this.taskMaxLimit);
        this.configQueue = new LinkedBlockingQueue(1);
        this.actionQueue = new LinkedBlockingQueue(ACTION_QUEUE_CAPACITY);
    }

    /**
     * Exposes the task-profile store used by this manager.
     *
     * @return the {@code TaskProfileDb} created when this manager was constructed
     */
    public TaskProfileDb getTaskDb() {
        return taskDb;
    }

    /**
     * Opens a RocksDB-backed store for the given path argument.
     *
     * @param str value handed to the {@code RocksDbImp} constructor
     * @return the opened store
     * @throws UnsupportedClassVersionError if the store cannot be created; the original exception
     *         is attached as the cause so its stack trace is not lost
     */
    public static Db initDb(String str) {
        try {
            return new RocksDbImp(str);
        } catch (Exception e) {
            // The error type is kept for existing callers; it has no (message, cause)
            // constructor, so the cause is attached afterwards.
            UnsupportedClassVersionError failure = new UnsupportedClassVersionError(e.getMessage());
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Replaces whatever profile list is still waiting in the config queue with {@code list}.
     * Each profile is logged as JSON before the hand-over.
     *
     * @param list latest set of task profiles; {@code null} is ignored
     */
    public void submitTaskProfiles(List<TaskProfile> list) {
        if (list == null) {
            return;
        }
        // Anything still queued is stale: only the newest list matters.
        this.configQueue.clear();
        final int total = list.size();
        for (int i = 0; i < total; i++) {
            LOGGER.info("submitTaskProfiles index {} total {} {}", i, total, list.get(i).toJsonStr());
        }
        // add() would throw IllegalStateException if another submitter refilled the bounded queue
        // after the clear above; instead evict and retry so the newest list always gets in.
        while (!this.configQueue.offer(list)) {
            this.configQueue.poll();
        }
    }

    /**
     * Queues an action for the core thread without blocking.
     *
     * @param taskAction action to enqueue; may be {@code null}
     * @return {@code true} if the action was accepted, {@code false} if it was {@code null}
     *         or the action queue had no free capacity
     */
    public boolean submitAction(TaskAction taskAction) {
        return taskAction != null && actionQueue.offer(taskAction);
    }

    /**
     * Builds the manager's main loop. While the daemon is runnable, each iteration sleeps,
     * prints the periodic status line, applies a pending config list, drains the action queue
     * and records one audit entry. Any throwable is logged and passed to the shared thread
     * throwable handler.
     *
     * @return the runnable to hand to the daemon's worker pool
     */
    private Runnable coreThread() {
        return () -> {
            final Thread self = Thread.currentThread();
            self.setName("task-manager-core");
            while (isRunnable()) {
                try {
                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
                    printTaskDetail();
                    dealWithConfigQueue(configQueue);
                    dealWithActionQueue(actionQueue);
                    AuditUtils.add(30007, "", "", AgentUtils.getCurrentTime(), 1, 1L);
                } catch (Throwable failure) {
                    LOGGER.error("exception caught", failure);
                    ThreadUtils.threadThrowableHandler(self, failure);
                }
            }
        };
    }

    /**
     * Logs how many tasks are in memory and, per state, how many profiles are stored in the
     * task DB. Does nothing unless more than {@code CORE_THREAD_PRINT_TIME} has elapsed since
     * the previous line.
     */
    private void printTaskDetail() {
        final long sinceLastPrint = AgentUtils.getCurrentTime() - lastPrintTime;
        if (sinceLastPrint <= CORE_THREAD_PRINT_TIME) {
            return;
        }
        final List<TaskProfile> storedProfiles = taskDb.getTasks();
        final TaskPrintStat summary = new TaskPrintStat(this, null);
        for (TaskProfile stored : storedProfiles) {
            summary.stat(stored.getState());
        }
        LOGGER.info("taskManager running! mem {} db total {} {} ", taskMap.size(), storedProfiles.size(), summary);
        lastPrintTime = AgentUtils.getCurrentTime();
    }

    /**
     * Takes at most one profile list from the queue and, if there is one, reconciles the DB
     * with it and then the in-memory tasks with the DB.
     *
     * @param blockingQueue queue of submitted profile lists
     */
    private void dealWithConfigQueue(BlockingQueue<List<TaskProfile>> blockingQueue) {
        final List<TaskProfile> latestProfiles = blockingQueue.poll();
        if (latestProfiles != null) {
            keepPaceWithManager(latestProfiles);
            keepPaceWithDb();
        }
    }

    /**
     * Drains the action queue, handling each action in turn, until the queue is empty or the
     * daemon stops being runnable.
     *
     * <p>Each action is handled inside its own try block, so a failure in one action is logged,
     * reported to the shared throwable handler, and does not prevent the remaining actions from
     * being processed.
     *
     * @param blockingQueue queue of pending task actions
     */
    private void dealWithActionQueue(BlockingQueue<TaskAction> blockingQueue) {
        while (isRunnable()) {
            try {
                TaskAction action = blockingQueue.poll();
                if (action == null) {
                    // queue drained
                    return;
                }
                TaskProfile profile = action.getProfile();
                switch (action.getActionType()) {
                    case FINISH:
                        LOGGER.info("deal finish action, taskId {}", profile.getTaskId());
                        finishTask(profile);
                        break;
                    default:
                        LOGGER.error("invalid action type for action queue: taskId {} type {}",
                                profile.getTaskId(), action.getActionType());
                        break;
                }
            } catch (Throwable th) {
                LOGGER.error("dealWithActionQueue", th);
                ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
            }
        }
    }

    /**
     * Reconciles the task DB with the profile list received from the manager side.
     * Only RUNNING and FROZEN profiles are accepted; others are logged as invalid and skipped.
     * Accepted profiles are first applied to the DB, then DB entries absent from the list are
     * deleted.
     *
     * @param list profiles received from the manager side
     */
    private void keepPaceWithManager(List<TaskProfile> list) {
        final Map<String, TaskProfile> acceptedById = new ConcurrentHashMap<>();
        for (TaskProfile candidate : list) {
            final TaskStateEnum state = candidate.getState();
            final boolean accepted = state == TaskStateEnum.RUNNING || state == TaskStateEnum.FROZEN;
            if (!accepted) {
                LOGGER.error("task {} invalid task state {}", candidate, state);
                continue;
            }
            acceptedById.put(candidate.getTaskId(), candidate);
        }
        traverseManagerTasksToDb(acceptedById);
        traverseDbTasksToManager(acceptedById);
    }

    /**
     * Brings the in-memory task map in line with the task DB: first walks the DB entries
     * (starting or stopping in-memory tasks as their stored state requires), then walks the
     * in-memory tasks and drops those the DB no longer lists as RUNNING.
     */
    private void keepPaceWithDb() {
        traverseDbTasksToMemory();
        traverseMemoryTasksToDb();
    }

    /**
     * Applies each manager-side profile to the task DB.
     * <ul>
     *   <li>not in the DB: added as a new task;</li>
     *   <li>same state as in the DB: left alone;</li>
     *   <li>DB state is RETRY_FINISH: left alone;</li>
     *   <li>otherwise: activated when the manager state is RUNNING, frozen when it is not.</li>
     * </ul>
     *
     * @param map accepted manager-side profiles keyed by task id
     */
    private void traverseManagerTasksToDb(Map<String, TaskProfile> map) {
        for (TaskProfile fromManager : map.values()) {
            final TaskProfile fromDb = taskDb.getTask(fromManager.getTaskId());
            if (fromDb == null) {
                LOGGER.info("traverseManagerTasksToDb task {} not found in db retry {} state {}, add it",
                        fromManager.getTaskId(), fromManager.isRetry(), fromManager.getState());
                addTask(fromManager);
                continue;
            }
            final TaskStateEnum managerState = fromManager.getState();
            final TaskStateEnum dbState = fromDb.getState();
            if (managerState == dbState) {
                continue;
            }
            if (dbState == TaskStateEnum.RETRY_FINISH) {
                LOGGER.info("traverseManagerTasksToDb task {} dbState {} retry {}, do nothing",
                        fromDb.getTaskId(), dbState, fromDb.isRetry());
                continue;
            }
            if (managerState == TaskStateEnum.RUNNING) {
                LOGGER.info("traverseManagerTasksToDb task {} dbState {} retry {}, active it",
                        fromDb.getTaskId(), dbState, fromDb.isRetry());
                activeTask(fromManager);
            } else {
                LOGGER.info("traverseManagerTasksToDb task {} dbState {} retry {}, freeze it",
                        fromDb.getTaskId(), dbState, fromDb.isRetry());
                freezeTask(fromManager);
            }
        }
    }

    /**
     * Deletes every task stored in the DB whose id is not among the manager-side profiles.
     *
     * @param map accepted manager-side profiles keyed by task id
     */
    private void traverseDbTasksToManager(Map<String, TaskProfile> map) {
        for (TaskProfile stored : taskDb.getTasks()) {
            final boolean stillWanted = map.containsKey(stored.getTaskId());
            if (!stillWanted) {
                LOGGER.info("traverseDbTasksToManager try to delete task {}", stored.getTaskId());
                deleteTask(stored);
            }
        }
    }

    /**
     * Walks the profiles stored in the DB and adjusts the in-memory tasks accordingly:
     * a RUNNING profile with no in-memory task gets one started, a FROZEN profile with an
     * in-memory task gets it removed, RETRY_FINISH profiles are ignored, and any other state
     * is logged as invalid.
     */
    private void traverseDbTasksToMemory() {
        for (TaskProfile stored : taskDb.getTasks()) {
            final TaskStateEnum state = stored.getState();
            final Task inMemory = taskMap.get(stored.getTaskId());
            if (state == TaskStateEnum.RUNNING) {
                if (inMemory == null) {
                    LOGGER.info("traverseDbTasksToMemory add task to mem taskId {}", stored.getTaskId());
                    addToMemory(stored);
                }
            } else if (state == TaskStateEnum.FROZEN) {
                if (inMemory != null) {
                    LOGGER.info("traverseDbTasksToMemory delete task from mem taskId {}", stored.getTaskId());
                    deleteFromMemory(stored.getTaskId());
                }
            } else if (state != TaskStateEnum.RETRY_FINISH) {
                LOGGER.error("task {} invalid state {}", stored.getTaskId(), state);
            }
        }
    }

    /**
     * Removes from memory every task whose profile is either missing from the DB or stored
     * with a state other than RUNNING.
     */
    private void traverseMemoryTasksToDb() {
        this.taskMap.values().forEach(task -> {
            // A distinct name is required here: reusing "task" for the stored profile clashes
            // with the lambda parameter and does not compile.
            TaskProfile profileFromDb = this.taskDb.getTask(task.getTaskId());
            if (profileFromDb == null || profileFromDb.getState() != TaskStateEnum.RUNNING) {
                deleteFromMemory(task.getTaskId());
            }
        });
    }

    /**
     * Stores a new task profile in the DB and, if its "task.state" is RUNNING, starts it in
     * memory. The profile is rejected (with an error log) when the in-memory task count has
     * reached the configured limit or when the profile fails validation.
     *
     * @param taskProfile profile of the task to add
     */
    private void addTask(TaskProfile taskProfile) {
        final boolean overLimit = taskMap.size() >= taskMaxLimit;
        if (overLimit) {
            LOGGER.error("taskMap size {} over limit {}", taskMap.size(), taskMaxLimit);
            return;
        }
        final boolean valid = isProfileValid(taskProfile);
        if (!valid) {
            LOGGER.error("task profile invalid {}", taskProfile.toJsonStr());
            return;
        }
        addToDb(taskProfile);
        final TaskStateEnum configuredState = TaskStateEnum.getTaskState(taskProfile.getInt("task.state"));
        if (configuredState != TaskStateEnum.RUNNING) {
            LOGGER.info("taskId {} state {} no need to add to memory", taskProfile.getTaskId(), taskProfile.getState());
            return;
        }
        addToMemory(taskProfile);
    }

    /**
     * Removes a task completely: first its stored profile, then its in-memory instance.
     *
     * @param taskProfile profile of the task to delete
     */
    private void deleteTask(TaskProfile taskProfile) {
        deleteFromDb(taskProfile);
        final String taskId = taskProfile.getTaskId();
        deleteFromMemory(taskId);
    }

    /**
     * Freezes a task: writes the given profile to the DB, then stops and drops the in-memory
     * instance.
     *
     * @param taskProfile profile to store as-is
     */
    private void freezeTask(TaskProfile taskProfile) {
        updateToDb(taskProfile);
        final String taskId = taskProfile.getTaskId();
        deleteFromMemory(taskId);
    }

    /**
     * Marks a task as finished: sets the state of the given profile object to RETRY_FINISH,
     * writes it to the DB, then stops and drops the in-memory instance.
     *
     * @param taskProfile profile of the finished task; its state is modified in place
     */
    private void finishTask(TaskProfile taskProfile) {
        taskProfile.setState(TaskStateEnum.RETRY_FINISH);
        updateToDb(taskProfile);
        final String taskId = taskProfile.getTaskId();
        deleteFromMemory(taskId);
    }

    /**
     * Activates a task: writes the given profile to the DB, then creates and starts its
     * in-memory instance.
     *
     * @param taskProfile profile to store and run
     */
    private void activeTask(TaskProfile taskProfile) {
        final TaskProfile profileToRun = taskProfile;
        updateToDb(profileToRun);
        addToMemory(profileToRun);
    }

    /**
     * Starts an in-memory task for every profile the DB lists as RUNNING. Called from
     * {@code start()} before the core thread is submitted.
     */
    private void restoreFromDb() {
        for (TaskProfile stored : taskDb.getTasks()) {
            if (stored.getState() != TaskStateEnum.RUNNING) {
                continue;
            }
            LOGGER.info("restoreFromDb taskId {}", stored.getTaskId());
            addToMemory(stored);
        }
    }

    /**
     * Destroys every in-memory task and then empties the task map.
     */
    private void stopAllTasks() {
        for (Task running : taskMap.values()) {
            running.destroy();
        }
        taskMap.clear();
    }

    /**
     * Asks a throw-away instance of the profile's task class whether the profile is valid.
     *
     * @param taskProfile profile to check; its task class is loaded by name and instantiated
     *        through its no-argument constructor
     * @return the task's verdict, or {@code false} if the class cannot be loaded or
     *         instantiated, or if validation itself throws (the throwable is logged)
     */
    private boolean isProfileValid(TaskProfile taskProfile) {
        try {
            Class<?> taskClass = Class.forName(taskProfile.getTaskClass());
            // Class.newInstance() is deprecated and rethrows checked constructor exceptions
            // unchecked; go through the no-arg constructor explicitly instead.
            Task probe = (Task) taskClass.getDeclaredConstructor().newInstance();
            return probe.isProfileValid(taskProfile);
        } catch (Throwable th) {
            LOGGER.error("isProfileValid error: ", th);
            return false;
        }
    }

    /**
     * Stores a profile that is expected to be new. If a profile with the same id is already
     * stored, an error is logged and the stored entry is overwritten anyway.
     *
     * @param taskProfile profile to store
     */
    private void addToDb(TaskProfile taskProfile) {
        final TaskProfile existing = taskDb.getTask(taskProfile.getTaskId());
        if (existing != null) {
            LOGGER.error("task {} should not exist", taskProfile.getTaskId());
        }
        taskDb.storeTask(taskProfile);
    }

    /**
     * Deletes the stored profile with the same id as {@code taskProfile}; logs an error and
     * does nothing else when no such profile is stored.
     *
     * @param taskProfile profile identifying the entry to delete
     */
    private void deleteFromDb(TaskProfile taskProfile) {
        final TaskProfile existing = taskDb.getTask(taskProfile.getTaskId());
        if (existing != null) {
            taskDb.deleteTask(taskProfile.getTaskId());
            return;
        }
        LOGGER.error("try to delete task {} but not found in db", taskProfile);
    }

    /**
     * Writes a profile that is expected to be stored already. If it is not, an error is
     * logged and the profile is stored regardless.
     *
     * @param taskProfile profile to write
     */
    private void updateToDb(TaskProfile taskProfile) {
        final boolean missing = taskDb.getTask(taskProfile.getTaskId()) == null;
        if (missing) {
            LOGGER.error("task {} not found, agent may have been reinstalled", taskProfile);
        }
        taskDb.storeTask(taskProfile);
    }

    /**
     * Creates, initialises and starts the in-memory task for a profile.
     *
     * <p>If a task with the same id is already in memory it is destroyed and removed first,
     * and an error is logged because that situation is not expected. The new task is
     * instantiated from the profile's task class, initialised with this manager, the profile
     * and the instance store, registered in the task map and submitted to the running pool.
     * Any failure along the way is logged and otherwise ignored.
     *
     * @param taskProfile profile of the task to start
     */
    private void addToMemory(TaskProfile taskProfile) {
        Task oldTask = this.taskMap.get(taskProfile.getTaskId());
        if (oldTask != null) {
            oldTask.destroy();
            this.taskMap.remove(taskProfile.getTaskId());
            LOGGER.error("old task {} should not exist, try stop it first", taskProfile.getTaskId());
        }
        try {
            Class<?> taskClass = Class.forName(taskProfile.getTaskClass());
            // Typed as Task (not Runnable) so init/getTaskId and the map insertion compile;
            // the no-arg constructor is used explicitly because Class.newInstance() is deprecated.
            Task task = (Task) taskClass.getDeclaredConstructor().newInstance();
            task.init(this, taskProfile, this.instanceBasicDb);
            this.taskMap.put(taskProfile.getTaskId(), task);
            this.runningPool.submit(task);
            LOGGER.info("add task {} into memory, taskMap size {}, runningPool task total {}, runningPool task active {}",
                    task.getTaskId(), this.taskMap.size(), this.runningPool.getTaskCount(),
                    this.runningPool.getActiveCount());
        } catch (Throwable th) {
            LOGGER.error("add task error: ", th);
        }
    }

    /**
     * Destroys the in-memory task with the given id and removes it from the task map.
     * Logs an error and returns if no such task is in memory.
     *
     * @param str id of the task to remove
     */
    private void deleteFromMemory(String str) {
        final Task doomed = taskMap.get(str);
        if (doomed != null) {
            doomed.destroy();
            taskMap.remove(doomed.getTaskId());
            LOGGER.info("delete task {} from memory, taskMap size {}, runningPool task total {}, runningPool task active {}",
                    doomed.getTaskId(), taskMap.size(), runningPool.getTaskCount(), runningPool.getActiveCount());
            return;
        }
        LOGGER.error("old task {} not found", str);
    }

    /**
     * Looks up an in-memory task.
     *
     * @param str task id
     * @return the task currently held in memory under that id, or {@code null} if there is none
     */
    public Task getTask(String str) {
        return taskMap.get(str);
    }

    /**
     * Looks up a stored task profile.
     *
     * @param str task id
     * @return whatever the task DB returns for that id
     */
    public TaskProfile getTaskProfile(String str) {
        return taskDb.getTask(str);
    }

    /**
     * Starts the manager: restores the tasks the DB lists as RUNNING, submits the core loop
     * as a daemon worker, and starts the offset manager.
     *
     * @throws Exception declared by the signature; no checked exception is thrown by the
     *         statements visible here
     */
    public void start() throws Exception {
        // Restore first so previously running tasks are back in memory before the core loop runs.
        restoreFromDb();
        submitWorker(coreThread());
        OffsetManager.getInstance().start();
    }

    /**
     * Stops the manager: destroys all in-memory tasks, waits for the daemon workers to
     * terminate, then shuts down the task thread pool.
     *
     * @throws Exception declared by the signature; no checked exception is thrown by the
     *         statements visible here
     */
    public void stop() throws Exception {
        stopAllTasks();
        waitForTerminate();
        // shutdown() stops accepting new tasks; it does not wait for already submitted ones to finish.
        this.runningPool.shutdown();
    }
}
