package org.apache.inlong.audit.sink;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.audit.base.HighPriorityThreadFactory;
import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/audit/sink/KafkaSink.class */
public class KafkaSink extends AbstractSink implements Configurable {
    /** Kafka bootstrap servers; assigned in initTopicProducer from the MQ info entry whose type is "KAFKA". */
    private String kafkaServerUrl;

    // ---- Flume context keys and their default values ----
    private static final String BOOTSTRAP_SERVER = "bootstrap_servers";
    private static final String TOPIC = "topic";
    private static final String RETRIES = "retries";
    private static final String BATCH_SIZE = "batch_size";
    private static final String LINGER_MS = "linger_ms";
    private static final String BUFFER_MEMORY = "buffer_memory";
    private static final String defaultRetries = "0";
    private static final String defaultBatchSize = "16384";
    private static final String defaultLingerMs = "0";
    private static final String defaultBufferMemory = "33554432";
    private static final String defaultAcks = "all";

    /** The single producer instance; every entry of {@link #producerMap} points at it. */
    private KafkaProducer<String, byte[]> producer;
    /** Topic name to producer lookup; replaced with a fresh map on every configure() call. */
    public Map<String, KafkaProducer<String, byte[]>> producerMap;
    private SinkCounter sinkCounter;
    /** Destination topic, read from the sink context. */
    private String topic;
    /** Number of sender threads, and the threads themselves. */
    private int threadNum;
    private Thread[] sinkThreadPool;

