package org.apache.inlong.sort.standalone.sink.kafka;

import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.class */
public class KafkaFederationWorker extends Thread {
    public static final Logger LOG = InlongLoggerFactory.getLogger(KafkaFederationWorker.class);
    private final String workerName;
    private final KafkaFederationSinkContext context;
    private final KafkaProducerFederation producerFederation;
    private final Map<String, String> dimensions = new ConcurrentHashMap();
    private LifecycleState status = LifecycleState.IDLE;

    public KafkaFederationWorker(String str, int i, KafkaFederationSinkContext kafkaFederationSinkContext) {
        this.workerName = str + "-" + i;
        this.context = (KafkaFederationSinkContext) Preconditions.checkNotNull(kafkaFederationSinkContext);
        this.producerFederation = new KafkaProducerFederation(String.valueOf(i), this.context);
        this.dimensions.put("clusterId", this.context.getClusterId());
        this.dimensions.put(SinkContext.KEY_TASK_NAME, this.context.getTaskName());
        this.dimensions.put("sinkId", this.context.getSinkName());
    }

    @Override // java.lang.Thread
    public void start() {
        LOG.info("start a new kafka worker {}", this.workerName);
        this.producerFederation.start();
        this.status = LifecycleState.START;
        super.start();
    }

    public void close() {
        LOG.info("close a kafka worker {}", this.workerName);
        this.producerFederation.close();
        this.status = LifecycleState.STOP;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LOG.info("worker {} start to run, the state is {}", this.workerName, this.status.name());
        while (this.status != LifecycleState.STOP) {
            Transaction transaction = null;
            try {
                Channel channel = this.context.getChannel();
                transaction = channel.getTransaction();
                transaction.begin();
                ProfileEvent take = channel.take();
                if (take == null) {
                    transaction.commit();
                    transaction.close();
                    sleepOneInterval();
                } else if (take instanceof ProfileEvent) {
                    ProfileEvent profileEvent = take;
                    String topic = this.context.getTopic(profileEvent.getUid());
                    if (StringUtils.isBlank(topic)) {
                        this.context.addSendResultMetric(profileEvent, profileEvent.getUid(), false, System.currentTimeMillis());
                        profileEvent.ack();
                        transaction.commit();
                        transaction.close();
                    }
                    profileEvent.getHeaders().put("topic", topic);
                    this.context.addSendMetric(profileEvent, topic);
                    this.producerFederation.send(profileEvent, transaction);
                } else {
                    transaction.commit();
                    transaction.close();
                    LOG.error("The type of row event is not compatible with ProfileEvent");
                }
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
                if (transaction != null) {
                    transaction.rollback();
                    transaction.close();
                }
                this.context.getMetricItemSet().findMetricItem(this.dimensions).sendFailCount.incrementAndGet();
                sleepOneInterval();
            }
        }
    }

    private void sleepOneInterval() {
        try {
            Thread.sleep(this.context.getProcessInterval());
        } catch (InterruptedException e) {
            LOG.error(e.getMessage(), e);
        }
    }
}
