package org.apache.inlong.agent.core.job;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.core.task.Task;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.db.CommandDb;
import org.apache.inlong.agent.plugin.Channel;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Sink;
import org.apache.inlong.agent.plugin.Source;
import org.apache.inlong.agent.state.AbstractStateWrapper;
import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.db.CommandEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/core/job/JobWrapper.class */
/**
 * Runs a single {@link Job}: turns job profiles into {@link Task}s, hands them to the
 * {@link TaskManager}, waits until every submitted task is finished and then moves the
 * wrapper to {@link State#SUCCEEDED} or {@link State#FAILED}.
 *
 * <p>The wrapper starts in {@link State#ACCEPTED} and switches to {@link State#RUNNING}
 * at the beginning of {@link #run()}.
 *
 * <p>NOTE(review): {@link #submit(JobProfile)}, {@link #cleanup()} and
 * {@link #exist(Predicate)} are public and touch the same unsynchronized collections that
 * {@link #run()} uses. Whether they are ever called from another thread is not visible
 * here - confirm against the callers.
 */
public class JobWrapper extends AbstractStateWrapper {

    private static final Logger LOGGER = LoggerFactory.getLogger(JobWrapper.class);

    private final TaskManager taskManager;
    private final JobManager jobManager;
    private final Job job;
    private final CommandDb db;
    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();

    /** Tasks that have already been handed to the task manager. */
    private final List<Task> allTasks = new ArrayList<>();

    /** Tasks created by {@link #submit(JobProfile)} that are still waiting to be handed over. */
    private final Queue<Task> taskQueue = new LinkedList<>();

    /** Counter used to build the numeric suffix of each task id. */
    private final AtomicInteger index = new AtomicInteger(0);

    /** Set by {@link #cleanup()}; once true, queued tasks are no longer handed over. */
    private boolean isEnd = false;

    /**
     * Creates the wrapper and puts it into {@link State#ACCEPTED}.
     *
     * @param agentManager source of the task manager, job manager and command db
     * @param job          the job this wrapper runs
     */
    public JobWrapper(AgentManager agentManager, Job job) {
        this.taskManager = agentManager.getTaskManager();
        this.jobManager = agentManager.getJobManager();
        this.job = job;
        this.db = agentManager.getCommandDb();
        doChangeState(State.ACCEPTED);
    }

    /**
     * Drains the pending queue into the task manager, stopping early once the wrapper
     * has been cleaned up.
     */
    private void checkQueuedTasks() {
        while (!isEnd && taskQueue.peek() != null) {
            Task next = taskQueue.poll();
            allTasks.add(next);
            taskManager.submitTask(next);
        }
    }

    /**
     * Polls until every submitted task is reported finished, then records the outcome.
     *
     * <p>Each round first hands over newly queued tasks, checks completion and then sleeps
     * for {@code job.finish.checkInterval} seconds (default 6). When not every task
     * succeeded, the state becomes {@link State#FAILED} and a failed command is stored.
     *
     * @throws Exception in particular {@link InterruptedException} when the sleep is interrupted
     */
    private void checkAllTasksStateAndWait() throws Exception {
        long checkIntervalSeconds = agentConf.getLong("job.finish.checkInterval", 6L);
        boolean allFinished;
        do {
            checkQueuedTasks();
            allFinished = allTasks.stream()
                    .allMatch(task -> taskManager.isTaskFinished(task.getTaskId()));
            TimeUnit.SECONDS.sleep(checkIntervalSeconds);
        } while (!allFinished);
        LOGGER.info("all tasks of {} has been checked", job.getJobInstanceId());

        boolean allSucceeded = allTasks.stream()
                .allMatch(task -> taskManager.isTaskSuccess(task.getTaskId()));
        if (allSucceeded) {
            doChangeState(State.SUCCEEDED);
        } else {
            doChangeState(State.FAILED);
            saveFailedCommand();
        }
    }

    /**
     * Stores an un-acked command entry with result {@code 1} for this job in the command db.
     *
     * <p>The job instance id is parsed with {@link Integer#valueOf(String)} to fill the
     * task id, so a non-numeric id makes this method throw {@link NumberFormatException}.
     */
    private void saveFailedCommand() {
        CommandEntity failedCommand = new CommandEntity();
        failedCommand.setId(job.getJobInstanceId());
        failedCommand.setAcked(false);
        failedCommand.setTaskId(Integer.valueOf(job.getJobInstanceId()));
        failedCommand.setCommandResult(1);
        failedCommand.setVersion(Integer.valueOf(
                job.getJobConf().getInt("job.version", AgentConstants.DEFAULT_JOB_VERSION.intValue())));
        db.storeCommand(failedCommand);
    }

    /**
     * Collects the reader snapshot of every submitted task.
     *
     * @return the non-null snapshots joined with {@code "_"}; an empty string when there are none
     */
    public String getSnapshot() {
        List<String> snapshots = new ArrayList<>();
        for (Task task : allTasks) {
            String snapshot = task.getReader().getSnapshot();
            if (snapshot != null) {
                snapshots.add(snapshot);
            }
        }
        return String.join("_", snapshots);
    }