    /** Capacity of {@link #resendQueue}. */
    private static final int BAD_EVENT_QUEUE_SIZE = 10000;
    /** Capacity of {@link #eventQueue}. */
    private static final int EVENT_QUEUE_SIZE = 1000;
    private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
    /** Events whose send failed and that are waiting to be retried. */
    private LinkedBlockingQueue<EventStat> resendQueue;
    /** Events taken from the channel by process() and not yet picked up by a sender thread. */
    private LinkedBlockingQueue<Event> eventQueue;
    /** A progress line is logged every this many successfully sent events. */
    private Integer logEveryNEvents;
    /** Configured rate for {@link #diskRateLimiter}; 0 means no limiter is created. */
    private long diskIORatePerSec;
    private RateLimiter diskRateLimiter;
    private static final String LOG_EVERY_N_EVENTS = "log_every_n_events";
    private static final String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
    private static final String SINK_THREAD_NUM = "thread-num";
    private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);

    /*
     * Producer configuration. Kept per sink instance: as a static field, two sinks in the same
     * JVM overwrote each other's settings whenever configure() or initTopicProducer() ran.
     */
    private Properties properties = new Properties();

    /** Period, in seconds, of the throughput report printed by the performance task. */
    private static final Long PRINT_INTERVAL = 30L;
    private static final KafkaPerformanceTask kafkaPerformanceTask = new KafkaPerformanceTask();
    private static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, new HighPriorityThreadFactory("kafkaPerformance-Printer-thread"));
    /** Counters shared by all sink instances; reset by the performance task after each report. */
    private static final AtomicLong totalKafkaSuccSendCnt = new AtomicLong(0);
    private static final AtomicLong totalKafkaSuccSendSize = new AtomicLong(0);

    /** While true the sender threads keep looping. */
    private volatile boolean canSend = false;
    /** While true process() keeps taking events from the channel. */
    private volatile boolean canTake = false;
    private final AtomicLong currentSuccessSendCnt = new AtomicLong(0);
    private final AtomicLong lastSuccessSendCnt = new AtomicLong(0);
    /** Timestamps (ms) bounding the interval reported by handleMessageSendSuccess. */
    private long t1 = System.currentTimeMillis();
    private long t2 = 0;
    /** Makes a sender thread pause briefly when set; nothing in this class sets it to true. */
    private boolean overflow = false;
    /** Local IP address, refreshed from NetworkUtils in configure(). */
    private String localIp = "127.0.0.1";

    /* loaded from: input_file:org/apache/inlong/audit/sink/KafkaSink$KafkaPerformanceTask.class */
    static class KafkaPerformanceTask implements Runnable {
        KafkaPerformanceTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (KafkaSink.totalKafkaSuccSendSize.get() != 0) {
                    KafkaSink.logger.info("Total kafka performance tps: " + (KafkaSink.totalKafkaSuccSendCnt.get() / KafkaSink.PRINT_INTERVAL.longValue()) + "/s, avg msg size: " + (KafkaSink.totalKafkaSuccSendSize.get() / KafkaSink.totalKafkaSuccSendCnt.get()) + ", print every " + KafkaSink.PRINT_INTERVAL + " seconds");
                    KafkaSink.totalKafkaSuccSendSize.set(0L);
                    KafkaSink.totalKafkaSuccSendCnt.set(0L);
                }
            } catch (Exception e) {
                KafkaSink.logger.info("tubePerformanceTask error", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/inlong/audit/sink/KafkaSink$SinkTask.class */
    class SinkTask implements Runnable {
        SinkTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            EventStat eventStat;
            KafkaSink.logger.info("Sink task {} started.", Thread.currentThread().getName());
            while (KafkaSink.this.canSend) {
                Event event = null;
                try {
                    if (KafkaSink.this.overflow) {
                        KafkaSink.this.overflow = false;
                        Thread.sleep(10L);
                    }
                    if (KafkaSink.this.resendQueue.isEmpty()) {
                        event = (Event) KafkaSink.this.eventQueue.take();
                        eventStat = new EventStat(event);
                        KafkaSink.this.sinkCounter.incrementEventDrainAttemptCount();
                    } else {
                        eventStat = (EventStat) KafkaSink.this.resendQueue.poll();
                        if (eventStat != null) {
                            event = eventStat.getEvent();
                        }
                    }
                    if (event == null || StringUtils.isBlank(KafkaSink.this.topic)) {
                        KafkaSink.logger.warn("event is null or no topic specified in event header, just skip");
                    } else {
                        if (!sendMessage(event, KafkaSink.this.topic, eventStat)) {
                        }
                    }
                } catch (InterruptedException e) {
                    KafkaSink.logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
                    return;
                } catch (Throwable th) {
                    if (th instanceof PulsarClientException) {
                        String message = th.getMessage();
                        if (message == null || !(message.contains("No available queue for topic") || message.contains("The brokers of topic are all forbidden"))) {
                            try {
                                Thread.sleep(100L);
                            } catch (InterruptedException e2) {
                            }
                        } else {
                            KafkaSink.logger.info("IllegalTopicMap.put " + KafkaSink.this.topic);
                        }
                    }
                    KafkaSink.this.resendEvent(null, false);
                }
            }
        }

        private boolean sendMessage(Event event, String str, EventStat eventStat) {
            KafkaProducer producer = KafkaSink.this.getProducer(str);
            if (producer == null) {
                KafkaSink.logger.error("Get producer is null, topic:{}", str);
                return false;
            }
            KafkaSink.logger.debug("producer start to send msg...");
            producer.send(new ProducerRecord(str, event.getBody()), (recordMetadata, exc) -> {
                if (exc == null) {
                    KafkaSink.this.handleMessageSendSuccess(eventStat);
                    return;
                }
                KafkaSink.logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", new Object[]{exc.getMessage(), Integer.valueOf(KafkaSink.this.resendQueue.size()), Integer.valueOf(eventStat.getEvent().hashCode())});
                eventStat.incRetryCnt();
                KafkaSink.this.resendEvent(eventStat, true);
            });
            return true;
        }
    }

    /**
     * Creates an unconfigured sink. The constructor only emits a debug log line; queues,
     * counters and producer settings are set up later in {@code configure(Context)}.
     */
    public KafkaSink() {
        logger.debug("new instance of KafkaSink!");
    }

    /**
     * Starts the sink: creates the Kafka producer, marks the sink as started and launches the
     * sender threads.
     *
     * <p>The producer is created first because that step can throw. Doing it before the
     * lifecycle state and the {@code canSend}/{@code canTake} flags are changed means a failed
     * start leaves the sink cleanly un-started, not marked as running with no sender threads.
     */
    @Override
    public synchronized void start() {
        logger.info("kafka sink starting");
        initTopicProducer(this.topic);
        if (this.sinkThreadPool == null) {
            // stop() discards the thread array; rebuild it so the sink can be started again.
            this.sinkThreadPool = new Thread[this.threadNum];
        }
        this.sinkCounter.start();
        super.start();
        this.canSend = true;
        this.canTake = true;
        for (int i = 0; i < this.sinkThreadPool.length; i++) {
            Thread sender = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
            this.sinkThreadPool[i] = sender;
            sender.start();
        }
        logger.debug("kafka sink started");
    }

    /**
     * Stops the sink. Intake from the channel is disabled first, then the event queue is given
     * up to ten seconds to drain, the sender threads are interrupted, the Kafka producer is
     * closed, and finally the lifecycle state and counters are stopped.
     */
    @Override
    public synchronized void stop() {
        logger.info("kafka sink stopping");
        this.canTake = false;
        // Wait at most 10 x 1s for the sender threads to empty the event queue.
        for (int waitedSeconds = 0; waitedSeconds < 10 && !this.eventQueue.isEmpty(); waitedSeconds++) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                logger.info("Stop thread has been interrupt!");
                // Preserve the interrupt for the caller and stop waiting for the drain.
                Thread.currentThread().interrupt();
                break;
            }
        }
        this.canSend = false;
        if (this.sinkThreadPool != null) {
            for (Thread thread : this.sinkThreadPool) {
                if (thread != null) {
                    thread.interrupt();
                }
            }
            awaitSenderThreads(this.sinkThreadPool);
            this.sinkThreadPool = null;
        }
        closeProducer();
        super.stop();
        // NOTE: this executor is static, so shutting it down here also ends the performance
        // report for any other KafkaSink instance in the same JVM.
        if (!scheduledExecutorService.isShutdown()) {
            scheduledExecutorService.shutdown();
        }
        this.sinkCounter.stop();
        logger.debug("kafka sink stopped. Metrics:{}", this.sinkCounter);
    }

    /** Gives each interrupted sender thread up to one second to exit before the producer is closed. */
    private void awaitSenderThreads(Thread[] senders) {
        for (Thread sender : senders) {
            if (sender == null) {
                continue;
            }
            try {
                sender.join(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Closes the producer so its I/O thread and buffers are released, and forgets cached references to it. */
    private void closeProducer() {
        KafkaProducer<String, byte[]> closing = this.producer;
        this.producer = null;
        if (this.producerMap != null) {
            this.producerMap.clear();
        }
        if (closing == null) {
            return;
        }
        try {
            closing.close();
        } catch (Throwable th) {
            logger.error("Close kafka producer failed!" + getName(), th);
        }
    }

    /**
     * Moves at most one event from the channel into the in-memory event queue inside a channel
     * transaction.
     *
     * @return {@code BACKOFF} when intake is disabled or the channel had no event, otherwise
     *         {@code READY} (also after a rollback, so the event is retried on the next call)
     */
    @Override
    public Sink.Status process() {
        // Invoked in a loop by the sink runner, so this stays at debug level.
        logger.debug("kafka sink processing");
        if (!this.canTake) {
            return Sink.Status.BACKOFF;
        }
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            Event take = channel.take();
            if (take == null) {
                status = Sink.Status.BACKOFF;
                transaction.commit();
            } else {
                int bodyLength = take.getBody().length;
                // RateLimiter.acquire rejects a non-positive permit count, so an empty body
                // must bypass the limiter instead of failing and being rolled back forever.
                if (this.diskRateLimiter != null && bodyLength > 0) {
                    this.diskRateLimiter.acquire(bodyLength);
                }
                if (this.eventQueue.offer(take, 3000L, TimeUnit.MILLISECONDS)) {
                    transaction.commit();
                } else {
                    logger.info("[{}] Channel --> Queue(not enough space, current code point) --> Kafka, check if Kafka server or network is ok. (If this situation last long time it will cause memoryChannel full and fileChannel write.)", getName());
                    transaction.rollback();
                }
            }
        } catch (Throwable th) {
            logger.error("Process event failed!" + getName(), th);
            try {
                transaction.rollback();
            } catch (Throwable rollbackFailure) {
                logger.error("Kafka Sink transaction rollback exception", rollbackFailure);
            }
            if (th instanceof InterruptedException) {
                // offer() was interrupted: keep the flag visible to the sink runner.
                Thread.currentThread().interrupt();
            }
        } finally {
            transaction.close();
        }
        return status;
    }

    /**
     * Reads the sink settings from the Flume context and (re)creates the queues, the thread
     * array, the optional rate limiter and the Kafka producer properties.
     *
     * @param context sink configuration; {@code topic} is mandatory
     * @throws IllegalStateException    if no topic is configured
     * @throws IllegalArgumentException if {@code log_every_n_events} or {@code thread-num} is not positive
     */
    @Override
    public void configure(Context context) {
        logger.info("KafkaSink started and context = {}", context.toString());
        this.topic = context.getString(TOPIC);
        Preconditions.checkState(StringUtils.isNotEmpty(this.topic), "No topic specified");
        this.producerMap = new HashMap<>();

        this.logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, DEFAULT_LOG_EVERY_N_EVENTS);
        logger.debug(getName() + " " + LOG_EVERY_N_EVENTS + " " + this.logEveryNEvents);
        Preconditions.checkArgument(this.logEveryNEvents.intValue() > 0, "logEveryNEvents must be > 0");

        this.resendQueue = new LinkedBlockingQueue<>(BAD_EVENT_QUEUE_SIZE);
        this.threadNum = Integer.parseInt(context.getString(SINK_THREAD_NUM, "4"));
        Preconditions.checkArgument(this.threadNum > 0, "threadNum must be > 0");
        this.sinkThreadPool = new Thread[this.threadNum];
        this.eventQueue = new LinkedBlockingQueue<>(EVENT_QUEUE_SIZE);

        this.diskIORatePerSec = context.getLong(DISK_IO_RATE_PER_SEC, 0L);
        // Assign unconditionally so a limiter from an earlier configuration does not survive
        // when the rate is reconfigured to 0 (disabled).
        this.diskRateLimiter = this.diskIORatePerSec != 0 ? RateLimiter.create(this.diskIORatePerSec) : null;

        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(getName());
        }
        this.localIp = NetworkUtils.getLocalIp();

        // Producer settings; bootstrap.servers is added later by initTopicProducer.
        properties = new Properties();
        properties.put("acks", defaultAcks);
        properties.put(RETRIES, context.getString(RETRIES, defaultRetries));
        properties.put("batch.size", context.getString(BATCH_SIZE, defaultBatchSize));
        properties.put("linger.ms", context.getString(LINGER_MS, defaultLingerMs));
        properties.put("buffer.memory", context.getString(BUFFER_MEMORY, defaultBufferMemory));
    }

    /**
     * Resolves the Kafka bootstrap servers from the MQ configuration, creates the producer if it
     * does not exist yet and registers it for the given topic.
     *
     * @param str topic the producer is registered under
     * @throws IllegalStateException if the MQ configuration contains no usable KAFKA url
     */
    private void initTopicProducer(String str) {
        ConfigManager.getInstance().getMqInfoList().forEach(mQInfo -> {
            if ("KAFKA".equals(mQInfo.getMqType())) {
                this.kafkaServerUrl = mQInfo.getUrl();
            }
        });
        // Properties rejects null values with a message-less NullPointerException; fail with
        // an explanation instead when no KAFKA entry supplied a url.
        Preconditions.checkState(StringUtils.isNotEmpty(this.kafkaServerUrl),
                "No KAFKA url found in MQ info list, cannot create producer");
        properties.put("bootstrap.servers", this.kafkaServerUrl);
        if (StringUtils.isEmpty(str)) {
            logger.error("topic is empty");
        }
        if (this.producer == null) {
            this.producer = new KafkaProducer<>(properties, new StringSerializer(), new ByteArraySerializer());
        }
        this.producerMap.put(str, this.producer);
        logger.info(getName() + " success create producer");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the producer registered for a topic, registering the shared producer (and creating
     * it if necessary) the first time a topic is requested.
     *
     * @param str topic name
     * @return the producer to use for {@code str}
     */
    public KafkaProducer<String, byte[]> getProducer(String str) {
        KafkaProducer<String, byte[]> registered = this.producerMap.get(str);
        if (registered != null) {
            return registered;
        }
        synchronized (this) {
            registered = this.producerMap.get(str);
            if (registered == null) {
                if (this.producer == null) {
                    // The properties carry no key/value serializer entries, so the serializers
                    // must be passed explicitly, exactly as initTopicProducer does; the
                    // properties-only constructor would reject the configuration.
                    this.producer = new KafkaProducer<>(properties, new StringSerializer(), new ByteArraySerializer());
                }
                this.producerMap.put(str, this.producer);
                registered = this.producer;
            }
            return registered;
        }
    }

    /**
     * Records one successfully sent event in the global and per-sink counters and, every
     * {@code logEveryNEvents} successes, logs how many events were sent since the previous
     * progress line and how long that took.
     *
     * @param eventStat wrapper of the event that was acknowledged
     */
    public void handleMessageSendSuccess(EventStat eventStat) {
        totalKafkaSuccSendCnt.incrementAndGet();
        totalKafkaSuccSendSize.addAndGet(eventStat.getEvent().getBody().length);
        this.sinkCounter.incrementEventDrainSuccessCount();

        // Use the value returned by the increment itself: every caller then sees a distinct
        // count, so each logging boundary is hit exactly once even with concurrent callers.
        long sentSoFar = this.currentSuccessSendCnt.incrementAndGet();
        if (sentSoFar % this.logEveryNEvents.intValue() != 0) {
            return;
        }
        long sentAtLastReport = this.lastSuccessSendCnt.getAndSet(sentSoFar);
        this.t2 = System.currentTimeMillis();
        logger.info("KafkaSink {}, succ put {} events to kafka, in the past {} millisecond",
                getName(), sentSoFar - sentAtLastReport, this.t2 - this.t1);
        this.t1 = this.t2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Puts a failed event back on the resend queue; when the queue has no room the event is
     * handed to the failover channel processor instead. A null argument or a wrapper without an
     * event is ignored, and any failure is logged, never propagated.
     *
     * @param eventStat wrapper of the event to retry, may be null
     * @param z         not used by this implementation
     */
    public void resendEvent(EventStat eventStat, boolean z) {
        if (eventStat == null) {
            return;
        }
        try {
            boolean needsFailover = eventStat.getEvent() != null && !this.resendQueue.offer(eventStat);
            if (needsFailover) {
                FailoverChannelProcessorHolder.getChannelProcessor().processEvent(eventStat.getEvent());
            }
        } catch (Throwable failure) {
            logger.error("resendEvent e = {}", failure);
        }
    }

    // Class initialisation: schedule the shared performance task on the single-threaded
    // scheduler. It first runs immediately (initial delay 0) and then again PRINT_INTERVAL
    // seconds after each run completes (fixed delay).
    static {
        logger.info("init kafkaPerformanceTask");
        scheduledExecutorService.scheduleWithFixedDelay(kafkaPerformanceTask, 0L, PRINT_INTERVAL.longValue(), TimeUnit.SECONDS);
    }
}
