package org.apache.inlong.sort.standalone.sink.pulsar;

import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.class */
/**
 * Worker thread of the Pulsar federation sink.
 *
 * <p>Each worker repeatedly opens a transaction on the sink's channel, takes one event,
 * resolves the target topic from the event headers and hands the event together with its
 * transaction to a {@link PulsarProducerFederation}.
 *
 * <p>Lifecycle: {@link #start()} starts the producers and the thread; {@link #close()} asks the
 * loop to stop and closes the producers. {@code close()} is expected to be called from a thread
 * other than the worker itself, which is why {@code status} is {@code volatile}.
 */
public class PulsarFederationWorker extends Thread {
    public static final Logger LOG = InlongLoggerFactory.getLogger(PulsarFederationWorker.class);

    private final String workerName;
    private final PulsarFederationSinkContext context;
    private final PulsarProducerFederation producerFederation;
    // Written by close() on the caller's thread and polled by run() on the worker thread.
    private volatile LifecycleState status = LifecycleState.IDLE;

    /**
     * Creates a worker and its producer federation; nothing is started yet.
     *
     * @param sinkName    name prefix; the worker name is {@code sinkName + "-worker-" + workerIndex}
     * @param workerIndex index of this worker, used only to build the worker name
     * @param context     sink context supplying the channel, topic lookup, metrics and intervals
     */
    public PulsarFederationWorker(String sinkName, int workerIndex, PulsarFederationSinkContext context) {
        this.workerName = sinkName + "-worker-" + workerIndex;
        this.context = context;
        this.producerFederation = new PulsarProducerFederation(this.workerName, this.context);
    }

    /**
     * Starts the producer federation, marks the worker as started and launches the thread.
     */
    @Override
    public void start() {
        this.producerFederation.start();
        this.status = LifecycleState.START;
        super.start();
    }

    /**
     * Requests the processing loop to stop and closes the producer federation.
     *
     * <p>The stop flag is raised first so that the loop stops picking up new events before the
     * producers go away, and so that the worker still terminates if closing the producers throws.
     */
    public void close() {
        this.status = LifecycleState.STOP;
        this.producerFederation.close();
    }

    /**
     * Processing loop: one channel transaction per iteration until the worker is closed or
     * its thread is interrupted.
     *
     * <ul>
     *   <li>No event available: the empty transaction is committed and closed, then the worker
     *       sleeps for one process interval.</li>
     *   <li>Event is not a {@link ProfileEvent}: it is dropped (transaction committed and closed),
     *       a send-failure metric is recorded, and the loop continues after one interval.</li>
     *   <li>Otherwise the topic header is filled, the send metric is recorded and the event is
     *       passed to the producer federation together with the still-open transaction.</li>
     *   <li>Any failure: the transaction is rolled back and closed, a send-failure metric is
     *       recorded and the worker backs off for one interval.</li>
     * </ul>
     */
    @Override
    public void run() {
        LOG.info("start PulsarSetWorker:{}", this.workerName);
        while (this.status != LifecycleState.STOP && !Thread.currentThread().isInterrupted()) {
            Transaction transaction = null;
            try {
                Channel channel = this.context.getChannel();
                transaction = channel.getTransaction();
                transaction.begin();
                Event event = channel.take();
                if (event == null) {
                    commitAndClose(transaction);
                    sleepOneInterval();
                    continue;
                }
                if (!(event instanceof ProfileEvent)) {
                    // Unsupported event type: drop it and keep the worker alive.
                    commitAndClose(transaction);
                    this.context.addSendFailMetric();
                    sleepOneInterval();
                    continue;
                }
                ProfileEvent profileEvent = (ProfileEvent) event;
                String topic = fillTopic(profileEvent);
                this.context.addSendMetric(profileEvent, topic);
                // NOTE(review): the transaction is neither committed nor closed here; presumably
                // the producer federation completes it once the send result is known - confirm.
                this.producerFederation.send(profileEvent, transaction);
            } catch (Throwable th) {
                LOG.error("Process event failed!" + getName(), th);
                recoverFromFailure(transaction);
            }
        }
    }

    /**
     * Commits and then closes a transaction that this worker fully owns.
     *
     * <p>If {@code commit()} throws, {@code close()} is intentionally not attempted here; the
     * caller's failure handling rolls the transaction back and closes it.
     */
    private static void commitAndClose(Transaction transaction) {
        transaction.commit();
        transaction.close();
    }

    /**
     * Failure path of the processing loop: rolls back and closes the transaction (if one was
     * obtained), records a send-failure metric and sleeps for one interval.
     *
     * <p>Errors raised during recovery are logged and not propagated, so the worker thread
     * survives; the back-off sleep happens even when the rollback itself fails, which prevents
     * a tight retry loop.
     *
     * @param transaction the transaction of the failed iteration, or {@code null} if the failure
     *                    happened before a transaction could be obtained
     */
    private void recoverFromFailure(Transaction transaction) {
        try {
            if (transaction != null) {
                try {
                    transaction.rollback();
                } finally {
                    transaction.close();
                }
            }
            this.context.addSendFailMetric();
        } catch (Throwable th) {
            LOG.error("Channel take transaction rollback exception:" + getName(), th);
        }
        sleepOneInterval();
    }

    /**
     * Looks up the topic for the event's {@code inlongGroupId}/{@code inlongStreamId} headers and,
     * when one is configured, stores it in the event's {@code topic} header.
     *
     * @param event event whose headers are read and possibly updated
     * @return the resolved topic, or {@code "-"} when the lookup yields a blank value
     *         (in which case the headers are left untouched)
     */
    private String fillTopic(Event event) {
        Map<String, String> headers = event.getHeaders();
        String topic = this.context.getTopic(
                InlongId.generateUid(headers.get("inlongGroupId"), headers.get("inlongStreamId")));
        if (StringUtils.isBlank(topic)) {
            return "-";
        }
        headers.put("topic", topic);
        return topic;
    }

    /**
     * Sleeps for the context's process interval.
     *
     * <p>If the sleep is interrupted, the interrupt flag is restored so the processing loop can
     * observe it and terminate.
     */
    private void sleepOneInterval() {
        try {
            Thread.sleep(this.context.getProcessInterval());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error(e.getMessage(), e);
        }
    }
}
