/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.tubemq.client.consumer;

import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.client.common.PeerInfo;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.consumer.BaseMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.ConsumeOffsetInfo;
import org.apache.inlong.tubemq.client.consumer.FetchContext;
import org.apache.inlong.tubemq.client.consumer.MessageFetchManager;
import org.apache.inlong.tubemq.client.consumer.MessageListener;
import org.apache.inlong.tubemq.client.consumer.PartitionSelectResult;
import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.TopicProcessor;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.InnerSessionFactory;
import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Push-mode consumer that delegates subscription and bookkeeping calls to a
 * {@link BaseMessageConsumer} and hands fetched messages to the
 * {@link MessageListener} registered for the message's topic.
 *
 * <p>Consumption can be paused and resumed. While paused, callers of
 * {@link #allowConsumeWait()} block until {@link #resumeConsume()} is invoked
 * or the calling thread is interrupted.
 *
 * <p>NOTE(review): the constructor passes {@code this} to the
 * {@link MessageFetchManager} and starts its workers before construction has
 * finished; subclasses should not rely on their own state being initialized
 * when the workers call back.
 */
public class SimplePushMessageConsumer implements PushMessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SimplePushMessageConsumer.class);
    /** Number of listener failures that are always logged inside one throttle window. */
    private static final int MAX_FAILURE_LOG_TIMES = 10;
    /** Length of the failure-log throttle window, in milliseconds. */
    private static final long FAILURE_LOG_TIME_SPAN_MS = 30000L;
    /** Processing time, in milliseconds, above which a request is reported as slow. */
    private static final long SLOW_PROCESS_LOG_THRESHOLD_MS = 30000L;
    /** Delay, in milliseconds, between the two stop calls issued to the fetch workers on shutdown. */
    private static final long SHUTDOWN_WAIT_MS = 200L;

    private final MessageFetchManager fetchManager;
    private final BaseMessageConsumer baseConsumer;
    private final AtomicLong lastLogPrintTime = new AtomicLong(0L);
    private final AtomicLong lastFailureCount = new AtomicLong(0L);
    /** Guards replacement and release of {@link #consumeSync}. */
    private final Object pauseLock = new Object();
    /**
     * Latch with count 1 while consumption is paused, count 0 otherwise.
     * Volatile because it is replaced by {@link #pauseConsume()} and read by
     * whichever threads call {@link #allowConsumeWait()}.
     */
    private volatile CountDownLatch consumeSync = new CountDownLatch(0);

    /**
     * Creates the underlying base consumer and fetch manager, then starts the fetch workers.
     *
     * @param messageSessionFactory session factory handed to the base consumer
     * @param consumerConfig        consumer configuration handed to the base consumer
     * @throws TubeClientException if the base consumer or fetch manager cannot be created
     */
    public SimplePushMessageConsumer(InnerSessionFactory messageSessionFactory, ConsumerConfig consumerConfig) throws TubeClientException {
        this.baseConsumer = new BaseMessageConsumer(messageSessionFactory, consumerConfig, false);
        this.fetchManager = new MessageFetchManager(this.baseConsumer.consumerConfig, this);
        this.fetchManager.startFetchWorkers();
    }

    /**
     * Pauses consumption, stops the fetch workers in two steps separated by a
     * short sleep, and finally shuts down the base consumer.
     *
     * @throws Throwable whatever the fetch manager or the base consumer throws while stopping
     */
    public void shutdown() throws Throwable {
        this.pauseConsume();
        this.fetchManager.stopFetchWorkers(true);
        ThreadUtils.sleep(SHUTDOWN_WAIT_MS);
        this.fetchManager.stopFetchWorkers(false);
        this.baseConsumer.shutdown();
    }

    /**
     * Registers a listener for a topic on the base consumer.
     *
     * @return this consumer, to allow call chaining
     */
    @Override
    public PushMessageConsumer subscribe(String topic, TreeSet<String> filterConds, MessageListener messageListener) throws TubeClientException {
        this.baseConsumer.subscribe(topic, filterConds, messageListener);
        return this;
    }

    /** Delegates to {@link BaseMessageConsumer#completeSubscribe()}. */
    @Override
    public void completeSubscribe() throws TubeClientException {
        this.baseConsumer.completeSubscribe();
    }

    /** Delegates to the four-argument {@code completeSubscribe} of the base consumer. */
    @Override
    public void completeSubscribe(String sessionKey, int sourceCount, boolean isSelectBig, Map<String, Long> partOffsetMap) throws TubeClientException {
        this.baseConsumer.completeSubscribe(sessionKey, sourceCount, isSelectBig, partOffsetMap);
    }

    /** Returns the client version reported by the base consumer. */
    @Override
    public String getClientVersion() {
        return this.baseConsumer.getClientVersion();
    }

    /** Returns the consumer id reported by the base consumer. */
    @Override
    public String getConsumerId() {
        return this.baseConsumer.getConsumerId();
    }

    /** Returns whether the base consumer reports itself as shut down. */
    @Override
    public boolean isShutdown() {
        return this.baseConsumer.isShutdown();
    }

    /** Returns the configuration held by the base consumer. */
    @Override
    public ConsumerConfig getConsumerConfig() {
        return this.baseConsumer.getConsumerConfig();
    }

    /** Returns the base consumer's answer to whether {@code topic} is consumed with filtering. */
    @Override
    public boolean isFilterConsume(String topic) {
        return this.baseConsumer.isFilterConsume(topic);
    }

    /** Delegates to {@link BaseMessageConsumer#getCurConsumedPartitions()}. */
    @Override
    public Map<String, ConsumeOffsetInfo> getCurConsumedPartitions() throws TubeClientException {
        return this.baseConsumer.getCurConsumedPartitions();
    }

    /** Delegates to the base consumer's {@code freezePartitions}. */
    @Override
    public void freezePartitions(List<String> partitionKeys) throws TubeClientException {
        this.baseConsumer.freezePartitions(partitionKeys);
    }

    /** Delegates to the base consumer's {@code unfreezePartitions}. */
    @Override
    public void unfreezePartitions(List<String> partitionKeys) throws TubeClientException {
        this.baseConsumer.unfreezePartitions(partitionKeys);
    }

    /** Delegates to the base consumer's {@code relAllFrozenPartitions}. */
    @Override
    public void relAllFrozenPartitions() {
        this.baseConsumer.relAllFrozenPartitions();
    }

    /** Delegates to the base consumer's {@code getFrozenPartInfo}. */
    @Override
    public Map<String, Long> getFrozenPartInfo() {
        return this.baseConsumer.getFrozenPartInfo();
    }

    /** Returns the wrapped base consumer. */
    protected BaseMessageConsumer getBaseConsumer() {
        return this.baseConsumer;
    }

    /**
     * Blocks the calling thread while consumption is paused.
     *
     * <p>Returns immediately when consumption is not paused. If the thread is
     * interrupted while waiting, the method returns and the interrupt flag is
     * restored so the caller can observe it.
     */
    protected void allowConsumeWait() {
        // Snapshot the field once: pauseConsume() may replace it concurrently.
        CountDownLatch latch = this.consumeSync;
        if (latch != null && latch.getCount() != 0L) {
            try {
                latch.await();
            }
            catch (InterruptedException interruptedException) {
                // Do not lose the interrupt; the caller decides how to react.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Releases every thread blocked in {@link #allowConsumeWait()} and logs the resume. */
    @Override
    public void resumeConsume() {
        synchronized (this.pauseLock) {
            this.consumeSync.countDown();
        }
        logger.info(new StringBuilder(256).append("[ResumeConsume] Consume is resume, consumerId :").append(this.baseConsumer.consumerId).toString());
    }

    /**
     * Marks consumption as paused and logs the pause.
     *
     * <p>If consumption is already paused the existing latch is kept, so that a
     * single {@link #resumeConsume()} releases every waiting thread. Replacing
     * the latch would strand threads that are waiting on the previous one.
     */
    @Override
    public void pauseConsume() {
        synchronized (this.pauseLock) {
            CountDownLatch current = this.consumeSync;
            if (current == null || current.getCount() == 0L) {
                this.consumeSync = new CountDownLatch(1);
            }
        }
        logger.info(new StringBuilder(256).append("[PauseConsume] Consume is paused, consumerId :").append(this.baseConsumer.consumerId).toString());
    }

    /** Returns {@code true} while the pause latch has not been released. */
    @Override
    public boolean isConsumePaused() {
        CountDownLatch latch = this.consumeSync;
        return latch != null && latch.getCount() != 0L;
    }

    /**
     * Fetches messages for the selected partition, dispatches them to the
     * topic's listener and releases the partition back to the remote data cache.
     *
     * <p>A failed fetch is only logged at debug level. An empty fetch counts as
     * consumed. When dispatching throws, whether the fetch counts as consumed is
     * decided by {@code isPushListenerThrowedRollBack()} of the consumer config.
     *
     * @param partSelectResult the partition selection to fetch from
     * @param sBuilder         scratch buffer shared with the caller; this method
     *                         clears whatever it appends itself
     */
    protected void processRequest(PartitionSelectResult partSelectResult, StringBuilder sBuilder) {
        long startTime = System.currentTimeMillis();
        FetchContext taskContext = this.baseConsumer.fetchMessage(partSelectResult, sBuilder);
        if (!taskContext.isSuccess()) {
            if (logger.isDebugEnabled()) {
                logger.debug(sBuilder.append("Fetch message error: partition:").append(partSelectResult.getPartition().toString()).append(" error is ").append(taskContext.getErrMsg()).toString());
                sBuilder.delete(0, sBuilder.length());
            }
            return;
        }
        boolean isConsumed = false;
        if (!this.isShutdown()) {
            if (taskContext.getMessageList() == null || taskContext.getMessageList().isEmpty()) {
                isConsumed = true;
            } else {
                try {
                    TopicProcessor topicProcessor = this.baseConsumer.consumeSubInfo.getTopicProcessor(taskContext.getPartition().getTopic());
                    if (topicProcessor == null || topicProcessor.getMessageListener() == null) {
                        String errMsg = sBuilder.append("Listener is null for topic ").append(taskContext.getPartition().getTopic()).toString();
                        // Clear the shared buffer so the text does not leak into later log lines.
                        sBuilder.delete(0, sBuilder.length());
                        throw new TubeClientException(errMsg);
                    }
                    isConsumed = this.notifyListener(taskContext, topicProcessor, sBuilder);
                }
                catch (Throwable e) {
                    isConsumed = !this.baseConsumer.consumerConfig.isPushListenerThrowedRollBack();
                    this.logMessageProcessFailed(taskContext, e);
                }
            }
        }
        this.baseConsumer.rmtDataCache.succRspRelease(taskContext.getPartition().getPartitionKey(), taskContext.getPartition().getTopic(), taskContext.getUsedToken(), isConsumed, this.isFilterConsume(taskContext.getPartition().getTopic()), taskContext.getCurrOffset(), taskContext.getMaxOffset());
        long cost = System.currentTimeMillis() - startTime;
        if (cost > SLOW_PROCESS_LOG_THRESHOLD_MS) {
            logger.info(sBuilder.append("Consuming Partition; current processing thread ").append(Thread.currentThread().getName()).append("-->Process[").append(partSelectResult.getPartition().toString()).append("] cost:").append(cost).append(" Ms").toString());
            sBuilder.delete(0, sBuilder.length());
        }
    }

    /**
     * Hands the fetched messages to the topic's listener, on the listener's
     * executor when it has one and on the calling thread otherwise.
     *
     * @return always {@code true}; with an executor this only means the task
     *         was accepted, not that the listener has finished
     * @throws RejectedExecutionException if the listener's executor refuses the task
     */
    private boolean notifyListener(final FetchContext request, final TopicProcessor topicProcessor, StringBuilder sBuilder) throws Exception {
        MessageListener listener = topicProcessor.getMessageListener();
        if (listener.getExecutor() == null) {
            this.receiveMessages(request, topicProcessor);
            return true;
        }
        try {
            listener.getExecutor().execute(new Runnable(){

                @Override
                public void run() {
                    SimplePushMessageConsumer.this.receiveMessages(request, topicProcessor);
                }
            });
        }
        catch (RejectedExecutionException e) {
            logger.error(new StringBuilder(512).append("MessageListener thread poll is busy, topic=").append(request.getPartition().getTopic()).append(",partition=").append(request.getPartition()).toString(), (Throwable)e);
            throw e;
        }
        return true;
    }

    /**
     * Invokes the listener with the fetched messages; does nothing when the
     * request or its message list is {@code null}.
     */
    private void receiveMessages(FetchContext request, TopicProcessor topicProcessor) {
        if (request == null || request.getMessageList() == null) {
            return;
        }
        MessageListener msgListener = topicProcessor.getMessageListener();
        try {
            msgListener.receiveMessages(new PeerInfo(request.getPartition(), request.getCurrOffset(), request.getMaxOffset()), request.getMessageList());
        }
        catch (InterruptedException e) {
            logger.info("Call listener to process received messages throw Interrupted Exception!");
            // Preserve the interrupt for the code that owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Logs a listener failure, throttled so that at most
     * {@link #MAX_FAILURE_LOG_TIMES} failures are logged per
     * {@link #FAILURE_LOG_TIME_SPAN_MS} window. Nothing is logged once the
     * consumer is shut down.
     */
    private void logMessageProcessFailed(FetchContext request, Throwable e) {
        if (this.isShutdown()) {
            return;
        }
        long now = System.currentTimeMillis();
        long lastTime = this.lastLogPrintTime.get();
        boolean windowExpired = now - lastTime > FAILURE_LOG_TIME_SPAN_MS;
        // The counter must be incremented on every failure, so it stays first in the condition.
        if (this.lastFailureCount.incrementAndGet() <= MAX_FAILURE_LOG_TIMES || lastTime <= 0L || windowExpired) {
            // Build the message only when it is actually going to be logged.
            StringBuilder sBuilder = new StringBuilder(512);
            sBuilder.append("CallBack process message failed: partition=").append(request.getPartition());
            sBuilder.append(", group=").append(this.baseConsumer.consumerConfig.getConsumerGroup());
            sBuilder.append(", FetchManager.isConsumePaused=").append(this.isConsumePaused());
            sBuilder.append(", MessageConsumer.shutdown=").append(this.isShutdown());
            logger.warn(sBuilder.toString(), e);
            if (windowExpired) {
                this.lastLogPrintTime.set(now);
                this.lastFailureCount.set(0L);
            }
        }
    }
}

