package org.apache.inlong.dataproxy.sink.mq;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.utils.BufferQueue;
import org.apache.inlong.dataproxy.utils.Constants;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.class */
public class MessageQueueZoneSink extends AbstractSink implements Configurable, ConfigUpdateCallback {
    private static final Logger logger = LoggerFactory.getLogger(MessageQueueZoneSink.class);
    // Gate consulted via shouldPrint() before error logs on the hot path.
    // NOTE(review): the meaning of the three constructor arguments is defined by LogCounter — confirm there.
    private static final LogCounter logCounter = new LogCounter(10, 100000, 30000);
    // Pause, in milliseconds, between checks of the MQ-cluster-started flag.
    private static final long MQ_CLUSTER_STATUS_CHECK_DUR_MS = 2000L;

    // Sink name captured from getName() in configure(); used as the log prefix.
    private String cachedSinkName;
    // Channel captured from getChannel() in start(); process() takes events from it.
    private Channel cachedMsgChannel;
    // Flume context handed to configure(); passed on to the components built in start().
    private Context parentContext;
    private MessageQueueZoneSinkContext context;
    private BatchPackManager dispatchManager;
    // Runs the periodic task registered in start().
    private ScheduledExecutorService scheduledPool;
    private MessageQueueZoneProducer zoneProducer;
    // Thread running ConfigChangeProcessor.
    private Thread configListener;
    // Workers created in start() and closed in stop().
    private final List<MessageQueueZoneWorker> workers = new ArrayList<>();
    // Queue of dispatched packs; its capacity argument comes from CommonConfigHolder.getMaxBufferQueueSizeKb().
    private final BufferQueue<PackProfile> dispatchQueue = new BufferQueue<>(CommonConfigHolder.getInstance().getMaxBufferQueueSizeKb());
    // Lock/condition pair used by update() to wake the config-change processor.
    private final ReentrantLock reentrantLock = new ReentrantLock();
    private final Condition condition = this.reentrantLock.newCondition();
    // System.currentTimeMillis() recorded by the most recent update() call; 0 until the first one.
    private final AtomicLong lastNotifyTime = new AtomicLong(0);
    // Set to true by stop().
    private volatile boolean isShutdown = false;
    // Set to true by setMQClusterStarted(); process() waits until it is true.
    private volatile boolean mqClusterStarted = false;

    /* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink$ConfigChangeProcessor.class */
    /**
     * Background task that waits on the sink's condition and, each time it is woken,
     * asks the zone producer to reload its meta configuration.
     *
     * <p>The loop ends when the sink is shut down or the thread is interrupted.
     */
    private class ConfigChangeProcessor implements Runnable {

        private ConfigChangeProcessor() {
        }

