package org.apache.inlong.dataproxy.sink.pulsar.federation;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.class */
public class PulsarFederationSink extends AbstractSink implements Configurable {
    public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationSink.class);
    private PulsarFederationSinkContext context;
    private List<PulsarFederationWorker> workers = new ArrayList();
    private Map<String, String> dimensions;

    public void start() {
        String name = getName();
        for (int i = 0; i < this.context.getMaxThreads(); i++) {
            PulsarFederationWorker pulsarFederationWorker = new PulsarFederationWorker(name, i, this.context);
            pulsarFederationWorker.start();
            this.workers.add(pulsarFederationWorker);
        }
        super.start();
    }

    public void stop() {
        Iterator<PulsarFederationWorker> it = this.workers.iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Throwable th) {
                LOG.error(th.getMessage(), th);
            }
        }
        this.context.close();
        super.stop();
    }

    public void configure(Context context) {
        LOG.info("start to configure:{}, context:{}.", getClass().getSimpleName(), context.toString());
        this.context = new PulsarFederationSinkContext(getName(), context);
        this.dimensions = new HashMap();
        this.dimensions.put("clusterId", this.context.getProxyClusterId());
        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, getName());
    }

    public Sink.Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            try {
                Event take = channel.take();
                if (take == null) {
                    transaction.commit();
                    Sink.Status status = Sink.Status.BACKOFF;
                    transaction.close();
                    return status;
                }
                int length = take.getBody().length;
                if (this.context.getBufferQueue().tryAcquire(length)) {
                    this.context.getBufferQueue().offer(take);
                    transaction.commit();
                    Sink.Status status2 = Sink.Status.READY;
                    transaction.close();
                    return status2;
                }
                DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) this.context.getMetricItemSet().findMetricItem(this.dimensions);
                dataProxyMetricItem.readFailCount.incrementAndGet();
                dataProxyMetricItem.readFailSize.addAndGet(length);
                transaction.rollback();
                Sink.Status status3 = Sink.Status.BACKOFF;
                transaction.close();
                return status3;
            } catch (Throwable th) {
                LOG.error("Process event failed!" + getName(), th);
                try {
                    transaction.rollback();
                    ((DataProxyMetricItem) this.context.getMetricItemSet().findMetricItem(this.dimensions)).readFailCount.incrementAndGet();
                } catch (Throwable th2) {
                    LOG.error("Channel take transaction rollback exception:" + getName(), th2);
                }
                Sink.Status status4 = Sink.Status.BACKOFF;
                transaction.close();
                return status4;
            }
        } catch (Throwable th3) {
            transaction.close();
            throw th3;
        }
    }
}
