package org.apache.inlong.agent.core.job;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.core.task.Task;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.db.CommandDb;
import org.apache.inlong.agent.state.AbstractStateWrapper;
import org.apache.inlong.agent.state.State;
import org.apache.inlong.common.db.CommandEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/core/job/JobWrapper.class */
public class JobWrapper extends AbstractStateWrapper {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobWrapper.class);
    private final TaskManager taskManager;
    private final JobManager jobManager;
    private final Job job;
    private CommandDb db;
    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
    private final List<Task> allTasks = new CopyOnWriteArrayList();

    public JobWrapper(AgentManager agentManager, Job job) {
        this.taskManager = agentManager.getTaskManager();
        this.jobManager = agentManager.getJobManager();
        this.job = job;
        this.db = agentManager.getCommandDb();
        doChangeState(State.ACCEPTED);
    }

    private void checkAllTasksStateAndWait() throws Exception {
        boolean allMatch;
        long j = this.agentConf.getLong("job.finish.checkInterval", 6L);
        do {
            allMatch = this.allTasks.stream().allMatch(task -> {
                return this.taskManager.isTaskFinished(task.getTaskId());
            });
            TimeUnit.SECONDS.sleep(j);
        } while (!allMatch);
        LOGGER.info("all tasks of {} has been checked", this.job.getJobInstanceId());
        if (this.allTasks.stream().allMatch(task2 -> {
            return this.taskManager.isTaskSuccess(task2.getTaskId());
        })) {
            doChangeState(State.SUCCEEDED);
        } else {
            doChangeState(State.FAILED);
            saveFailedCommand();
        }
    }

    private void saveFailedCommand() {
        CommandEntity commandEntity = new CommandEntity();
        commandEntity.setId(this.job.getJobInstanceId());
        commandEntity.setAcked(false);
        commandEntity.setTaskId(Integer.valueOf(this.job.getJobInstanceId()));
        commandEntity.setCommandResult(1);
        commandEntity.setVersion(Integer.valueOf(this.job.getJobConf().getInt("job.version", AgentConstants.DEFAULT_JOB_VERSION.intValue())));
        this.db.storeCommand(commandEntity);
    }

    private void submitAllTasks() {
        List<Task> createTasks = this.job.createTasks();
        LOGGER.info("job name is {} and task size {}", this.job.getName(), Integer.valueOf(createTasks.size()));
        createTasks.forEach(task -> {
            this.allTasks.add(task);
            this.taskManager.submitTask(task);
        });
    }

    public String getSnapshot() {
        ArrayList arrayList = new ArrayList();
        Iterator<Task> it = this.allTasks.iterator();
        while (it.hasNext()) {
            String snapshot = it.next().getReader().getSnapshot();
            if (snapshot != null) {
                arrayList.add(snapshot);
            }
        }
        return String.join("_", arrayList);
    }

    public Job getJob() {
        return this.job;
    }

    public void cleanup() {
        this.allTasks.forEach(task -> {
            this.taskManager.removeTask(task.getTaskId());
        });
    }

    public void run() {
        try {
            doChangeState(State.RUNNING);
            submitAllTasks();
            checkAllTasksStateAndWait();
            cleanup();
            LOGGER.info("job name is {}, state is {}", this.job.getName(), getCurrentState());
        } catch (Exception e) {
            doChangeState(State.FAILED);
            LOGGER.error("error caught: {}, message: {}", this.job.getJobConf().toJsonStr(), e);
        }
    }

    public void addCallbacks() {
        addCallback(State.ACCEPTED, State.RUNNING, (state, state2) -> {
        }).addCallback(State.RUNNING, State.FAILED, (state3, state4) -> {
            this.jobManager.markJobAsFailed(this.job.getJobInstanceId());
        }).addCallback(State.RUNNING, State.SUCCEEDED, (state5, state6) -> {
            this.jobManager.markJobAsSuccess(this.job.getJobInstanceId());
        });
    }

    public List<Task> getAllTasks() {
        return this.allTasks;
    }

    public synchronized void addTask(JobProfile jobProfile) {
        Task createTask = this.job.createTask(jobProfile);
        this.allTasks.add(createTask);
        LOGGER.info("job name is {} and add new task, total task {}", this.job.getName(), Integer.valueOf(this.allTasks.size()));
        this.taskManager.submitTask(createTask);
    }
}