        @Override
        public void run() {
            logger.info("{} config-change processor start!", cachedSinkName);
            while (!isShutdown) {
                // Hold the lock only for the wait itself and release it exactly once,
                // so a failure during the reload below can never trigger a second unlock().
                reentrantLock.lock();
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    logger.info("{} config-change processor meet interrupt, break!", cachedSinkName);
                    break;
                } finally {
                    reentrantLock.unlock();
                }
                if (zoneProducer == null) {
                    continue;
                }
                // Reload again if another notification arrived while the reload was running.
                long observedNotifyTime;
                do {
                    observedNotifyTime = lastNotifyTime.get();
                    zoneProducer.reloadMetaConfig();
                } while (observedNotifyTime != lastNotifyTime.get());
            }
            logger.info("{} config-change processor exit!", cachedSinkName);
        }
    }

    /**
     * Caches the sink name and keeps the supplied Flume context for later use in {@code start()}.
     *
     * @param context the configuration context supplied for this sink
     */
    public void configure(Context context) {
        final String sinkName = getName();
        this.cachedSinkName = sinkName;
        logger.info("{} start to configure, context:{}.", sinkName, context.toString());
        this.parentContext = context;
    }

    /**
     * Builds and starts the sink's components: the sink context, the batch pack manager,
     * the periodic maintenance task, the zone producer, the config-change listener thread
     * and the worker threads.
     *
     * <p>Any exception raised while building the components is logged rather than rethrown,
     * and {@code super.start()} is always invoked afterwards.
     */
    @Override
    public void start() {
        if (getChannel() == null) {
            logger.error("{}'s channel is null", cachedSinkName);
        }
        cachedMsgChannel = getChannel();
        try {
            ConfigManager.getInstance().regMetaConfigChgCallback(this);
            context = new MessageQueueZoneSinkContext(this, parentContext, cachedMsgChannel);
            context.start();
            dispatchManager = new BatchPackManager(this, parentContext);
            scheduledPool = Executors.newScheduledThreadPool(2);
            final long dispatchTimeoutMs = dispatchManager.getDispatchTimeout();
            scheduledPool.scheduleWithFixedDelay(new Runnable() {

                @Override
                public void run() {
                    // An exception escaping a fixed-delay task cancels every later run,
                    // so failures are logged here instead of being allowed to propagate.
                    try {
                        dispatchManager.setNeedOutputOvertimeData();
                        // The producer is assigned after this task is scheduled; skip until it exists.
                        MessageQueueZoneProducer producer = zoneProducer;
                        if (producer != null) {
                            producer.clearExpiredProducers();
                        }
                    } catch (Throwable t) {
                        if (logCounter.shouldPrint()) {
                            logger.error("{} periodic dispatch task failure", cachedSinkName, t);
                        }
                    }
                }
            }, dispatchTimeoutMs, dispatchTimeoutMs, TimeUnit.MILLISECONDS);
            zoneProducer = new MessageQueueZoneProducer(this, context);
            zoneProducer.start();
            configListener = new Thread(new ConfigChangeProcessor());
            configListener.setName(cachedSinkName + "-configure-listener");
            configListener.start();
            for (int i = 0; i < context.getMaxThreads(); i++) {
                MessageQueueZoneWorker worker =
                        new MessageQueueZoneWorker(this, i, context.getProcessInterval(), zoneProducer);
                worker.start();
                workers.add(worker);
            }
        } catch (Exception e) {
            logger.error("{} start failure", cachedSinkName, e);
        }
        super.start();
    }

    /**
     * Stops the sink: flags shutdown, stops the config-change listener thread, shuts down
     * the scheduled pool, closes every worker and the sink context, then calls {@code super.stop()}.
     *
     * <p>Components that were never created (for example because {@code start()} failed early)
     * are skipped.
     */
    @Override
    public void stop() {
        isShutdown = true;
        if (configListener != null) {
            try {
                configListener.interrupt();
                configListener.join();
                configListener = null;
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
                logger.warn("{} interrupted while waiting for config listener to exit", cachedSinkName);
            } catch (Throwable t) {
                logger.error("{} stop config listener failure", cachedSinkName, t);
            }
        }
        // Without this the pool's threads outlive the sink.
        if (scheduledPool != null) {
            scheduledPool.shutdown();
            scheduledPool = null;
        }
        for (MessageQueueZoneWorker worker : workers) {
            try {
                worker.close();
            } catch (Throwable t) {
                logger.error("{} stop Zone worker failure", cachedSinkName, t);
            }
        }
        if (context != null) {
            context.close();
        }
        super.stop();
    }

    /**
     * Takes at most one event from the channel inside a transaction and hands it to the
     * dispatch manager.
     *
     * <p>The call first waits until the MQ cluster has been flagged as started, then flushes
     * overtime data through the dispatch manager before opening the transaction.
     *
     * @return {@code READY} when an event was taken and committed; {@code BACKOFF} when the
     *         channel was empty, the wait was interrupted, or processing failed (in which case
     *         the transaction is rolled back)
     * @throws EventDeliveryException declared by the sink contract; this implementation logs
     *         failures and returns {@code BACKOFF} instead
     */
    @Override
    public Sink.Status process() throws EventDeliveryException {
        while (!mqClusterStarted) {
            try {
                Thread.sleep(MQ_CLUSTER_STATUS_CHECK_DUR_MS);
            } catch (InterruptedException e) {
                // Restore the flag so the calling runner can observe the interruption.
                Thread.currentThread().interrupt();
                return Sink.Status.BACKOFF;
            }
        }
        dispatchManager.outputOvertimeData();
        Transaction tx = cachedMsgChannel.getTransaction();
        tx.begin();
        try {
            Event event = cachedMsgChannel.take();
            if (event == null) {
                tx.commit();
                return Sink.Status.BACKOFF;
            }
            context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_TAKE_SUCCESS);
            dispatchEvent(event);
            tx.commit();
            return Sink.Status.READY;
        } catch (Throwable t) {
            context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_TAKE_FAILURE);
            if (logCounter.shouldPrint()) {
                logger.error("{} process event failed!", cachedSinkName, t);
            }
            try {
                tx.rollback();
            } catch (Throwable rollbackError) {
                if (logCounter.shouldPrint()) {
                    logger.error("{} channel take transaction rollback exception", cachedSinkName, rollbackError);
                }
            }
            return Sink.Status.BACKOFF;
        } finally {
            tx.close();
        }
    }

    /**
     * Routes a taken event to the matching dispatch-manager entry point based on its runtime type.
     * Events of any other type are converted using their headers before being dispatched.
     */
    private void dispatchEvent(Event event) {
        if (event instanceof ProxyEvent) {
            dispatchManager.addEvent((ProxyEvent) event);
            return;
        }
        if (event instanceof ProxyPackEvent) {
            dispatchManager.addPackEvent((ProxyPackEvent) event);
            return;
        }
        if (event instanceof SimpleEvent) {
            dispatchManager.addSimpleEvent((SimpleEvent) event);
            return;
        }
        Map<String, String> headers = event.getHeaders();
        if (StringUtils.isEmpty(headers.get("msgEnType"))) {
            // No "msgEnType" header: rebuild a ProxyEvent from the individual header fields.
            String groupId = headers.get("inlongGroupId");
            String streamId = headers.get("inlongStreamId");
            String msgTime = headers.get("msgTime");
            String sourceIp = headers.get(Constants.HEADER_KEY_SOURCE_IP);
            String sourceTime = headers.get(Constants.HEADER_KEY_SOURCE_TIME);
            if (groupId == null || streamId == null || msgTime == null
                    || sourceIp == null || sourceTime == null) {
                // A required header is missing: count the event as malformed and drop it.
                context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_MALFORMED);
            } else {
                dispatchManager.addEvent(new ProxyEvent(groupId, streamId, msgTime, sourceIp, sourceTime,
                        headers, event.getBody()));
                context.fileMetricIncSumStats(StatConstants.EVENT_SINK_FILE_V1_TAKE_SUCCESS);
            }
        } else {
            // "msgEnType" header present: copy body and headers into a SimpleEvent.
            SimpleEvent simpleEvent = new SimpleEvent();
            simpleEvent.setBody(event.getBody());
            simpleEvent.setHeaders(headers);
            dispatchManager.addSimpleEvent(simpleEvent);
            context.fileMetricIncSumStats(StatConstants.EVENT_SINK_FILE_V0_TAKE_SUCCESS);
        }
    }

    /**
     * @return the sink name captured during {@code configure()}
     */
    public String getCachedSinkName() {
        return cachedSinkName;
    }

    /**
     * @return {@code true} once {@code setMQClusterStarted()} has been called
     */
    public boolean isMqClusterStarted() {
        return mqClusterStarted;
    }

    /**
     * Flags the MQ cluster as started, which lets {@code process()} leave its wait loop.
     */
    public void setMQClusterStarted() {
        mqClusterStarted = true;
    }

    /**
     * Acquires queue capacity for the record's size and then offers the record to the dispatch queue.
     *
     * @param packProfile the record to enqueue; its {@code getSize()} is the amount acquired
     */
    public void acquireAndOfferDispatchedRecord(PackProfile packProfile) {
        final BufferQueue<PackProfile> queue = dispatchQueue;
        queue.acquire(packProfile.getSize());
        queue.offer(packProfile);
    }

    /**
     * Offers the record to the dispatch queue without acquiring capacity first.
     *
     * @param packProfile the record to enqueue
     */
    public void offerDispatchRecord(PackProfile packProfile) {
        dispatchQueue.offer(packProfile);
    }

    /**
     * Delegates to {@code BufferQueue.pollRecord()} on the dispatch queue.
     *
     * @return the record returned by the queue
     */
    public PackProfile pollDispatchedRecord() {
        final PackProfile polled = dispatchQueue.pollRecord();
        return polled;
    }

    /**
     * Delegates to {@code BufferQueue.takeRecord()} on the dispatch queue.
     *
     * @return the record returned by the queue
     */
    public PackProfile takeDispatchedRecord() {
        final PackProfile taken = dispatchQueue.takeRecord();
        return taken;
    }

    /**
     * Releases queue capacity equal to the record's size back to the dispatch queue.
     *
     * @param packProfile the record whose {@code getSize()} is released
     */
    public void releaseAcquiredSizePermit(PackProfile packProfile) {
        dispatchQueue.release(packProfile.getSize());
    }

    /**
     * @return the value of {@code size()} on the dispatch queue
     */
    public int getDispatchQueueSize() {
        final int queued = dispatchQueue.size();
        return queued;
    }

    /**
     * @return the value of {@code availablePermits()} on the dispatch queue
     */
    public int getDispatchAvailablePermits() {
        final int permits = dispatchQueue.availablePermits();
        return permits;
    }

    /**
     * Config-update callback: records the notification time and signals the condition
     * that the config-change processor waits on. Does nothing while no zone producer exists.
     */
    @Override // org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback
    public void update() {
        if (zoneProducer != null) {
            reentrantLock.lock();
            try {
                final long now = System.currentTimeMillis();
                lastNotifyTime.set(now);
                condition.signal();
            } finally {
                reentrantLock.unlock();
            }
        }
    }
}
