package org.apache.flume.sink.tubemq;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/tubemq/TubemqSink.class */
public class TubemqSink extends AbstractSink implements Configurable {
    private static final Logger LOGGER = LoggerFactory.getLogger(TubemqSink.class);
    public TubeMultiSessionFactory sessionFactory;
    public ConcurrentHashMap<String, MessageProducer> producerMap;
    private String masterHostAndPortList;
    private String defaultTopic;
    private long heartbeatPeriod;
    private long rpcTimeout;
    private long linkMaxAllowedDelayedMsgCount;
    private long sessionWarnDelayedMsgCount;
    private long sessionMaxAllowedDelayedMsgCount;
    private long nettyWriteBufferHighWaterMark;
    private ExecutorService sinkThreadPool;
    private int threadNum;
    private LinkedBlockingQueue<EventStat> resendQueue;
    private LinkedBlockingQueue<Event> eventQueue;
    private int maxRetryTime;
    private long eventOfferTimeout;
    private TubeClientConfig clientConfig;
    private TubeSinkCounter counter;
    private final List<Future<?>> threadFutures = new ArrayList();
    private boolean started = false;
    private boolean overflow = false;

    /* loaded from: input_file:org/apache/flume/sink/tubemq/TubemqSink$SinkTask.class */
    class SinkTask implements Runnable {
        SinkTask() {
        }

        private void sleepIfOverflow() throws Exception {
            if (TubemqSink.this.overflow) {
                TubemqSink.this.overflow = false;
                Thread.sleep(50L);
            }
        }

        private EventStat fetchEventStat() throws Exception {
            return !TubemqSink.this.resendQueue.isEmpty() ? (EventStat) TubemqSink.this.resendQueue.poll() : new EventStat((Event) TubemqSink.this.eventQueue.take());
        }

