package org.apache.inlong.sort.standalone.sink.cls;

import com.google.common.base.Preconditions;
import com.tencentcloudapi.cls.producer.errors.ProducerException;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.class */
/**
 * Worker thread that repeatedly takes events from a Flume {@link Channel} and hands every
 * {@link ProfileEvent} to the CLS client supplied by the {@link ClsSinkContext}.
 *
 * <p>Each loop iteration opens one channel transaction and takes at most one event. The
 * transaction is completed by this worker when nothing was taken, when the event is not a
 * {@link ProfileEvent}, when no id config exists for the event, or when processing fails
 * (rollback). Otherwise it is passed to a {@link ClsCallback} together with the event;
 * presumably the callback completes it once the send result is known (not visible here).
 */
public class ClsChannelWorker extends Thread {
    private static final Logger LOG = InlongLoggerFactory.getLogger(ClsChannelWorker.class);

    private final ClsSinkContext context;
    private final String workerName;
    private final Channel channel;
    private final IEvent2LogItemHandler handler;
    // Written by start()/close() on the caller's thread and read by the run() loop on the
    // worker thread, so it must be volatile for the stop request to become visible.
    private volatile LifecycleState status = LifecycleState.IDLE;

    /**
     * Creates a worker bound to the channel and log item handler of the given context.
     *
     * @param str prefix of the worker name
     * @param clsSinkContext sink context; must not be null and must provide a non-null
     *        channel and log item handler
     * @param i index appended to {@code str} (separated by "-") to form the worker name
     * @throws NullPointerException if the context, its channel or its handler is null
     */
    public ClsChannelWorker(String str, ClsSinkContext clsSinkContext, int i) {
        this.context = Preconditions.checkNotNull(clsSinkContext);
        this.workerName = str + "-" + i;
        this.channel = (Channel) Preconditions.checkNotNull(clsSinkContext.getChannel());
        this.handler = (IEvent2LogItemHandler) Preconditions.checkNotNull(clsSinkContext.getLogItemHandler());
    }

    /**
     * Marks the worker as started and launches the thread.
     */
    @Override
    public void start() {
        LOG.info("Start new cls channel worker {}", this.workerName);
        this.status = LifecycleState.START;
        super.start();
    }

    /**
     * Requests the worker to stop; the run loop exits after the current iteration.
     */
    public void close() {
        LOG.info("Close cls channel worker {}", this.workerName);
        this.status = LifecycleState.STOP;
    }

    /**
     * Processes events until {@link #close()} is called or the thread is interrupted.
     */
    @Override
    public void run() {
        LOG.info("worker {} start to run, the state is {}", this.workerName, this.status.name());
        // Also leave on interrupt: the interrupt flag is restored wherever an
        // InterruptedException is caught, and looping on with the flag set would make every
        // subsequent sleep fail immediately.
        while (this.status != LifecycleState.STOP && !Thread.currentThread().isInterrupted()) {
            doRun();
        }
    }

    /**
     * Runs one iteration: opens a transaction, takes one event and dispatches it.
     * Any failure rolls back the transaction that was opened in this iteration.
     */
    private void doRun() {
        // Declared outside the try block so the catch block can roll back the transaction
        // that was actually opened.
        Transaction transaction = null;
        try {
            transaction = this.channel.getTransaction();
            transaction.begin();
            Event take = this.channel.take();
            if (take == null) {
                commitTransaction(transaction);
                sleepOneInterval();
            } else if (take instanceof ProfileEvent) {
                send(take, transaction);
            } else {
                commitTransaction(transaction);
                LOG.error("The type of row event is not compatible with ProfileEvent");
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            rollbackQuietly(transaction);
            if (e instanceof InterruptedException) {
                // Restore the flag after the rollback so run() can terminate the worker.
                Thread.currentThread().interrupt();
                return;
            }
            sleepOneInterval();
        }
    }

    /**
     * Sends one event through the client selected by the event's id config.
     *
     * <p>If no id config exists for the event's uid, the event is acked, a send result metric
     * is recorded with the result flag set to {@code false}, and the transaction is committed
     * so it does not stay open. Otherwise the topic id is stored in the event headers and the
     * parsed log items are passed to {@code putLogs} with a {@link ClsCallback} that receives
     * the transaction.
     *
     * @param event the taken event; must be a {@link ProfileEvent}
     * @param transaction the open transaction the event was taken in
     * @throws ProducerException propagated from {@code putLogs}
     * @throws InterruptedException propagated from {@code putLogs}
     */
    private void send(Event event, Transaction transaction) throws ProducerException, InterruptedException {
        ProfileEvent profileEvent = (ProfileEvent) event;
        ClsIdConfig idConfig = this.context.getIdConfig(profileEvent.getUid());
        if (idConfig == null) {
            profileEvent.ack();
            LOG.error("There is no cls id config for uid {}, discard it", profileEvent.getUid());
            this.context.addSendResultMetric(profileEvent, this.context.getTaskName(), false, System.currentTimeMillis());
            // The event is discarded and no callback will ever see this transaction, so it
            // has to be completed here, exactly like the other discard path in doRun().
            commitTransaction(transaction);
            return;
        }
        profileEvent.getHeaders().put(ClsSinkContext.KEY_TOPIC_ID, idConfig.getTopicId());
        this.context.getClient(idConfig.getSecretId()).putLogs(
                idConfig.getTopicId(),
                this.handler.parse(this.context, profileEvent),
                new ClsCallback(transaction, this.context, profileEvent));
    }

    /**
     * Sleeps for the process interval configured in the context. An interrupt is logged and
     * the thread's interrupt flag is restored.
     */
    private void sleepOneInterval() {
        try {
            Thread.sleep(this.context.getProcessInterval());
        } catch (InterruptedException e) {
            LOG.error(e.getMessage(), e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Rolls back the transaction and logs, instead of propagating, any failure, so that an
     * error during rollback cannot terminate the worker thread.
     */
    private void rollbackQuietly(Transaction transaction) {
        try {
            rollbackTransaction(transaction);
        } catch (Exception e) {
            LOG.error("Failed to roll back transaction in worker {}", this.workerName, e);
        }
    }

    /**
     * Rolls back and closes the transaction; does nothing when it is null.
     */
    private void rollbackTransaction(Transaction transaction) {
        if (transaction != null) {
            try {
                transaction.rollback();
            } finally {
                // Close even when the rollback itself fails.
                transaction.close();
            }
        }
    }

    /**
     * Commits and closes the transaction; does nothing when it is null. The transaction is
     * deliberately left open when the commit throws, so the caller can still roll it back.
     */
    private void commitTransaction(Transaction transaction) {
        if (transaction != null) {
            transaction.commit();
            transaction.close();
        }
    }
}
