package org.apache.inlong.audit.sink;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.audit.base.HighPriorityThreadFactory;
import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/audit/sink/TubeSink.class */
public class TubeSink extends AbstractSink implements Configurable {
    private static final String SINK_THREAD_NUM = "thread-num";
    private static final int defaultLogEveryNEvents = 100000;
    private static final int defaultSendTimeout = 20000;
    private static final int BAD_EVENT_QUEUE_SIZE = 10000;
    private static final int EVENT_QUEUE_SIZE = 1000;
    private static final String MASTER_HOST_PORT_LIST = "master-host-port-list";
    private static final String TOPIC = "topic";
    private static final String SEND_TIMEOUT = "send_timeout";
    private static final String LOG_EVERY_N_EVENTS = "log-every-n-events";
    private static final String RETRY_CNT = "retry-currentSuccSendedCnt";
    public MessageProducer producer;
    public Map<String, MessageProducer> producerMap;
    public TubeMultiSessionFactory sessionFactory;
    private SinkCounter sinkCounter;
    private String topic;
    private LinkedBlockingQueue<EventStat> resendQueue;
    private LinkedBlockingQueue<Event> eventQueue;
    private long diskIORatePerSec;
    private RateLimiter diskRateLimiter;
    private String masterHostAndPortList;
    private Integer logEveryNEvents;
    private Integer sendTimeout;
    private int threadNum;
    private Thread[] sinkThreadPool;
    private long linkMaxAllowedDelayedMsgCount;
    private long sessionWarnDelayedMsgCount;
    private long sessionMaxAllowedDelayedMsgCount;
    private long nettyWriteBufferHighWaterMark;
    private int recoverthreadcount;
    private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
    private static final Long PRINT_INTERVAL = 30L;
    private static final TubePerformanceTask tubePerformanceTask = new TubePerformanceTask();
    private static final int defaultRetryCnt = -1;
    private static int retryCnt = defaultRetryCnt;
    private static AtomicLong totalTubeSuccSendCnt = new AtomicLong(0);
    private static AtomicLong totalTubeSuccSendSize = new AtomicLong(0);
    private static ConcurrentHashMap<String, Long> illegalTopicMap = new ConcurrentHashMap<>();
    private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, new HighPriorityThreadFactory("tubePerformance-Printer-thread"));
    private volatile boolean canTake = false;
    private volatile boolean canSend = false;
    private boolean overflow = false;
    private AtomicLong currentSuccessSendCnt = new AtomicLong(0);
    private AtomicLong lastSuccessSendCnt = new AtomicLong(0);
    private long t1 = System.currentTimeMillis();
    private long t2 = 0;
    private String localIp = "127.0.0.1";

    /* loaded from: input_file:org/apache/inlong/audit/sink/TubeSink$MyCallback.class */
    public class MyCallback implements MessageSentCallback {
        private EventStat myEventStat;
        private long sendTime = System.currentTimeMillis();

        public MyCallback(EventStat eventStat) {
            this.myEventStat = eventStat;
        }

        public void onMessageSent(MessageSentResult messageSentResult) {
            if (messageSentResult.isSuccess()) {
                TubeSink.this.handleMessageSendSuccess(this.myEventStat);
                return;
            }
            if (messageSentResult.getErrCode() == 403) {
                TubeSink.logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", new Object[]{messageSentResult.getErrMsg(), Integer.valueOf(TubeSink.this.resendQueue.size()), Integer.valueOf(this.myEventStat.getEvent().hashCode())});
                return;
            }
            if (messageSentResult.getErrCode() != 419) {
                TubeSink.logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", new Object[]{messageSentResult.getErrMsg(), Integer.valueOf(TubeSink.this.resendQueue.size()), Integer.valueOf(this.myEventStat.getEvent().hashCode())});
            }
            this.myEventStat.incRetryCnt();
            TubeSink.this.resendEvent(this.myEventStat, true);
        }

        public void onException(Throwable th) {
            Throwable th2;
            Throwable th3 = th;
            while (true) {
                th2 = th3;
                if (th2.getCause() == null) {
                    break;
                } else {
                    th3 = th2.getCause();
                }
            }
            if (th2 instanceof OverflowException) {
                TubeSink.this.overflow = true;
            }
            this.myEventStat.incRetryCnt();
            TubeSink.this.resendEvent(this.myEventStat, true);
        }
    }

    /* loaded from: input_file:org/apache/inlong/audit/sink/TubeSink$SinkTask.class */
    class SinkTask implements Runnable {
        SinkTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            EventStat eventStat;
            TubeSink.logger.info("Sink task {} started.", Thread.currentThread().getName());
            while (TubeSink.this.canSend) {
                Event event = null;
                String str = null;
                try {
                    if (TubeSink.this.overflow) {
                        TubeSink.this.overflow = false;
                        Thread.sleep(10L);
                    }
                    if (TubeSink.this.resendQueue.isEmpty()) {
                        event = (Event) TubeSink.this.eventQueue.take();
                        eventStat = new EventStat(event);
                        if (event.getHeaders().containsKey(TubeSink.TOPIC)) {
                            str = (String) event.getHeaders().get(TubeSink.TOPIC);
                        }
                    } else {
                        eventStat = (EventStat) TubeSink.this.resendQueue.poll();
                        if (eventStat != null) {
                            event = eventStat.getEvent();
                            if (event.getHeaders().containsKey(TubeSink.TOPIC)) {
                                str = (String) event.getHeaders().get(TubeSink.TOPIC);
                            }
                        }
                    }
                    if (event != null) {
                        if (str == null || str.equals("")) {
                            TubeSink.logger.warn("no topic specified in event header, just skip this event");
                        } else {
                            Long l = (Long) TubeSink.illegalTopicMap.get(str);
                            if (l != null) {
                                if (l.longValue() <= System.currentTimeMillis()) {
                                    TubeSink.illegalTopicMap.remove(str);
                                }
                            }
                            if (!sendMessage(event, str, eventStat)) {
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    TubeSink.logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
                    return;
                } catch (Throwable th) {
                    if (th instanceof TubeClientException) {
                        String message = th.getMessage();
                        if (message == null || !(message.contains("No available queue for topic") || message.contains("The brokers of topic are all forbidden"))) {
                            try {
                                Thread.sleep(100L);
                            } catch (InterruptedException e2) {
                            }
                        } else {
                            TubeSink.illegalTopicMap.put(null, Long.valueOf(System.currentTimeMillis() + 60000));
                            TubeSink.logger.info("IllegalTopicMap.put " + ((String) null));
                        }
                    }
                    TubeSink.this.resendEvent(null, false);
                }
            }
        }

        private boolean sendMessage(Event event, String str, EventStat eventStat) throws TubeClientException, InterruptedException {
            MessageProducer producer = TubeSink.this.getProducer(str);
            if (producer == null) {
                TubeSink.illegalTopicMap.put(str, Long.valueOf(System.currentTimeMillis() + 30000));
                TubeSink.logger.error("Get producer is null, topic:{}", str);
                return false;
            }
            Message message = new Message(str, event.getBody());
            message.setAttrKeyVal("auditIp", TubeSink.this.localIp);
            if (event.getHeaders().containsKey("inlongStreamId")) {
                message.setAttrKeyVal("inlongStreamId", (String) event.getHeaders().get("inlongStreamId"));
            }
            if (event.getHeaders().containsKey("inlongGroupId")) {
                message.setAttrKeyVal("inlongGroupId", (String) event.getHeaders().get("inlongGroupId"));
            }
            TubeSink.logger.debug("producer start to send msg...");
            producer.sendMessage(message, new MyCallback(eventStat));
            TubeSink.illegalTopicMap.remove(str);
            return true;
        }
    }

    /* loaded from: input_file:org/apache/inlong/audit/sink/TubeSink$TubePerformanceTask.class */
    static class TubePerformanceTask implements Runnable {
        TubePerformanceTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (TubeSink.totalTubeSuccSendSize.get() != 0) {
                    TubeSink.logger.info("Total tube performance tps :" + (TubeSink.totalTubeSuccSendCnt.get() / TubeSink.PRINT_INTERVAL.longValue()) + "/s, avg msg size:" + (TubeSink.totalTubeSuccSendSize.get() / TubeSink.totalTubeSuccSendCnt.get()) + ",print every " + TubeSink.PRINT_INTERVAL + " seconds");
                    TubeSink.totalTubeSuccSendSize.set(0L);
                    TubeSink.totalTubeSuccSendCnt.set(0L);
                }
            } catch (Exception e) {
                TubeSink.logger.info("tubePerformanceTask error", e);
            }
        }
    }

    public TubeSink() {
        logger.debug("new instance of TubeSink!");
    }

    public synchronized void start() {
        logger.info("tube sink starting");
        try {
            createConnection();
            this.sinkCounter.start();
            super.start();
            this.canSend = true;
            this.canTake = true;
            try {
                initTopicProducer(this.topic);
            } catch (Exception e) {
                logger.error("tubesink start publish topic fail.", e);
            }
            for (int i = 0; i < this.sinkThreadPool.length; i++) {
                this.sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
                this.sinkThreadPool[i].start();
            }
            logger.debug("tubesink started");
        } catch (FlumeException e2) {
            logger.error("Unable to create tube client. Exception follows.", e2);
            stop();
        }
    }

    public synchronized void stop() {
        logger.info("tube sink stopping");
        destroyConnection();
        this.canTake = false;
        int i = 0;
        while (this.eventQueue.size() != 0) {
            int i2 = i;
            i++;
            if (i2 >= 10) {
                break;
            }
            try {
                Thread.currentThread();
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                logger.info("Stop thread has been interrupt!");
            }
        }
        this.canSend = false;
        if (this.sinkThreadPool != null) {
            for (Thread thread : this.sinkThreadPool) {
                if (thread != null) {
                    thread.interrupt();
                }
            }
            this.sinkThreadPool = null;
        }
        super.stop();
        if (!scheduledExecutorService.isShutdown()) {
            scheduledExecutorService.shutdown();
        }
        this.sinkCounter.stop();
        logger.debug("tubesink stopped. Metrics:{}", this.sinkCounter);
    }

    public Sink.Status process() throws EventDeliveryException {
        logger.info("tube sink processing");
        if (!this.canTake) {
            return Sink.Status.BACKOFF;
        }
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            try {
                Event take = channel.take();
                if (take != null) {
                    if (this.diskRateLimiter != null) {
                        this.diskRateLimiter.acquire(take.getBody().length);
                    }
                    if (this.eventQueue.offer(take, 3000L, TimeUnit.MILLISECONDS)) {
                        transaction.commit();
                    } else {
                        logger.info("[{}] Channel --> Queue(has no enough space,current code point) --> Tube,Check if Tube server or network is ok.(if this situation last long time it will cause memoryChannel full and fileChannel write.)", getName());
                        transaction.rollback();
                    }
                } else {
                    status = Sink.Status.BACKOFF;
                    transaction.commit();
                }
                transaction.close();
            } catch (Throwable th) {
                logger.error("Process event failed!" + getName(), th);
                try {
                    transaction.rollback();
                } catch (Throwable th2) {
                    logger.error("tubesink transaction rollback exception", th2);
                }
                transaction.close();
            }
            return status;
        } catch (Throwable th3) {
            transaction.close();
            throw th3;
        }
    }

    public void configure(Context context) {
        logger.info("Tubesink started and context = {}", context.toString());
        this.topic = context.getString(TOPIC);
        Preconditions.checkState(StringUtils.isNotEmpty(this.topic), "No topic specified");
        Preconditions.checkState(this.masterHostAndPortList != null, "No master and port list specified");
        this.producerMap = new HashMap();
        this.logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, Integer.valueOf(defaultLogEveryNEvents));
        logger.debug(getName() + " " + LOG_EVERY_N_EVENTS + " " + this.logEveryNEvents);
        Preconditions.checkArgument(this.logEveryNEvents.intValue() > 0, "logEveryNEvents must be > 0");
        this.sendTimeout = context.getInteger(SEND_TIMEOUT, Integer.valueOf(defaultSendTimeout));
        logger.debug(getName() + " " + SEND_TIMEOUT + " " + this.sendTimeout);
        Preconditions.checkArgument(this.sendTimeout.intValue() > 0, "sendTimeout must be > 0");
        retryCnt = context.getInteger(RETRY_CNT, Integer.valueOf(defaultRetryCnt)).intValue();
        logger.debug(getName() + " " + RETRY_CNT + " " + retryCnt);
        this.localIp = NetworkUtils.getLocalIp();
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(getName());
        }
        this.resendQueue = new LinkedBlockingQueue<>(BAD_EVENT_QUEUE_SIZE);
        this.threadNum = Integer.parseInt(context.getString(SINK_THREAD_NUM, "4"));
        Preconditions.checkArgument(this.threadNum > 0, "threadNum must be > 0");
        this.sinkThreadPool = new Thread[this.threadNum];
        this.eventQueue = new LinkedBlockingQueue<>(EVENT_QUEUE_SIZE);
        this.diskIORatePerSec = context.getLong("disk-io-rate-per-sec", 0L).longValue();
        if (this.diskIORatePerSec != 0) {
            this.diskRateLimiter = RateLimiter.create(this.diskIORatePerSec);
        }
        this.linkMaxAllowedDelayedMsgCount = context.getLong("link_max_allowed_delayed_msg_count", 80000L).longValue();
        this.sessionWarnDelayedMsgCount = context.getLong("session_warn_delayed_msg_count", 2000000L).longValue();
        this.sessionMaxAllowedDelayedMsgCount = context.getLong("session_max_allowed_delayed_msg_count", 4000000L).longValue();
        this.nettyWriteBufferHighWaterMark = context.getLong("netty_write_buffer_high_water_mark", 15728640L).longValue();
        this.recoverthreadcount = context.getInteger("recover_thread_count", Integer.valueOf(Runtime.getRuntime().availableProcessors() + 1)).intValue();
    }

    public void handleMessageSendSuccess(EventStat eventStat) {
        totalTubeSuccSendCnt.incrementAndGet();
        totalTubeSuccSendSize.addAndGet(eventStat.getEvent().getBody().length);
        this.sinkCounter.incrementEventDrainSuccessCount();
        this.currentSuccessSendCnt.incrementAndGet();
        long j = this.currentSuccessSendCnt.get();
        long j2 = this.lastSuccessSendCnt.get();
        if (j % this.logEveryNEvents.intValue() != 0 || j == this.lastSuccessSendCnt.get()) {
            return;
        }
        this.lastSuccessSendCnt.set(j);
        this.t2 = System.currentTimeMillis();
        logger.info("tubesink {}, succ put {} events to tube, in the past {} millsec", new Object[]{getName(), Long.valueOf(j - j2), Long.valueOf(this.t2 - this.t1)});
        this.t1 = this.t2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resendEvent(EventStat eventStat, boolean z) {
        if (eventStat != null) {
            try {
                if (eventStat.getEvent() == null) {
                    return;
                }
                if (!this.resendQueue.offer(eventStat)) {
                    FailoverChannelProcessorHolder.getChannelProcessor().processEvent(eventStat.getEvent());
                }
            } catch (Throwable th) {
                logger.error("resendEvent e = {}", th);
            }
        }
    }

    private void createConnection() throws FlumeException {
        if (this.sessionFactory != null) {
            return;
        }
        try {
            this.sessionFactory = new TubeMultiSessionFactory(initTubeConfig());
            logger.info("create tube connection successfully");
            if (this.producerMap == null) {
                this.producerMap = new HashMap();
            }
        } catch (TubeClientException e) {
            logger.error("create connnection error in tubesink, maybe tube master set error, please re-check. ex1 {}", e.getMessage());
            throw new FlumeException("connect to Tube error1, maybe zkstr/zkroot set error, please re-check");
        } catch (Throwable th) {
            logger.error("create connnection error in tubesink, maybe tube master set error/shutdown in progress, please re-check. ex2 {}", th);
            throw new FlumeException("connect to meta error2, maybe tube master set error/shutdown in progress, please re-check");
        }
    }

    private TubeClientConfig initTubeConfig() throws Exception {
        ConfigManager.getInstance().getMqInfoList().forEach(mQInfo -> {
            if ("TUBEMQ".equals(mQInfo.getMqType())) {
                this.masterHostAndPortList = mQInfo.getUrl();
            }
        });
        TubeClientConfig tubeClientConfig = new TubeClientConfig(this.masterHostAndPortList);
        tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(this.linkMaxAllowedDelayedMsgCount);
        tubeClientConfig.setSessionWarnDelayedMsgCount(this.sessionWarnDelayedMsgCount);
        tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(this.sessionMaxAllowedDelayedMsgCount);
        tubeClientConfig.setNettyWriteBufferHighWaterMark(this.nettyWriteBufferHighWaterMark);
        tubeClientConfig.setHeartbeatPeriodMs(15000L);
        tubeClientConfig.setRpcTimeoutMs(20000L);
        return tubeClientConfig;
    }

    private void destroyConnection() {
        Iterator<Map.Entry<String, MessageProducer>> it = this.producerMap.entrySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().getValue().shutdown();
            } catch (TubeClientException e) {
                logger.error("destroy producer error in tubesink, MetaClientException {}", e.getMessage());
            } catch (Throwable th) {
                logger.error("destroy producer error in tubesink, ex {}", th.getMessage());
            }
        }
        this.producerMap.clear();
        if (this.sessionFactory != null) {
            try {
                this.sessionFactory.shutdown();
            } catch (Exception e2) {
                logger.error("destroy sessionFactory error in tubesink, ex {}", e2.getMessage());
            }
        }
        this.sessionFactory = null;
        logger.debug("closed meta producer");
    }

    private void initTopicProducer(String str) throws TubeClientException {
        if (StringUtils.isEmpty(str)) {
            logger.error("topic is empty");
            return;
        }
        if (this.sessionFactory == null) {
            throw new TubeClientException("sessionFactory is null, can't create producer");
        }
        if (this.producer == null) {
            this.producer = this.sessionFactory.createProducer();
        }
        this.producer.publish(str);
        this.producerMap.put(str, this.producer);
        logger.info(getName() + " success publish topic: " + str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MessageProducer getProducer(String str) throws TubeClientException {
        if (this.producerMap.containsKey(str)) {
            return this.producerMap.get(str);
        }
        synchronized (this) {
            if (!this.producerMap.containsKey(str)) {
                if (this.producer == null) {
                    this.producer = this.sessionFactory.createProducer();
                }
                this.producer.publish(str);
                this.producerMap.put(str, this.producer);
            }
        }
        return this.producerMap.get(str);
    }

    static {
        logger.info("init tubePerformanceTask");
        scheduledExecutorService.scheduleWithFixedDelay(tubePerformanceTask, 0L, PRINT_INTERVAL.longValue(), TimeUnit.SECONDS);
    }
}
