package org.apache.inlong.sort.standalone.sink.elasticsearch;

import org.apache.flume.Channel;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.class */
/**
 * Worker thread that repeatedly takes one event from the sink context's Flume channel and,
 * when the event can be converted into an {@link EsIndexRequest}, hands that request to the
 * context's dispatch queue.
 *
 * <p>The loop in {@link #run()} keeps going until {@link #close()} is called or the executing
 * thread is interrupted. Failures while handling a single event are logged and do not stop
 * the worker.
 */
public class EsChannelWorker extends Thread {
    public static final Logger LOG = LoggerFactory.getLogger(EsChannelWorker.class);
    private final EsSinkContext context;
    private final int workerIndex;
    /*
     * Polled by the run() loop and changed by close(). close() can only take effect when it is
     * called from a thread other than the one looping in run(), so the field must be volatile
     * for the stop request to be reliably visible to the loop.
     */
    private volatile LifecycleState status = LifecycleState.IDLE;

    /**
     * Creates a worker bound to the given sink context.
     *
     * @param esSinkContext context supplying the channel, the request handler, the dispatch queue
     *                      and the metrics used by this worker
     * @param i             index of this worker; only used in the start-up log line
     */
    public EsChannelWorker(EsSinkContext esSinkContext, int i) {
        this.context = esSinkContext;
        this.workerIndex = i;
    }

    /**
     * Marks the worker as started and calls {@link #doRun()} in a loop until the status is no
     * longer {@code START} or the executing thread has been interrupted. Anything thrown by
     * {@link #doRun()} is logged and the loop continues.
     */
    @Override
    public void run() {
        this.status = LifecycleState.START;
        LOG.info("start to EsChannelWorker:{},status:{},index:{}",
                this.context.getTaskName(), this.status, this.workerIndex);
        // An interrupt is treated as a stop request; without this check a pending interrupt
        // would make every subsequent sleep fail immediately and the loop would spin.
        while (this.status == LifecycleState.START && !Thread.currentThread().isInterrupted()) {
            try {
                doRun();
            } catch (Throwable t) {
                LOG.error(t.getMessage(), t);
            }
        }
    }

    /**
     * Processes at most one event from the channel inside a single channel transaction.
     *
     * <ul>
     *   <li>No event available: the transaction is committed and the worker sleeps for the
     *       context's process interval.</li>
     *   <li>Event is not a {@link ProfileEvent}: the transaction is committed, a send-fail
     *       metric is recorded and the worker sleeps for the process interval.</li>
     *   <li>Otherwise the event is parsed; a non-null request is offered to the dispatch queue,
     *       a null result is counted as a send failure, and the transaction is committed.</li>
     * </ul>
     *
     * <p>If anything is thrown while the transaction is in use, the failure is logged and a
     * rollback is attempted. The transaction is closed exactly once on every path. The idle
     * sleep happens only after the transaction has been closed.
     */
    public void doRun() {
        final Channel channel = this.context.getChannel();
        final Transaction transaction = channel.getTransaction();
        boolean idle = false;
        transaction.begin();
        try {
            // Held as Object so the instanceof check below is the single place where the
            // taken event is narrowed to ProfileEvent.
            final Object event = channel.take();
            if (event == null) {
                transaction.commit();
                idle = true;
            } else if (!(event instanceof ProfileEvent)) {
                transaction.commit();
                this.context.addSendFailMetric();
                idle = true;
            } else {
                final EsIndexRequest request =
                        this.context.getIndexRequestHandler().parse(this.context, (ProfileEvent) event);
                if (request != null) {
                    this.context.offerDispatchQueue(request);
                } else {
                    this.context.addSendFailMetric();
                }
                transaction.commit();
            }
        } catch (Throwable t) {
            LOG.error("Process event failed!" + getName(), t);
            try {
                transaction.rollback();
            } catch (Throwable rollbackFailure) {
                LOG.error("Channel take transaction rollback exception:" + getName(), rollbackFailure);
            }
        } finally {
            transaction.close();
        }
        if (idle) {
            // Sleep outside the transaction so it is not kept open while idle and so an
            // interrupt during the sleep cannot trigger a rollback of a committed transaction.
            sleepProcessInterval();
        }
    }

    /**
     * Requests the worker to stop; the loop in {@link #run()} exits once it next checks the
     * status.
     */
    public void close() {
        this.status = LifecycleState.STOP;
    }

    /** Sleeps for the context's process interval, preserving the interrupt status if interrupted. */
    private void sleepProcessInterval() {
        try {
            Thread.sleep(this.context.getProcessInterval());
        } catch (InterruptedException e) {
            // Restore the flag so run() can observe the interrupt and leave its loop.
            Thread.currentThread().interrupt();
        }
    }
}
