package io.dingodb.exec.impl;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.dingodb.common.CommonId;
import io.dingodb.common.Location;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.log.MdcUtils;
import io.dingodb.common.metrics.DingoMetrics;
import io.dingodb.common.type.DingoType;
import io.dingodb.exec.OperatorFactory;
import io.dingodb.exec.base.Operator;
import io.dingodb.exec.base.Status;
import io.dingodb.exec.base.Task;
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.exception.TaskCancelException;
import io.dingodb.exec.fin.ErrorType;
import io.dingodb.exec.fin.FinWithException;
import io.dingodb.exec.fin.TaskStatus;
import io.dingodb.exec.operator.SourceOperator;
import io.dingodb.exec.transaction.base.TransactionType;
import io.dingodb.exec.utils.OperatorCodeUtils;
import io.dingodb.sdk.service.entity.store.Context;
import io.dingodb.store.api.transaction.data.IsolationLevel;
import io.dingodb.store.api.transaction.exception.DuplicateEntryException;
import io.dingodb.store.api.transaction.exception.LockWaitException;
import io.dingodb.store.api.transaction.exception.WriteConflictException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"txnType", Context.Fields.isolationLevel, "txnId", "jobId", "location", "operators", "runList", "parasType", "bachTask"})
/* loaded from: input_file:io/dingodb/exec/impl/TaskImpl.class */
public final class TaskImpl implements Task {
    private static final Logger log;

    @JsonProperty("id")
    @JsonSerialize(using = CommonId.JacksonSerializer.class)
    @JsonDeserialize(using = CommonId.JacksonDeserializer.class)
    private final CommonId id;

    @JsonProperty("jobId")
    @JsonSerialize(using = CommonId.JacksonSerializer.class)
    @JsonDeserialize(using = CommonId.JacksonDeserializer.class)
    private final CommonId jobId;

    @JsonProperty("txnId")
    @JsonSerialize(using = CommonId.JacksonSerializer.class)
    @JsonDeserialize(using = CommonId.JacksonDeserializer.class)
    private CommonId txnId;

    @JsonProperty("location")
    private final Location location;

    @JsonProperty("parasType")
    private final DingoType parasType;

    @JsonProperty("txnType")
    private final TransactionType transactionType;

    @JsonProperty(Context.Fields.isolationLevel)
    private final IsolationLevel isolationLevel;

    @JsonProperty("bachTask")
    private boolean bachTask;

    @JsonProperty("maxExecutionTime")
    private long maxExecutionTime;

    @JsonProperty("isSelect")
    private Boolean isSelect;
    private transient TaskStatus taskInitStatus;
    static final /* synthetic */ boolean $assertionsDisabled;
    private CommonId rootOperatorId = null;
    private transient AtomicInteger status = new AtomicInteger(Status.BORN);
    private transient CountDownLatch activeThreads = null;

    @JsonProperty("runList")
    @JsonSerialize(contentUsing = CommonId.JacksonSerializer.class)
    @JsonDeserialize(contentUsing = CommonId.JacksonDeserializer.class)
    private final List<CommonId> runList = new LinkedList();

    @JsonProperty("vertexes")
    @JsonSerialize(keyUsing = CommonId.JacksonKeySerializer.class, contentAs = Vertex.class)
    @JsonDeserialize(keyUsing = CommonId.JacksonKeyDeserializer.class, contentAs = Vertex.class)
    private final Map<CommonId, Vertex> vertexes = new HashMap();
    private transient io.dingodb.exec.operator.data.Context context = io.dingodb.exec.operator.data.Context.builder().pin(0).keyState(new ArrayList()).build();

    @JsonCreator
    public TaskImpl(@JsonProperty("id") CommonId commonId, @JsonProperty("jobId") CommonId commonId2, @JsonProperty("txnId") CommonId commonId3, @JsonProperty("location") Location location, @JsonProperty("parasType") DingoType dingoType, @JsonProperty("txnType") TransactionType transactionType, @JsonProperty("isolationLevel") IsolationLevel isolationLevel, @JsonProperty("maxExecutionTime") long j, @JsonProperty("isSelect") Boolean bool) {
        this.id = commonId;
        this.jobId = commonId2;
        this.txnId = commonId3;
        this.location = location;
        this.parasType = dingoType;
        this.transactionType = transactionType;
        this.isolationLevel = isolationLevel;
        this.maxExecutionTime = j;
        this.isSelect = bool;
    }

