package org.apache.inlong.agent.core.task;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.state.AbstractStateWrapper;
import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/core/task/TaskWrapper.class */
/**
 * Runs a single {@link Task} as a state machine: a reader thread pushes messages from the task's
 * reader into the task's channel while a writer thread pulls them from the channel and hands them
 * to the task's sink.
 *
 * <p>Several methods are declared {@code public} only because the decompiler widened them; the
 * decompiler notes indicate they were originally package-private.
 */
public class TaskWrapper extends AbstractStateWrapper {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskWrapper.class);

    /** Read timeout, in minutes, applied to the reader by {@link #waitForFinish()}. */
    public static final int WAIT_FINISH_TIME_OUT = 1;

    /** Delay, in minutes, that {@link #run()} sleeps before moving the task to RUNNING. */
    public static final int WAIT_BEGIN_TIME_MINUTE = 1;

    private final TaskManager taskManager;
    private final Task task;
    /** Number of RUNNING to FAILED transitions observed so far. */
    private final AtomicInteger retryTime = new AtomicInteger(0);
    private final int maxRetryTime;
    /** Maximum time, in seconds, to wait when pushing a message into the channel. */
    private final int pushMaxWaitTime;
    /** Maximum time, in seconds, to wait when pulling a message from the channel. */
    private final int pullMaxWaitTime;
    /** Pause, in milliseconds, between two iterations of the read loop. */
    private final int readWaitTime;
    /** Executor that runs the reader and writer loops of this wrapper. */
    private final ExecutorService executorService;

    /**
     * Creates a wrapper for {@code task}, loads its tuning values from the agent configuration and
     * puts the wrapper into the ACCEPTED state.
     *
     * @param agentManager manager from which the {@link TaskManager} is obtained
     * @param task the task to run
     */
    public TaskWrapper(AgentManager agentManager, Task task) {
        this.taskManager = agentManager.getTaskManager();
        this.task = task;
        AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
        this.maxRetryTime = agentConf.getInt("task.maxRetry.time", 3);
        this.pushMaxWaitTime = agentConf.getInt("task.push.maxSecond", 2);
        this.pullMaxWaitTime = agentConf.getInt("task.pull.maxSecond", 2);
        this.readWaitTime = agentConf.getInt("job.file.read.wait", 100);
        // No core threads, direct hand-off queue: threads are created on demand and
        // reclaimed after 60 seconds of idleness.
        this.executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(), (ThreadFactory) new AgentThreadFactory("task-reader-writer"));
        doChangeState(State.ACCEPTED);
    }

    /**
     * Starts the reader loop on the executor.
     *
     * <p>The loop runs until the wrapper is in an exception state or the task reports that reading
     * has finished. A message that could not be pushed within the push timeout is kept and pushed
     * again on the next iteration instead of being dropped.
     *
     * @return a future that completes when the reader loop has ended
     */
    private CompletableFuture<?> submitReadThread() {
        return CompletableFuture.runAsync(() -> {
            try {
                Message pending = null;
                while (!isException() && !task.isReadFinished()) {
                    if (!task.getReader().isSourceExist()) {
                        doChangeState(State.FAILED);
                    } else if (pending == null
                            || task.getChannel().push(pending, pushMaxWaitTime, TimeUnit.SECONDS)) {
                        // Only read the next message once the previous one has been accepted.
                        pending = task.getReader().read();
                    }
                    AgentUtils.silenceSleepInMs(readWaitTime);
                }
                LOGGER.info("read end, task exception status is {}, read finish status is {}",
                        isException(), task.isReadFinished());
            } finally {
                // Always publish the end marker and release the reader, even when the loop threw.
                // Otherwise the writer would never see an EndMessage and the allOf(...).join() in
                // submitThreadsAndWait() could block forever instead of reporting the failure.
                try {
                    task.getChannel().push(new EndMessage());
                } finally {
                    task.getReader().destroy();
                }
            }
        }, executorService);
    }

    /**
     * Starts the writer loop on the executor.
     *
     * <p>The loop pulls from the channel and writes to the sink until it receives an
     * {@link EndMessage} or the wrapper is in an exception state.
     *
     * @return a future that completes when the writer loop has ended
     */
    private CompletableFuture<?> submitWriteThread() {
        return CompletableFuture.runAsync(() -> {
            while (!isException()) {
                // NOTE(review): if pull can return null on timeout, that null is handed to the
                // sink as-is - confirm the sink tolerates it.
                Message pulled = task.getChannel().pull(pullMaxWaitTime, TimeUnit.SECONDS);
                if (pulled instanceof EndMessage) {
                    return;
                }
                // NOTE(review): an exception thrown here ends the writer but does not by itself
                // stop the reader loop - confirm how the reader is expected to terminate then.
                task.getSink().write(pulled);
            }
        }, executorService);
    }

    /**
     * Runs the reader and writer loops and blocks until both have ended. If either ended with an
     * exception, the wrapper is moved to FAILED and the exception is logged.
     */
    private void submitThreadsAndWait() {
        CompletableFuture<Void> both = CompletableFuture.allOf(submitReadThread(), submitWriteThread());
        both.exceptionally(cause -> {
            doChangeState(State.FAILED);
            LOGGER.error("exception caught", cause);
            return null;
        }).join();
    }

    /** Moves the wrapper to the KILLED state. */
    public void kill() {
        LOGGER.info("task id {} is killed", task.getTaskId());
        doChangeState(State.KILLED);
    }

    /** Sets the reader's read timeout to {@link #WAIT_FINISH_TIME_OUT} minutes. */
    void waitForFinish() {
        LOGGER.info("set readTime out to 1 minute task id is {}", task.getTaskId());
        task.getReader().setReadTimeout(TimeUnit.MINUTES.toMillis(WAIT_FINISH_TIME_OUT));
    }

    /** Asks the task's reader to finish reading. */
    public void destroyTask() {
        LOGGER.info("destroy task id is {}", task.getTaskId());
        task.getReader().finishRead();
    }

    /**
     * Tells whether the failure count is still below the configured maximum.
     *
     * @return {@code true} while fewer than {@code task.maxRetry.time} failures were recorded
     */
    public boolean shouldRetry() {
        return retryTime.get() < maxRetryTime;
    }

    /**
     * Returns the wrapped task.
     *
     * @return the task given at construction time
     */
    public Task getTask() {
        return task;
    }

    /**
     * Registers the state-transition callbacks of this wrapper. Only RUNNING to FAILED has a
     * body: it counts the failure and, once no retry is left, moves the wrapper to FATAL and
     * increments the fatal-task metric. All other registered transitions are no-ops.
     */
    public void addCallbacks() {
        addCallback(State.ACCEPTED, State.RUNNING, (from, to) -> {
        }).addCallback(State.RUNNING, State.FAILED, (from, to) -> {
            LOGGER.info("task {} is failed, please check it", task.getTaskId());
            retryTime.incrementAndGet();
            if (!shouldRetry()) {
                doChangeState(State.FATAL);
                taskManager.getTaskMetrics().incFatalTaskCount();
            }
        }).addCallback(State.FAILED, State.FATAL, (from, to) -> {
        }).addCallback(State.FAILED, State.ACCEPTED, (from, to) -> {
        }).addCallback(State.FAILED, State.RUNNING, (from, to) -> {
        }).addCallback(State.RUNNING, State.SUCCEEDED, (from, to) -> {
        });
    }

    /**
     * Executes the task: sleeps for {@link #WAIT_BEGIN_TIME_MINUTE} minutes, moves to RUNNING,
     * initialises the task, runs the reader and writer loops to completion, moves to SUCCEEDED
     * when no exception state was reached, and finally destroys the task. Any exception thrown
     * along the way is logged and moves the wrapper to FAILED.
     */
    public void run() {
        try {
            LOGGER.info("start to run {}, retry time is {}", task.getTaskId(), retryTime.get());
            AgentUtils.silenceSleepInMinute(WAIT_BEGIN_TIME_MINUTE);
            doChangeState(State.RUNNING);
            task.init();
            submitThreadsAndWait();
            if (!isException()) {
                doChangeState(State.SUCCEEDED);
            }
            LOGGER.info("start to destroy task {}", task.getTaskId());
            task.destroy();
        } catch (Exception e) {
            LOGGER.error("error while running wrapper", e);
            doChangeState(State.FAILED);
        }
    }
}
