package io.dingodb.exec.impl;

import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.dingodb.common.CommonId;
import io.dingodb.common.Location;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.metrics.DingoMetrics;
import io.dingodb.common.type.DingoType;
import io.dingodb.exec.Services;
import io.dingodb.exec.base.IdGenerator;
import io.dingodb.exec.base.Job;
import io.dingodb.exec.base.JobManager;
import io.dingodb.exec.base.Status;
import io.dingodb.exec.base.Task;
import io.dingodb.exec.base.TaskManager;
import io.dingodb.exec.impl.message.CreateTaskMessage;
import io.dingodb.exec.impl.message.DestroyTaskMessage;
import io.dingodb.exec.impl.message.RunTaskMessage;
import io.dingodb.exec.impl.message.TaskMessage;
import io.dingodb.exec.operator.params.RootParam;
import io.dingodb.exec.transaction.base.ITransaction;
import io.dingodb.exec.transaction.impl.TransactionManager;
import io.dingodb.meta.MetaService;
import io.dingodb.net.Channel;
import io.dingodb.net.Message;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dingodb/exec/impl/JobManagerImpl.class */
public final class JobManagerImpl implements JobManager {
    private static final Logger log;
    public static final String TASK_TAG = "DINGO_TASK";
    public static final JobManagerImpl INSTANCE;
    private final Map<Location, Channel> channelMap;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Map<CommonId, Job> jobMap = new ConcurrentHashMap();
    private final TaskManager taskManager = TaskManagerImpl.INSTANCE;
    private final IdGenerator idGenerator = new IdGeneratorImpl();

    private JobManagerImpl(int i) {
        this.channelMap = new ConcurrentHashMap(i);
    }

    @Override // io.dingodb.exec.base.JobManager
    public Job createJob(long j, long j2, CommonId commonId, DingoType dingoType, long j3, Boolean bool) {
        JobImpl jobImpl = new JobImpl(this.idGenerator.getJobId(j, j2), commonId, dingoType, j3, bool);
        CommonId jobId = jobImpl.getJobId();
        this.jobMap.put(jobId, jobImpl);
        LogUtils.debug(log, "Created job \"{}\". # of jobs: {}.", jobId, Integer.valueOf(this.jobMap.size()));
        return jobImpl;
    }

    @Override // io.dingodb.exec.base.JobManager
    public Job getJob(CommonId commonId) {
        if (commonId == null) {
            return null;
        }
        return this.jobMap.get(commonId);
    }

    @Override // io.dingodb.exec.base.JobManager
    public void removeJob(CommonId commonId) {
        Job remove = this.jobMap.remove(commonId);
        LogUtils.debug(log, "Removed job \"{}\". # of jobs: {}.", commonId, Integer.valueOf(this.jobMap.size()));
        if (remove != null) {
            for (Task task : remove.getTasks().values()) {
                if (task.getRoot() != null) {
                    this.taskManager.removeTask(task);
                } else {
                    sendTaskMessage(task, new Message(TASK_TAG, new DestroyTaskMessage(task).toBytes()));
                }
            }
        }
    }

    @Override // io.dingodb.exec.base.JobManager
    public Iterator<Object[]> createIterator(Job job, Object[] objArr) {
        if (job.isEmpty()) {
            return Collections.emptyIterator();
        }
        if (job.getStatus() == Status.BORN) {
            distributeTasks(job);
        }
        run(job, objArr);
        return new JobIteratorImpl(job, job.getRoot().getRoot());
    }

    @Override // io.dingodb.exec.base.JobManager
    public Iterator<Object[]> createIterator(Job job, Object[] objArr, long j) {
        ((RootParam) job.getRoot().getRoot().getParam()).setTakeTtl(j);
        return createIterator(job, objArr);
    }

    @Override // io.dingodb.exec.base.JobManager
    public void close() {
        this.channelMap.values().forEach((v0) -> {
            v0.close();
        });
        this.jobMap.keySet().forEach(this::removeJob);
        this.taskManager.close();
    }