    @Override // io.dingodb.exec.base.Task
    public io.dingodb.exec.operator.data.Context getContext() {
        if (this.context == null) {
            this.context = io.dingodb.exec.operator.data.Context.builder().pin(0).keyState(new ArrayList()).build();
        }
        return this.context;
    }

    @Override // io.dingodb.exec.base.Task
    public Vertex getRoot() {
        return this.vertexes.get(this.rootOperatorId);
    }

    @Override // io.dingodb.exec.base.Task
    public void markRoot(CommonId commonId) {
        if (!$assertionsDisabled && !this.vertexes.get(commonId).getOp().equals(OperatorCodeUtils.ROOT)) {
            throw new AssertionError("The root operator must be a `RootOperator`.");
        }
        this.rootOperatorId = commonId;
    }

    @Override // io.dingodb.exec.base.Task
    public int getStatus() {
        return this.status.get();
    }

    @Override // io.dingodb.exec.base.Task
    public void putVertex(Vertex vertex) {
        vertex.setTask(this);
        this.vertexes.put(vertex.getId(), vertex);
        if (vertex.getOp().domain == OperatorCodeUtils.SOURCE.longValue()) {
            this.runList.add(vertex.getId());
        }
    }

    @Override // io.dingodb.exec.base.Task
    public void init() {
        this.status = new AtomicInteger(Status.BORN);
        boolean z = true;
        String str = "";
        getVertexes().forEach((commonId, vertex) -> {
            vertex.setId(commonId);
            vertex.setTask(this);
        });
        for (Vertex vertex2 : getVertexes().values()) {
            try {
                vertex2.init();
            } catch (Exception e) {
                LogUtils.error(log, "Init operator:{} in jobId:{} task:{} failed catch exception:{}", vertex2.getOp(), this.jobId.toString(), this.id.toString(), e, e);
                str = e.toString();
                z = false;
            }
        }
        this.taskInitStatus = new TaskStatus();
        this.taskInitStatus.setStatus(Boolean.valueOf(z));
        this.taskInitStatus.setTaskId(this.id.toString());
        this.taskInitStatus.setErrorMsg(str);
        if (this.taskInitStatus.getStatus().booleanValue()) {
            this.status.compareAndSet(Status.BORN, Status.READY);
        }
    }

    @Override // io.dingodb.exec.base.Task
    public void run(Object[] objArr) {
        if (this.status.get() != Status.BORN) {
            Executors.execute("task-" + this.jobId + "-" + this.id, () -> {
                internalRun(objArr);
            });
            return;
        }
        LogUtils.error(log, "jobId:{}, taskId:{}, Run task but check task has init failed: {}", this.jobId.toString(), this.id.toString(), this.taskInitStatus);
        Vertex vertex = this.vertexes.get(this.runList.get(0));
        OperatorFactory.getInstance(vertex.getOp()).fin(0, FinWithException.of(this.taskInitStatus), vertex);
    }