    /**
     * @return the job run by this wrapper
     */
    public Job getJob() {
        return job;
    }

    /**
     * Evaluates a predicate against all tasks known to this wrapper.
     *
     * @param predicate receives a fresh set containing both submitted and still-queued tasks
     * @return the predicate's result
     */
    public boolean exist(Predicate<Collection<Task>> predicate) {
        Collection<Task> knownTasks = new HashSet<>(allTasks);
        knownTasks.addAll(taskQueue);
        return predicate.test(knownTasks);
    }

    /**
     * Splits the given profile into readers and queues one task per reader.
     *
     * <p>The profile must name the same source, channel and sink classes as the job's own
     * configuration. Any failure while validating or creating tasks is logged and passed to
     * {@link ThreadUtils#threadThrowableHandler}; it is not rethrown to the caller.
     *
     * @param jobProfile profile describing the sub task(s) to add
     */
    public void submit(JobProfile jobProfile) {
        try {
            JobProfile jobConf = job.getJobConf();
            Preconditions.checkArgument(
                    jobConf.get("job.source").equals(jobProfile.get("job.source")),
                    "Subtask source should be same");
            Preconditions.checkArgument(
                    jobConf.get("job.channel").equals(jobProfile.get("job.channel")),
                    "Subtask channel should be same");
            // The message belongs to checkArgument, not to jobProfile.get(...) as a default value.
            Preconditions.checkArgument(
                    jobConf.get("job.sink").equals(jobProfile.get("job.sink")),
                    "Subtask sink should be same");
            LOGGER.info("job id: {} submit new task {}.", job.getJobInstanceId(), jobProfile.toJsonStr());

            Source source = (Source) Class.forName(jobConf.get("job.source")).newInstance();
            for (Reader reader : source.split(jobProfile)) {
                Sink sink = (Sink) Class.forName(jobConf.get("job.sink")).newInstance();
                sink.setSourceName(reader.getReadSource());
                Channel channel = (Channel) Class.forName(jobConf.get("job.channel")).newInstance();
                String taskId = String.format("%s_%d", job.getJobInstanceId(), index.incrementAndGet());
                // Record the md5 of the read source in the job configuration, keyed by the source itself.
                jobConf.set(reader.getReadSource(), DigestUtils.md5Hex(reader.getReadSource()));
                taskQueue.offer(new Task(taskId, reader, sink, channel, jobProfile));
            }
        } catch (Throwable th) {
            LOGGER.error("Create task for profile({}) failed", jobProfile, th);
            ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
        }
        // Reports tasks already handed to the task manager; tasks queued above are not counted yet.
        LOGGER.info("Job name is {} and task size {}", job.getName(), allTasks.size());
    }

    /**
     * Marks the wrapper as ended and removes every submitted task from the task manager.
     * Tasks that are still queued are left in the queue and will no longer be handed over.
     */
    public void cleanup() {
        isEnd = true;
        allTasks.forEach(task -> taskManager.removeTask(task.getTaskId()));
    }

    /**
     * Runs the job: names the thread after the job instance id, switches to
     * {@link State#RUNNING}, submits the job's own profile, waits for all tasks and cleans up.
     *
     * <p>Any exception moves the wrapper to {@link State#FAILED} and is logged. If the
     * exception is an {@link InterruptedException}, the thread's interrupt flag is restored
     * afterwards so that the caller can observe the interruption.
     */
    public void run() {
        try {
            AgentThreadFactory.nameThread(job.getJobInstanceId());
            LOGGER.info("job id: {}, source: {}, channel: {}, sink: {}",
                    job.getJobInstanceId(),
                    job.getJobConf().get("job.source"),
                    job.getJobConf().get("job.channel"),
                    job.getJobConf().get("job.sink"));
            doChangeState(State.RUNNING);
            submit(job.getJobConf());
            checkQueuedTasks();
            checkAllTasksStateAndWait();
            cleanup();
            LOGGER.info("job name is {}, state is {}", job.getName(), getCurrentState());
        } catch (Exception e) {
            doChangeState(State.FAILED);
            // Fill both placeholders and still pass the exception last so the stack trace is logged.
            LOGGER.error("error caught: {}, message: {}", job.getJobConf().toJsonStr(), e.getMessage(), e);
            if (e instanceof InterruptedException) {
                // Restore the flag only after the failure handling above has completed.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Registers the state-transition callbacks: nothing on ACCEPTED to RUNNING, and the job
     * manager is told about the outcome on RUNNING to FAILED and RUNNING to SUCCEEDED.
     */
    public void addCallbacks() {
        addCallback(State.ACCEPTED, State.RUNNING, (before, after) -> {
        }).addCallback(State.RUNNING, State.FAILED, (before, after) -> {
            jobManager.markJobAsFailed(job.getJobInstanceId());
        }).addCallback(State.RUNNING, State.SUCCEEDED, (before, after) -> {
            jobManager.markJobAsSuccess(job.getJobInstanceId());
        });
    }

    /**
     * @return the live list of tasks already handed to the task manager (not a copy)
     */
    public List<Task> getAllTasks() {
        return allTasks;
    }
}