    private void distributeTasks(Job job) {
        for (Task task : job.getTasks().values()) {
            if (task.getRoot() == null) {
                try {
                    sendTaskMessage(task, new Message(TASK_TAG, new CreateTaskMessage(task).toBytes()));
                } catch (Exception e) {
                    LogUtils.error(log, "jobId:{}, Error to distribute tasks.", job.getJobId(), e);
                    throw new RuntimeException("jobId:" + job.getJobId() + "taskId:" + task.getId() + ", Error to distribute tasks.", e);
                }
            } else {
                if (!$assertionsDisabled && !task.getLocation().equals(MetaService.root().currentLocation())) {
                    throw new AssertionError("The root task must be at current location.");
                }
                this.taskManager.addTask(task);
            }
        }
    }

    private void run(Job job, Object[] objArr) {
        for (Task task : job.getTasks().values()) {
            if (task.getRoot() != null) {
                task.run(objArr);
            } else {
                sendTaskMessage(task, new Message(TASK_TAG, new RunTaskMessage(task, job.getParasType(), objArr).toBytes()));
            }
        }
    }

    private void sendTaskMessage(Task task, Message message) {
        Location location = task.getLocation();
        Channel computeIfAbsent = this.channelMap.computeIfAbsent(location, location2 -> {
            return Services.openNewSysChannel(location2.getHost(), location2.getPort());
        });
        computeIfAbsent.setCloseListener(channel -> {
            this.channelMap.remove(location);
        });
        computeIfAbsent.send(message);
        ITransaction transaction = TransactionManager.getTransaction(task.getTxnId());
        if (transaction != null) {
            transaction.registerChannel(task.getId(), computeIfAbsent);
        }
    }

    public void processMessage(Message message) {
        Timer.Context timeContext = DingoMetrics.getTimeContext("deserialize");
        try {
            TaskMessage fromBytes = TaskMessage.fromBytes(message.content());
            timeContext.stop();
            if (fromBytes instanceof CreateTaskMessage) {
                processCommand((CreateTaskMessage) fromBytes);
            } else if (fromBytes instanceof RunTaskMessage) {
                processCommand((RunTaskMessage) fromBytes);
            } else if (fromBytes instanceof DestroyTaskMessage) {
                processCommand((DestroyTaskMessage) fromBytes);
            }
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Cannot deserialize received TaskMessage.", e);
        }
    }

    private void processCommand(CreateTaskMessage createTaskMessage) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            Task task = createTaskMessage.getTask();
            if (TransactionManager.getTransaction(task.getTxnId() == null ? CommonId.EMPTY_TRANSACTION : task.getTxnId()) == null) {
                TransactionManager.createTransaction(task.getTransactionType(), task.getTxnId() == null ? CommonId.EMPTY_TRANSACTION : task.getTxnId(), task.getIsolationLevel().getCode());
            }
            this.taskManager.addTask(task);
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            LogUtils.debug(log, "jobTime cost: {}ms.", Long.valueOf(currentTimeMillis2));
            DingoMetrics.latency("on_task_message", currentTimeMillis2);
        } catch (Throwable th) {
            long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
            LogUtils.debug(log, "jobTime cost: {}ms.", Long.valueOf(currentTimeMillis3));
            DingoMetrics.latency("on_task_message", currentTimeMillis3);
            throw th;
        }
    }

    private void processCommand(RunTaskMessage runTaskMessage) {
        this.taskManager.getTask(runTaskMessage.getJobId(), runTaskMessage.getTaskId()).run(runTaskMessage.getParas());
    }

    private void processCommand(DestroyTaskMessage destroyTaskMessage) {
        this.taskManager.removeTask(destroyTaskMessage.getJobId(), destroyTaskMessage.getTaskId());
    }

    public TaskManager getTaskManager() {
        return this.taskManager;
    }

    static {
        $assertionsDisabled = !JobManagerImpl.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger((Class<?>) JobManagerImpl.class);
        INSTANCE = new JobManagerImpl(10);
    }
}