    private synchronized void internalRun(Object[] objArr) {
        MdcUtils.setTxnId(this.txnId.toString());
        if (!this.status.compareAndSet(Status.READY, Status.RUNNING)) {
            throw new RuntimeException("Status should be READY.");
        }
        LogUtils.debug(log, "Task is starting at {}...", this.location);
        this.activeThreads = new CountDownLatch(this.runList.size());
        setParas(objArr);
        if (this.bachTask) {
            setStartTs(this.txnId.seq);
        }
        for (CommonId commonId : this.runList) {
            Vertex vertex = this.vertexes.get(commonId);
            Operator operatorFactory = OperatorFactory.getInstance(vertex.getOp());
            if (!$assertionsDisabled && !(operatorFactory instanceof SourceOperator)) {
                throw new AssertionError("Operators in run list must be source operator.");
            }
            Executors.execute("operator-" + this.jobId + "-" + this.id + "-" + commonId, () -> {
                MdcUtils.setTxnId(this.txnId.toString());
                long currentTimeMillis = System.currentTimeMillis();
                DingoMetrics.activeTaskCount.incrementAndGet();
                while (operatorFactory.push(getContext().copy(), null, vertex)) {
                    try {
                        try {
                            LogUtils.info(log, "Operator {} need another pushing.", vertex.getId());
                        } catch (RuntimeException e) {
                            LogUtils.error(log, "Run Task:" + getId().toString() + ",catch operator:" + vertex.getId() + " run Exception: ", e);
                            this.status.compareAndSet(Status.RUNNING, Status.STOPPED);
                            TaskStatus taskStatus = new TaskStatus();
                            taskStatus.setStatus(false);
                            taskStatus.setTaskId(vertex.getTask().getId().toString());
                            taskStatus.setErrorMsg(e.toString());
                            if (e instanceof WriteConflictException) {
                                taskStatus.setErrorType(ErrorType.WriteConflict);
                            } else if (e instanceof DuplicateEntryException) {
                                taskStatus.setErrorType(ErrorType.DuplicateEntry);
                            } else if (e instanceof LockWaitException) {
                                taskStatus.setErrorType(ErrorType.LockWait);
                            } else if (e instanceof TaskCancelException) {
                                taskStatus.setErrorType(ErrorType.TaskCancel);
                            } else {
                                taskStatus.setErrorType(ErrorType.TaskFin);
                            }
                            try {
                                operatorFactory.fin(0, FinWithException.of(taskStatus), vertex);
                                DingoMetrics.activeTaskCount.decrementAndGet();
                                LogUtils.debug(log, "TaskImpl run cost: {}ms.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                                MdcUtils.removeTxnId();
                                this.activeThreads.countDown();
                                return;
                            } catch (RuntimeException e2) {
                                LogUtils.error(log, "Run Task Fin:{} catch operator:{} run Exception:{}", getId().toString(), vertex.getId(), e2, e2);
                                throw e2;
                            }
                        }
                    } catch (Throwable th) {
                        DingoMetrics.activeTaskCount.decrementAndGet();
                        LogUtils.debug(log, "TaskImpl run cost: {}ms.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                        MdcUtils.removeTxnId();
                        this.activeThreads.countDown();
                        throw th;
                    }
                }
                operatorFactory.fin(0, null, vertex);
                DingoMetrics.activeTaskCount.decrementAndGet();
                LogUtils.debug(log, "TaskImpl run cost: {}ms.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                MdcUtils.removeTxnId();
                this.activeThreads.countDown();
            });
        }
        while (true) {
            try {
                this.activeThreads.await();
                this.status.compareAndSet(Status.RUNNING, Status.READY);
                this.status.compareAndSet(Status.STOPPED, Status.READY);
                return;
            } catch (InterruptedException e) {
            }
        }
    }

    @Override // io.dingodb.exec.base.Task
    public boolean cancel() {
        this.status.set(Status.CANCEL);
        return true;
    }

    @Override // io.dingodb.exec.base.Task
    public boolean getBachTask() {
        return this.bachTask;
    }

    @Override // io.dingodb.exec.base.Task
    public void setBathTask(boolean z) {
        this.bachTask = z;
    }

    public String toString() {
        try {
            return JobImpl.PARSER.stringify(this);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // io.dingodb.exec.base.Task
    public CommonId getId() {
        return this.id;
    }

    @Override // io.dingodb.exec.base.Task
    public CommonId getJobId() {
        return this.jobId;
    }

    @Override // io.dingodb.exec.base.Task
    public CommonId getTxnId() {
        return this.txnId;
    }

    @Override // io.dingodb.exec.base.Task
    @JsonProperty("txnId")
    @JsonDeserialize(using = CommonId.JacksonDeserializer.class)
    public void setTxnId(CommonId commonId) {
        this.txnId = commonId;
    }

    @Override // io.dingodb.exec.base.Task
    public Location getLocation() {
        return this.location;
    }

    @Override // io.dingodb.exec.base.Task
    public Map<CommonId, Vertex> getVertexes() {
        return this.vertexes;
    }

    @Override // io.dingodb.exec.base.Task
    public List<CommonId> getRunList() {
        return this.runList;
    }

    @Override // io.dingodb.exec.base.Task
    public DingoType getParasType() {
        return this.parasType;
    }

    @Override // io.dingodb.exec.base.Task
    public TransactionType getTransactionType() {
        return this.transactionType;
    }

    @Override // io.dingodb.exec.base.Task
    public IsolationLevel getIsolationLevel() {
        return this.isolationLevel;
    }

    public TaskStatus getTaskInitStatus() {
        return this.taskInitStatus;
    }

    @Override // io.dingodb.exec.base.Task
    public void setContext(io.dingodb.exec.operator.data.Context context) {
        this.context = context;
    }

    static {
        $assertionsDisabled = !TaskImpl.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger((Class<?>) TaskImpl.class);
    }
}