        private void sendEvent(MessageProducer messageProducer, final EventStat eventStat) throws Exception {
            messageProducer.sendMessage(new Message(eventStat.getTopic(), eventStat.getEvent().getBody()), new MessageSentCallback() { // from class: org.apache.flume.sink.tubemq.TubemqSink.SinkTask.1
                public void onMessageSent(MessageSentResult messageSentResult) {
                    if (messageSentResult.isSuccess()) {
                        return;
                    }
                    SinkTask.this.resendEvent(eventStat);
                }

                public void onException(Throwable th) {
                    TubemqSink.LOGGER.error("exception caught", th);
                    if (th instanceof OverflowException) {
                        TubemqSink.this.overflow = true;
                    }
                    SinkTask.this.resendEvent(eventStat);
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void resendEvent(EventStat eventStat) {
            if (eventStat == null || eventStat.getEvent() == null) {
                return;
            }
            eventStat.incRetryCnt();
            if (eventStat.getRetryCnt() > TubemqSink.this.maxRetryTime) {
                TubemqSink.LOGGER.error("event max retry reached, ignore it");
            } else {
                if (TubemqSink.this.resendQueue.offer(eventStat)) {
                    return;
                }
                TubemqSink.this.getChannel().put(eventStat.getEvent());
                TubemqSink.LOGGER.warn("resend queue is full, size: {}, send back to channel", Integer.valueOf(TubemqSink.this.resendQueue.size()));
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            TubemqSink.LOGGER.info("Sink task {} started.", Thread.currentThread().getName());
            while (TubemqSink.this.started) {
                EventStat eventStat = null;
                try {
                    sleepIfOverflow();
                    eventStat = fetchEventStat();
                    if (eventStat.getTopic() == null || eventStat.getTopic().equals("")) {
                        TubemqSink.LOGGER.debug("no topic specified in event header, use default topic instead");
                        eventStat.setTopic(TubemqSink.this.defaultTopic);
                    }
                    TubemqSink.this.counter.incrementSendCount();
                    try {
                        sendEvent(TubemqSink.this.getProducer(eventStat.getTopic()), eventStat);
                    } catch (Exception e) {
                        TubemqSink.LOGGER.error("Get producer failed!", e);
                    }
                } catch (InterruptedException e2) {
                    TubemqSink.LOGGER.info("Thread {} has been interrupted!", Thread.currentThread().getName());
                    return;
                } catch (Throwable th) {
                    TubemqSink.LOGGER.error("error while sending event", th);
                    resendEvent(eventStat);
                }
            }
        }
    }

    private TubeClientConfig initTubeConfig() {
        TubeClientConfig tubeClientConfig = new TubeClientConfig(this.masterHostAndPortList);
        tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(this.linkMaxAllowedDelayedMsgCount);
        tubeClientConfig.setSessionWarnDelayedMsgCount(this.sessionWarnDelayedMsgCount);
        tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(this.sessionMaxAllowedDelayedMsgCount);
        tubeClientConfig.setNettyWriteBufferHighWaterMark(this.nettyWriteBufferHighWaterMark);
        tubeClientConfig.setHeartbeatPeriodMs(this.heartbeatPeriod);
        tubeClientConfig.setRpcTimeoutMs(this.rpcTimeout);
        return tubeClientConfig;
    }

    @VisibleForTesting
    TubeClientConfig getClientConfig() {
        return this.clientConfig;
    }

    @VisibleForTesting
    TubeSinkCounter getCounter() {
        return this.counter;
    }

    private void createConnection() throws FlumeException {
        if (this.sessionFactory != null) {
            return;
        }
        try {
            this.sessionFactory = new TubeMultiSessionFactory(this.clientConfig);
            if (this.producerMap == null) {
                this.producerMap = new ConcurrentHashMap<>();
            }
        } catch (TubeClientException e) {
            LOGGER.error("create connection error in tubemqSink, maybe tubemq master set error, please re-check. ex1 {}", e.getMessage());
            throw new FlumeException("connect to tubemq error1, please re-check", e);
        }
    }

    private void destroyConnection() {
        Iterator it = this.producerMap.keySet().iterator();
        while (it.hasNext()) {
            try {
                this.producerMap.get((String) it.next()).shutdown();
            } catch (Throwable th) {
                LOGGER.error("destroy producer error in tubemqSink, ex", th);
            }
        }
        this.producerMap.clear();
        if (this.sessionFactory != null) {
            try {
                this.sessionFactory.shutdown();
            } catch (Exception e) {
                LOGGER.error("destroy sessionFactory error in tubemqSink, MetaClientException", e);
            }
        }
        this.sessionFactory = null;
        LOGGER.debug("closed meta producer");
    }

    public void start() {
        LOGGER.info("tubemq sink starting...");
        try {
            createConnection();
        } catch (FlumeException e) {
            destroyConnection();
            LOGGER.error("Unable to create tubemq client. Exception follows.", e);
        }
        this.started = true;
        for (int i = 0; i < this.threadNum; i++) {
            this.threadFutures.add(this.sinkThreadPool.submit(new SinkTask()));
        }
        super.start();
    }

    public void stop() {
        LOGGER.info("tubemq sink stopping");
        this.started = false;
        if (this.sinkThreadPool != null) {
            this.sinkThreadPool.shutdown();
        }
        Iterator<Future<?>> it = this.threadFutures.iterator();
        while (it.hasNext()) {
            it.next().cancel(true);
        }
        this.threadFutures.clear();
        destroyConnection();
        super.stop();
    }

    public Sink.Status process() {
        if (!this.started) {
            return Sink.Status.BACKOFF;
        }
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        Sink.Status status = Sink.Status.READY;
        try {
            try {
                Event take = channel.take();
                if (take == null) {
                    status = Sink.Status.BACKOFF;
                    transaction.commit();
                } else if (this.eventQueue.offer(take, this.eventOfferTimeout, TimeUnit.MILLISECONDS)) {
                    transaction.commit();
                } else {
                    LOGGER.info("[{}] Channel --> Queue(has no enough space,current code point) --> tubemq, Check if tubemq server or network is ok.(if this situation last long time it will cause memoryChannel full and fileChannel write.)", getName());
                    this.counter.incrementRollbackCount();
                    transaction.rollback();
                }
                transaction.close();
            } catch (Throwable th) {
                LOGGER.error("Process event failed!" + getName(), th);
                try {
                    this.counter.incrementRollbackCount();
                    transaction.rollback();
                } catch (Throwable th2) {
                    LOGGER.error("tubemq sink transaction rollback exception", th2);
                }
                transaction.close();
            }
            return status;
        } catch (Throwable th3) {
            transaction.close();
            throw th3;
        }
    }

    public void configure(Context context) {
        LOGGER.info(context.toString());
        this.masterHostAndPortList = context.getString(ConfigOptions.MASTER_HOST_PORT_LIST);
        this.defaultTopic = context.getString(ConfigOptions.TOPIC);
        this.heartbeatPeriod = context.getLong(ConfigOptions.HEARTBEAT_PERIOD, Long.valueOf(ConfigOptions.DEFAULT_HEARTBEAT_PERIOD)).longValue();
        this.rpcTimeout = context.getLong(ConfigOptions.RPC_TIMEOUT, Long.valueOf(ConfigOptions.DEFAULT_RPC_TIMEOUT)).longValue();
        this.linkMaxAllowedDelayedMsgCount = context.getLong(ConfigOptions.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT, Long.valueOf(ConfigOptions.DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT)).longValue();
        this.sessionWarnDelayedMsgCount = context.getLong(ConfigOptions.SESSION_WARN_DELAYED_MSG_COUNT, Long.valueOf(ConfigOptions.DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT)).longValue();
        this.sessionMaxAllowedDelayedMsgCount = context.getLong(ConfigOptions.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT, Long.valueOf(ConfigOptions.DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT)).longValue();
        this.nettyWriteBufferHighWaterMark = context.getLong(ConfigOptions.NETTY_WRITE_BUFFER_HIGH_WATER_MARK, Long.valueOf(ConfigOptions.DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK)).longValue();
        this.producerMap = new ConcurrentHashMap<>();
        this.threadNum = context.getInteger(ConfigOptions.SINK_THREAD_NUM, 4).intValue();
        this.sinkThreadPool = Executors.newFixedThreadPool(this.threadNum);
        this.resendQueue = new LinkedBlockingQueue<>(context.getInteger("retry-queue-capacity", Integer.valueOf(ConfigOptions.DEFAULT_RETRY_QUEUE_CAPACITY)).intValue());
        this.eventQueue = new LinkedBlockingQueue<>(context.getInteger("retry-queue-capacity", Integer.valueOf(ConfigOptions.DEFAULT_EVENT_QUEUE_CAPACITY)).intValue());
        this.maxRetryTime = context.getInteger(ConfigOptions.EVENT_MAX_RETRY_TIME, 5).intValue();
        this.eventOfferTimeout = context.getLong(ConfigOptions.EVENT_OFFER_TIMEOUT, Long.valueOf(ConfigOptions.DEFAULT_EVENT_OFFER_TIMEOUT)).longValue();
        this.counter = new TubeSinkCounter(getName());
        this.clientConfig = initTubeConfig();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MessageProducer getProducer(String str) throws TubeClientException {
        if (!this.producerMap.containsKey(str)) {
            MessageProducer createProducer = this.sessionFactory.createProducer();
            createProducer.publish(str);
            this.producerMap.putIfAbsent(str, createProducer);
        }
        return this.producerMap.get(str);
    }
}
