package org.apache.inlong.agent.plugin.instance;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.core.instance.ActionType;
import org.apache.inlong.agent.core.instance.InstanceAction;
import org.apache.inlong.agent.core.instance.InstanceManager;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Sink;
import org.apache.inlong.agent.plugin.file.Source;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/instance/FileInstance.class */
/**
 * An {@link Instance} that repeatedly reads messages from a {@link Source} and
 * writes them to a {@link Sink}, both of which are created reflectively from the
 * class names carried by the {@link InstanceProfile}.
 *
 * <p>Lifecycle as implemented here: {@link #init(Object, InstanceProfile)} builds the
 * source and sink, {@link #run()} executes the read/write loop, and {@link #destroy()}
 * waits for that loop to stop before destroying the source and sink.
 */
public class FileInstance extends Instance {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileInstance.class);

    /** Pause, in seconds, used by the core loop when idle and between action submit retries. */
    public static final int CORE_THREAD_SLEEP_TIME = 1;
    /** Polling interval, in milliseconds, while {@link #destroy()} waits for {@link #run()} to exit. */
    private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
    /** Number of consecutive "source and sink finished" checks that must be exceeded before finishing. */
    private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;

    private Source source;
    private Sink sink;
    private InstanceProfile profile;
    private InstanceManager instanceManager;
    /** True while {@link #run()} is executing; polled by {@link #destroy()}. */
    private volatile boolean running = false;
    /** Set only after source and sink were both initialised successfully. */
    private volatile boolean inited = false;
    /** Consecutive idle iterations in which both source and sink reported finished. */
    private volatile int checkFinishCount = 0;

    /**
     * Stores the manager and profile, records the inode info on the profile and
     * creates and initialises the source and sink named by the profile.
     *
     * <p>Any failure moves the instance to {@link State#FATAL}, is logged, and is handed
     * to {@link ThreadUtils#threadThrowableHandler}; it is not rethrown.
     *
     * @param obj the owning manager; must be an {@link InstanceManager}
     * @param instanceProfile the profile describing this instance
     */
    public void init(Object obj, InstanceProfile instanceProfile) {
        try {
            this.instanceManager = (InstanceManager) obj;
            this.profile = instanceProfile;
            this.profile.set("inodeInfo", FileDataUtils.getInodeInfo(this.profile.getInstanceId()));
            LOGGER.info("task id: {} submit new instance {} profile detail {}.",
                    this.profile.getTaskId(), this.profile.getInstanceId(), this.profile.toJsonStr());
            this.source = (Source) Class.forName(this.profile.getSourceClass()).newInstance();
            this.source.init(this.profile);
            this.sink = (Sink) Class.forName(this.profile.getSinkClass()).newInstance();
            this.sink.init(this.profile);
            this.inited = true;
        } catch (Throwable th) {
            doChangeState(State.FATAL);
            // The failure may have happened before this.profile was assigned (e.g. the cast
            // above), so read the ids from the parameter and tolerate a null profile instead
            // of masking the original error with a NullPointerException.
            String instanceId = instanceProfile == null ? null : instanceProfile.getInstanceId();
            String taskId = instanceProfile == null ? null : instanceProfile.getTaskId();
            LOGGER.error("init instance {} for task {} failed", instanceId, taskId, th);
            ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
        }
    }

    /**
     * Stops the instance: switches the state to {@link State#SUCCEEDED}, waits until
     * {@link #run()} has exited, then destroys the source and the sink.
     * Does nothing when {@link #init(Object, InstanceProfile)} did not complete.
     */
    public void destroy() {
        if (!this.inited) {
            return;
        }
        doChangeState(State.SUCCEEDED);
        while (this.running) {
            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
        }
        this.source.destroy();
        this.sink.destroy();
    }

    /**
     * Names the current thread after the task and instance ids and executes the core loop.
     * The {@code running} flag is cleared in a {@code finally} block so that a failure inside
     * the loop cannot leave {@link #destroy()} waiting forever; the failure itself still
     * propagates to the caller.
     */
    public void run() {
        Thread.currentThread().setName("file-instance-core-" + getTaskId() + "-" + getInstanceId());
        this.running = true;
        try {
            doRun();
        } finally {
            this.running = false;
        }
    }

    /**
     * Core loop: forwards every message read from the source to the sink until the instance
     * is finished, the source no longer exists, or source and sink have both reported
     * finished on more than {@link #CHECK_FINISH_AT_LEAST_COUNT} consecutive idle checks.
     */
    private void doRun() {
        while (!isFinished()) {
            if (!this.source.sourceExist()) {
                handleSourceDeleted();
                return;
            }
            Message message = this.source.read();
            if (message != null) {
                this.sink.write(message);
                continue;
            }
            // Nothing to read right now: decide whether the instance is done.
            if (this.source.sourceFinish() && this.sink.sinkFinish()) {
                this.checkFinishCount++;
                if (this.checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
                    handleReadEnd();
                    return;
                }
            } else {
                // Only an unbroken run of "finished" checks counts.
                this.checkFinishCount = 0;
            }
            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
            // NOTE(review): 30010 is an audit id whose meaning is not visible in this file.
            AuditUtils.add(30010, this.profile.getInlongGroupId(), this.profile.getInlongStreamId(),
                    AgentUtils.getCurrentTime(), 1, 1L);
        }
    }

    /** Submits a {@link ActionType#FINISH} action for this instance to the manager. */
    private void handleReadEnd() {
        submitUntilAccepted(new InstanceAction(ActionType.FINISH, this.profile));
    }

    /**
     * Deletes the stored offset of this instance, marks the profile as
     * {@link InstanceStateEnum#DELETE} with the current time as modify time, and submits a
     * {@link ActionType#DELETE} action to the manager.
     */
    private void handleSourceDeleted() {
        OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId());
        this.profile.setState(InstanceStateEnum.DELETE);
        this.profile.setModifyTime(Long.valueOf(AgentUtils.getCurrentTime()));
        submitUntilAccepted(new InstanceAction(ActionType.DELETE, this.profile));
    }

    /**
     * Keeps offering the action to the instance manager, pausing between attempts, until the
     * manager accepts it or this instance is finished.
     *
     * @param instanceAction the action to submit
     */
    private void submitUntilAccepted(InstanceAction instanceAction) {
        while (!isFinished() && !this.instanceManager.submitAction(instanceAction)) {
            LOGGER.error("instance manager action queue is full: taskId {}", this.instanceManager.getTaskId());
            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
        }
    }

    /** No callbacks are registered by this implementation. */
    public void addCallbacks() {
    }

    /** @return the task id taken from the profile */
    public String getTaskId() {
        return this.profile.getTaskId();
    }

    /** @return the instance id taken from the profile */
    public String getInstanceId() {
        return this.profile.getInstanceId();
    }

    /** @return the sink created during initialisation, or {@code null} if none was created */
    public Sink getSink() {
        return this.sink;
    }

    /** @return the profile passed to {@link #init(Object, InstanceProfile)} */
    public InstanceProfile getProfile() {
        return this.profile;
    }
}
