package org.apache.inlong.dataproxy.sink;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.source.shaded.guava.RateLimiter;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.dispatch.DispatchManager;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.Constants;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.class */
public class SimpleMessageTubeSink extends AbstractSink implements Configurable {
    private static final String TUBE_REQUEST_TIMEOUT = "tube-request-timeout";
    private static final String KEY_DISK_IO_RATE_PER_SEC = "disk-io-rate-per-sec";
    private static final String SINK_THREAD_NUM = "thread-num";
    private static final int defaultLogEveryNEvents = 100000;
    private static final int defaultSendTimeout = 20000;
    private static final int defaultStatIntervalSec = 60;
    private static final int sendNewMetricRetryCount = 3;
    private static final String LOG_TOPIC = "proxy-log-topic";
    private static final String STREAMID = "proxy-log-streamid";
    private static final String GROUPID = "proxy-log-groupid";
    private static final String SEND_REMOTE = "send-remote";
    private static final String topicsFilePath = "topics.properties";
    private static final String slaTopicFilePath = "slaTopics.properties";
    private static final String SLA_METRIC_SINK = "sla-metric-sink";
    private ConfigManager configManager;
    private Map<String, String> topicProperties;
    public MessageProducer producer;
    public Map<String, MessageProducer> producerMap;
    private LinkedBlockingQueue<EventStat> resendQueue;
    private LinkedBlockingQueue<Event> eventQueue;
    private long diskIORatePerSec;
    private RateLimiter diskRateLimiter;
    public TubeMultiSessionFactory sessionFactory;
    private String masterHostAndPortList;
    private Integer logEveryNEvents;
    private Integer sendTimeout;
    private int threadNum;
    private Thread[] sinkThreadPool;
    private long linkMaxAllowedDelayedMsgCount;
    private long sessionWarnDelayedMsgCount;
    private long sessionMaxAllowedDelayedMsgCount;
    private long nettyWriteBufferHighWaterMark;
    private int recoverthreadcount;
    private Map<String, String> dimensions;
    private DataProxyMetricItemSet metricItemSet;
    private IdCacheCleaner idCacheCleaner;
    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageTubeSink.class);
    private static int MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
    private static int BAD_EVENT_QUEUE_SIZE = 10000;
    private static int EVENT_QUEUE_SIZE = 1000;
    private static int BATCH_SIZE = 10000;
    private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
    private static String TOPIC = "topic";
    private static String SEND_TIMEOUT = "send_timeout";
    private static String LOG_EVERY_N_EVENTS = "log-every-n-events";
    private static String RETRY_CNT = "retry-currentSuccSendedCnt";
    private static String STAT_INTERVAL_SEC = ConfigConstants.STAT_INTERVAL_SEC;
    private static String MAX_TOPICS_EACH_PRODUCER_HOLD_NAME = "max-topic-each-producer-hold";
    private static String MAX_SURVIVED_TIME = "max-survived-time";
    private static String MAX_SURVIVED_SIZE = "max-survived-size";
    private static String CLIENT_ID_CACHE = "client-id-cache";
    private static final int defaultRetryCnt = -1;
    private static int retryCnt = defaultRetryCnt;
    private static final LoadingCache<String, Long> agentIdCache = CacheBuilder.newBuilder().concurrencyLevel(32).initialCapacity(5000000).expireAfterAccess(30, TimeUnit.SECONDS).build(new CacheLoader<String, Long>() { // from class: org.apache.inlong.dataproxy.sink.SimpleMessageTubeSink.1
        public Long load(String str) {
            return Long.valueOf(System.currentTimeMillis());
        }
    });
    protected static boolean idCleanerStarted = false;
    protected static final ConcurrentHashMap<String, Long> agentIdMap = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, Long> illegalTopicMap = new ConcurrentHashMap<>();
    private volatile boolean canTake = false;
    private volatile boolean canSend = false;
    private int maxSurvivedTime = 90000;
    private int maxSurvivedSize = defaultLogEveryNEvents;
    private String proxyLogTopic = "teg_manager";
    private String proxyLogGroupId = "b_teg_manager";
    private String proxyLogStreamId = "proxy_measure_log";
    private boolean sendRemote = false;
    public AtomicInteger currentPublishTopicNum = new AtomicInteger(0);
    private int requestTimeout = defaultStatIntervalSec;
    private String metaTopicFilePath = topicsFilePath;
    private boolean clientIdCache = false;
    private boolean isNewCache = true;
    private boolean overflow = false;

    /* loaded from: input_file:org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink$MyCallback.class */
    public class MyCallback implements MessageSentCallback {
        private EventStat myEventStat;
        private long sendTime = System.currentTimeMillis();

        public MyCallback(EventStat eventStat) {
            this.myEventStat = eventStat;
        }

        public void onMessageSent(MessageSentResult messageSentResult) {
            if (messageSentResult.isSuccess()) {
                addMetric(this.myEventStat.getEvent(), true, this.sendTime);
                return;
            }
            addMetric(this.myEventStat.getEvent(), false, 0L);
            if (messageSentResult.getErrCode() == 403) {
                SimpleMessageTubeSink.logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", new Object[]{messageSentResult.getErrMsg(), Integer.valueOf(SimpleMessageTubeSink.this.resendQueue.size()), Integer.valueOf(this.myEventStat.getEvent().hashCode())});
                return;
            }
            if (messageSentResult.getErrCode() != 419) {
                SimpleMessageTubeSink.logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", new Object[]{messageSentResult.getErrMsg(), Integer.valueOf(SimpleMessageTubeSink.this.resendQueue.size()), Integer.valueOf(this.myEventStat.getEvent().hashCode())});
            }
            SimpleMessageTubeSink.this.resendEvent(this.myEventStat, true);
        }

        private void addMetric(Event event, boolean z, long j) {
            HashMap hashMap = new HashMap();
            hashMap.put("clusterId", SimpleMessageTubeSink.this.getName());
            hashMap.put(DataProxyMetricItem.KEY_SINK_ID, SimpleMessageTubeSink.this.getName());
            hashMap.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().getOrDefault(SimpleMessageTubeSink.TOPIC, ""));
            DataProxyMetricItem.fillInlongId(event, hashMap);
            DataProxyMetricItem.fillAuditFormatTime(event, hashMap);
            DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) SimpleMessageTubeSink.this.metricItemSet.findMetricItem(hashMap);
            if (!z) {
                dataProxyMetricItem.sendFailCount.incrementAndGet();
                dataProxyMetricItem.sendFailSize.addAndGet(event.getBody().length);
                return;
            }
            dataProxyMetricItem.sendSuccessCount.incrementAndGet();
            dataProxyMetricItem.sendSuccessSize.addAndGet(event.getBody().length);
            AuditUtils.add(6, event);
            if (j > 0) {
                long currentTimeMillis = System.currentTimeMillis();
                long j2 = NumberUtils.toLong((String) event.getHeaders().get("msgTime"), j);
                long j3 = currentTimeMillis - j;
                long j4 = currentTimeMillis - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, j2);
                dataProxyMetricItem.sinkDuration.addAndGet(j3);
                dataProxyMetricItem.nodeDuration.addAndGet(j4);
                dataProxyMetricItem.wholeDuration.addAndGet(currentTimeMillis - j2);
            }
        }

        public void onException(Throwable th) {
            Throwable th2;
            Throwable th3 = th;
            while (true) {
                th2 = th3;
                if (th2.getCause() == null) {
                    break;
                } else {
                    th3 = th2.getCause();
                }
            }
            if (th2 instanceof OverflowException) {
                SimpleMessageTubeSink.this.overflow = true;
            }
            SimpleMessageTubeSink.this.resendEvent(this.myEventStat, true);
        }
    }

    /* loaded from: input_file:org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink$SinkTask.class */
    class SinkTask implements Runnable {
        SinkTask() {
        }

        private void sendMessage(Event event, String str, AtomicBoolean atomicBoolean, EventStat eventStat) throws TubeClientException, InterruptedException {
            String str2 = (String) event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
            if (SimpleMessageTubeSink.this.isNewCache) {
                boolean z = false;
                if (SimpleMessageTubeSink.this.clientIdCache && str2 != null) {
                    z = SimpleMessageTubeSink.agentIdCache.asMap().containsKey(str2);
                }
                if (SimpleMessageTubeSink.this.clientIdCache && str2 != null && z) {
                    SimpleMessageTubeSink.agentIdCache.put(str2, Long.valueOf(System.currentTimeMillis()));
                    SimpleMessageTubeSink.logger.info("{} agent package {} existed,just discard.", SimpleMessageTubeSink.this.getName(), str2);
                } else {
                    if (str2 != null) {
                        SimpleMessageTubeSink.agentIdCache.put(str2, Long.valueOf(System.currentTimeMillis()));
                    }
                    SimpleMessageTubeSink.this.producer.sendMessage(parseEvent2Message(str, event), new MyCallback(eventStat));
                    atomicBoolean.set(true);
                }
            } else {
                Long l = 0L;
                if (SimpleMessageTubeSink.this.clientIdCache && str2 != null) {
                    l = SimpleMessageTubeSink.agentIdMap.put(str2, Long.valueOf(System.currentTimeMillis()));
                }
                if (!SimpleMessageTubeSink.this.clientIdCache || str2 == null || l == null || l.longValue() <= 0) {
                    SimpleMessageTubeSink.this.producer.sendMessage(parseEvent2Message(str, event), new MyCallback(eventStat));
                    atomicBoolean.set(true);
                } else {
                    SimpleMessageTubeSink.logger.info("{} agent package {} existed,just discard.", SimpleMessageTubeSink.this.getName(), str2);
                }
            }
            SimpleMessageTubeSink.illegalTopicMap.remove(str);
        }

        private Message parseEvent2Message(String str, Event event) {
            Message message = new Message(str, event.getBody());
            message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
            String str2 = "";
            if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
                str2 = (String) event.getHeaders().get(AttributeConstants.STREAM_ID);
            } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
                str2 = (String) event.getHeaders().get(AttributeConstants.INAME);
            }
            message.putSystemHeader(str2, (String) event.getHeaders().get("msg.pkg.time"));
            Map headers = event.getHeaders();
            message.setAttrKeyVal("inlongGroupId", (String) headers.get("inlongGroupId"));
            message.setAttrKeyVal("inlongStreamId", (String) headers.get("inlongStreamId"));
            message.setAttrKeyVal("topic", (String) headers.get("topic"));
            message.setAttrKeyVal("msgTime", (String) headers.get("msgTime"));
            message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, (String) headers.get(Constants.HEADER_KEY_SOURCE_IP));
            return message;
        }

        private void handleException(Throwable th, String str, boolean z, EventStat eventStat) {
            if (th instanceof TubeClientException) {
                String message = th.getMessage();
                if (message != null && (message.contains("No available queue for topic") || message.contains("The brokers of topic are all forbidden"))) {
                    SimpleMessageTubeSink.illegalTopicMap.put(str, Long.valueOf(System.currentTimeMillis() + DispatchManager.MINUTE_MS));
                    SimpleMessageTubeSink.logger.info("IllegalTopicMap.put " + str);
                    return;
                }
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                }
            }
            SimpleMessageTubeSink.logger.error("Sink task fail to send the message, decrementFlag=" + z + ",sink.name=" + Thread.currentThread().getName() + ",event.headers=" + eventStat.getEvent().getHeaders(), th);
        }

        @Override // java.lang.Runnable
        public void run() {
            SimpleMessageTubeSink.logger.info("Sink task {} started.", Thread.currentThread().getName());
            while (SimpleMessageTubeSink.this.canSend) {
                Event event = null;
                EventStat eventStat = null;
                String str = null;
                try {
                    if (SimpleMessageTubeSink.this.overflow) {
                        SimpleMessageTubeSink.this.overflow = false;
                        Thread.sleep(10L);
                    }
                    if (SimpleMessageTubeSink.this.resendQueue.isEmpty()) {
                        event = (Event) SimpleMessageTubeSink.this.eventQueue.take();
                        eventStat = new EventStat(event);
                        if (event.getHeaders().containsKey(SimpleMessageTubeSink.TOPIC)) {
                            str = (String) event.getHeaders().get(SimpleMessageTubeSink.TOPIC);
                        }
                    } else {
                        eventStat = (EventStat) SimpleMessageTubeSink.this.resendQueue.poll();
                        if (eventStat != null) {
                            event = eventStat.getEvent();
                            if (event.getHeaders().containsKey(SimpleMessageTubeSink.TOPIC)) {
                                str = (String) event.getHeaders().get(SimpleMessageTubeSink.TOPIC);
                            }
                        }
                    }
                    if (event != null) {
                        if (str == null || str.equals("")) {
                            SimpleMessageTubeSink.logger.warn("no topic specified in event header, just skip this event");
                        } else {
                            Long l = (Long) SimpleMessageTubeSink.illegalTopicMap.get(str);
                            if (l != null) {
                                if (l.longValue() <= System.currentTimeMillis()) {
                                    SimpleMessageTubeSink.illegalTopicMap.remove(str);
                                }
                            }
                            MessageProducer messageProducer = null;
                            try {
                                messageProducer = SimpleMessageTubeSink.this.getProducer(str);
                            } catch (Exception e) {
                                SimpleMessageTubeSink.logger.error("Get producer failed!", e);
                            }
                            if (messageProducer == null) {
                                SimpleMessageTubeSink.illegalTopicMap.put(str, Long.valueOf(System.currentTimeMillis() + 30000));
                            } else {
                                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                                sendMessage(event, str, atomicBoolean, eventStat);
                                atomicBoolean.get();
                            }
                        }
                    }
                } catch (InterruptedException e2) {
                    SimpleMessageTubeSink.logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
                    return;
                } catch (Throwable th) {
                    handleException(th, str, false, eventStat);
                    SimpleMessageTubeSink.this.resendEvent(eventStat, false);
                }
            }
        }
    }

    public void diffSetPublish(Set<String> set, Set<String> set2) {
        boolean z = false;
        for (String str : set2) {
            if (!set.contains(str)) {
                z = true;
                try {
                    this.producer = getProducer(str);
                } catch (Exception e) {
                    logger.error("Get producer failed!", e);
                }
            }
        }
        if (z) {
            logger.info("topics.properties has changed, trigger diff publish for {}", getName());
            this.topicProperties = this.configManager.getTopicProperties();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MessageProducer getProducer(String str) throws TubeClientException {
        if (this.producerMap.containsKey(str)) {
            return this.producerMap.get(str);
        }
        synchronized (this) {
            if (!this.producerMap.containsKey(str)) {
                if (this.producer == null || this.currentPublishTopicNum.get() >= MAX_TOPICS_EACH_PRODUCER_HOLD) {
                    this.producer = this.sessionFactory.createProducer();
                    this.currentPublishTopicNum.set(0);
                }
                this.producer.publish(str);
                this.producerMap.put(str, this.producer);
                this.currentPublishTopicNum.incrementAndGet();
            }
        }
        return this.producerMap.get(str);
    }

    private TubeClientConfig initTubeConfig() throws Exception {
        TubeClientConfig tubeClientConfig = new TubeClientConfig(NetworkUtils.getLocalIp(), this.masterHostAndPortList);
        tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(this.linkMaxAllowedDelayedMsgCount);
        tubeClientConfig.setSessionWarnDelayedMsgCount(this.sessionWarnDelayedMsgCount);
        tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(this.sessionMaxAllowedDelayedMsgCount);
        tubeClientConfig.setNettyWriteBufferHighWaterMark(this.nettyWriteBufferHighWaterMark);
        tubeClientConfig.setHeartbeatPeriodMs(15000L);
        tubeClientConfig.setRpcTimeoutMs(20000L);
        return tubeClientConfig;
    }

    private void createConnection() throws FlumeException {
        if (this.sessionFactory != null) {
            return;
        }
        try {
            this.sessionFactory = new TubeMultiSessionFactory(initTubeConfig());
            if (this.producerMap == null) {
                this.producerMap = new HashMap();
            }
            logger.debug("building tube producer");
        } catch (TubeClientException e) {
            logger.error("create connnection error in metasink, maybe tube master set error, please re-check. ex1 {}", e.getMessage());
            throw new FlumeException("connect to Tube error1, maybe zkstr/zkroot set error, please re-check");
        } catch (Throwable th) {
            logger.error("create connnection error in metasink, maybe tube master set error/shutdown in progress, please re-check. ex2 {}", th.getMessage());
            throw new FlumeException("connect to meta error2, maybe tube master set error/shutdown in progress, please re-check");
        }
    }

    private void destroyConnection() {
        Iterator<Map.Entry<String, MessageProducer>> it = this.producerMap.entrySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().getValue().shutdown();
            } catch (TubeClientException e) {
                logger.error("destroy producer error in metasink, MetaClientException {}", e.getMessage());
            } catch (Throwable th) {
                logger.error("destroy producer error in metasink, ex {}", th.getMessage());
            }
        }
        this.producerMap.clear();
        if (this.sessionFactory != null) {
            try {
                this.sessionFactory.shutdown();
            } catch (Exception e2) {
                logger.error("destroy sessionFactory error in metasink, ex {}", e2.getMessage());
            } catch (TubeClientException e3) {
                logger.error("destroy sessionFactory error in metasink, MetaClientException {}", e3.getMessage());
            }
        }
        this.sessionFactory = null;
        logger.debug("closed meta producer");
    }

    /* JADX WARN: Can't wrap try/catch for region: R(13:4|(2:6|(2:9|10)(1:8))|11|(2:14|12)|15|16|17|18|(4:20|(2:23|21)|24|25)|26|27|10|2) */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x0108, code lost:
    
        r18 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x010a, code lost:
    
        org.apache.inlong.dataproxy.sink.SimpleMessageTubeSink.logger.info(getName() + " meta sink initTopicSet fail.", r18);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void initTopicSet(java.util.Set<java.lang.String> r8) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 391
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.inlong.dataproxy.sink.SimpleMessageTubeSink.initTopicSet(java.util.Set):void");
    }

    public void start() {
        this.dimensions = new HashMap();
        this.dimensions.put("clusterId", "DataProxy");
        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, getName());
        this.metricItemSet = new DataProxyMetricItemSet(getName());
        MetricRegister.register(this.metricItemSet);
        try {
            createConnection();
            if (this.clientIdCache && !this.isNewCache) {
                this.idCacheCleaner = new IdCacheCleaner(this, this.maxSurvivedTime, this.maxSurvivedSize);
                this.idCacheCleaner.start();
            }
            super.start();
            this.canSend = true;
            this.canTake = true;
            try {
                initTopicSet(new HashSet(this.topicProperties.values()));
            } catch (Exception e) {
                logger.info("meta sink start publish topic fail.", e);
            }
            for (int i = 0; i < this.sinkThreadPool.length; i++) {
                this.sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
                this.sinkThreadPool[i].start();
            }
        } catch (FlumeException e2) {
            logger.error("Unable to create tube client. Exception follows.", e2);
            destroyConnection();
            stop();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resendEvent(EventStat eventStat, boolean z) {
        if (eventStat != null) {
            try {
                if (eventStat.getEvent() == null) {
                    return;
                }
                if (this.clientIdCache) {
                    String str = (String) eventStat.getEvent().getHeaders().get(ConfigConstants.SEQUENCE_ID);
                    if (this.isNewCache) {
                        if (str != null && agentIdCache.asMap().containsKey(str)) {
                            agentIdCache.invalidate(str);
                        }
                    } else if (str != null && agentIdMap.containsKey(str)) {
                        agentIdMap.remove(str);
                    }
                }
            } catch (Throwable th) {
                logger.error(getName() + " Discard msg because put events to both of queue and fileChannel fail,current resendQueue.size = " + this.resendQueue.size(), th);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public Sink.Status process() throws EventDeliveryException {
        if (!this.canTake) {
            return Sink.Status.BACKOFF;
        }
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            try {
                Event take = channel.take();
                if (take != null) {
                    if (this.diskRateLimiter != null) {
                        this.diskRateLimiter.acquire(take.getBody().length);
                    }
                    if (this.eventQueue.offer(take, 3000L, TimeUnit.MILLISECONDS)) {
                        transaction.commit();
                        if (take.getHeaders().containsKey(TOPIC)) {
                            this.dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, take.getHeaders().get(TOPIC));
                        } else {
                            this.dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
                        }
                        DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) this.metricItemSet.findMetricItem(this.dimensions);
                        dataProxyMetricItem.readFailCount.incrementAndGet();
                        dataProxyMetricItem.readFailSize.addAndGet(take.getBody().length);
                    } else {
                        logger.info("[{}] Channel --> Queue(has no enough space,current code point) --> Tube,Check if Tube server or network is ok.(if this situation last long time it will cause memoryChannel full and fileChannel write.)", getName());
                        transaction.rollback();
                    }
                } else {
                    status = Sink.Status.BACKOFF;
                    transaction.commit();
                }
                transaction.close();
            } catch (Throwable th) {
                logger.error("Process event failed!" + getName(), th);
                try {
                    transaction.rollback();
                } catch (Throwable th2) {
                    logger.error("metasink transaction rollback exception", th2);
                }
                transaction.close();
            }
            return status;
        } catch (Throwable th3) {
            transaction.close();
            throw th3;
        }
    }

    public void configure(Context context) {
        logger.info(context.toString());
        this.configManager = ConfigManager.getInstance();
        this.topicProperties = this.configManager.getTopicProperties();
        this.configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() { // from class: org.apache.inlong.dataproxy.sink.SimpleMessageTubeSink.2
            @Override // org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback
            public void update() {
                SimpleMessageTubeSink.this.diffSetPublish(new HashSet(SimpleMessageTubeSink.this.topicProperties.values()), new HashSet(SimpleMessageTubeSink.this.configManager.getTopicProperties().values()));
            }
        });
        this.masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
        Preconditions.checkState(this.masterHostAndPortList != null, "No master and port list specified");
        this.producerMap = new HashMap();
        this.logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, Integer.valueOf(defaultLogEveryNEvents));
        logger.debug(getName() + " " + LOG_EVERY_N_EVENTS + " " + this.logEveryNEvents);
        Preconditions.checkArgument(this.logEveryNEvents.intValue() > 0, "logEveryNEvents must be > 0");
        this.sendTimeout = context.getInteger(SEND_TIMEOUT, Integer.valueOf(defaultSendTimeout));
        logger.debug(getName() + " " + SEND_TIMEOUT + " " + this.sendTimeout);
        Preconditions.checkArgument(this.sendTimeout.intValue() > 0, "sendTimeout must be > 0");
        MAX_TOPICS_EACH_PRODUCER_HOLD = context.getInteger(MAX_TOPICS_EACH_PRODUCER_HOLD_NAME, 200).intValue();
        retryCnt = context.getInteger(RETRY_CNT, Integer.valueOf(defaultRetryCnt)).intValue();
        logger.debug(getName() + " " + RETRY_CNT + " " + retryCnt);
        if (context.getBoolean(SLA_METRIC_SINK, false).booleanValue()) {
            this.metaTopicFilePath = slaTopicFilePath;
        }
        this.clientIdCache = context.getBoolean(CLIENT_ID_CACHE, Boolean.valueOf(this.clientIdCache)).booleanValue();
        if (this.clientIdCache) {
            int intValue = context.getInteger(MAX_SURVIVED_TIME, Integer.valueOf(this.maxSurvivedTime)).intValue();
            if (intValue > 0) {
                this.maxSurvivedTime = intValue;
            } else {
                logger.warn("invalid {}:{}", MAX_SURVIVED_TIME, Integer.valueOf(intValue));
            }
            int intValue2 = context.getInteger(MAX_SURVIVED_SIZE, Integer.valueOf(this.maxSurvivedSize)).intValue();
            if (intValue2 > 0) {
                this.maxSurvivedSize = intValue2;
            } else {
                logger.warn("invalid {}:{}", MAX_SURVIVED_SIZE, Integer.valueOf(intValue2));
            }
        }
        String string = context.getString(TUBE_REQUEST_TIMEOUT);
        if (string != null) {
            this.requestTimeout = Integer.parseInt(string);
        }
        String string2 = context.getString(SEND_REMOTE);
        if (string2 != null) {
            this.sendRemote = Boolean.parseBoolean(string2);
        }
        if (this.sendRemote) {
            this.proxyLogTopic = context.getString(LOG_TOPIC, this.proxyLogTopic);
            this.proxyLogGroupId = context.getString(GROUPID, this.proxyLogStreamId);
            this.proxyLogStreamId = context.getString(STREAMID, this.proxyLogStreamId);
        }
        this.resendQueue = new LinkedBlockingQueue<>(BAD_EVENT_QUEUE_SIZE);
        this.threadNum = Integer.parseInt(context.getString(SINK_THREAD_NUM, "4"));
        Preconditions.checkArgument(this.threadNum > 0, "threadNum must be > 0");
        this.sinkThreadPool = new Thread[this.threadNum];
        this.eventQueue = new LinkedBlockingQueue<>(EVENT_QUEUE_SIZE);
        this.diskIORatePerSec = context.getLong(KEY_DISK_IO_RATE_PER_SEC, 0L).longValue();
        if (this.diskIORatePerSec != 0) {
            this.diskRateLimiter = RateLimiter.create(this.diskIORatePerSec);
        }
        this.linkMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT, 80000L).longValue();
        this.sessionWarnDelayedMsgCount = context.getLong(ConfigConstants.SESSION_WARN_DELAYED_MSG_COUNT, 2000000L).longValue();
        this.sessionMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT, 4000000L).longValue();
        this.nettyWriteBufferHighWaterMark = context.getLong(ConfigConstants.NETTY_WRITE_BUFFER_HIGH_WATER_MARK, 15728640L).longValue();
        this.recoverthreadcount = context.getInteger(ConfigConstants.RECOVER_THREAD_COUNT, Integer.valueOf(Runtime.getRuntime().availableProcessors() + 1)).intValue();
    }

    public DataProxyMetricItemSet getMetricItemSet() {
        return this.metricItemSet;
    }
}
