package org.apache.kylin.stream.core.consumer;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.stream.core.exception.StreamingException;
import org.apache.kylin.stream.core.metrics.StreamingMetrics;
import org.apache.kylin.stream.core.model.StreamingMessage;
import org.apache.kylin.stream.core.model.stats.ConsumerStats;
import org.apache.kylin.stream.core.model.stats.PartitionConsumeStats;
import org.apache.kylin.stream.core.source.MessageFormatException;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.storage.StreamingSegmentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.class */
public class StreamingConsumerChannel implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(StreamingConsumerChannel.class);
    protected volatile boolean stopped;
    protected volatile boolean paused;
    protected volatile boolean hasStoppedConsuming;
    private volatile CountDownLatch pauseLatch;
    private Thread consumerThread;
    private String cubeName;
    private IStreamingConnector connector;
    private StreamingSegmentManager cubeSegmentManager;
    private volatile IStopConsumptionCondition stopCondition;
    private AtomicLong parseEventErrorCnt = new AtomicLong(0);
    private AtomicLong addEventErrorCnt = new AtomicLong(0);
    private AtomicLong incomingEventCnt = new AtomicLong(0);
    private AtomicLong dropEventCnt = new AtomicLong(0);
    private volatile CountDownLatch stopLatch = new CountDownLatch(1);
    private Map<Integer, Meter> eventConsumeMeters = Maps.newHashMap();
    private volatile long minAcceptEventTime = 0;

    public StreamingConsumerChannel(String str, IStreamingConnector iStreamingConnector, StreamingSegmentManager streamingSegmentManager, IStopConsumptionCondition iStopConsumptionCondition) {
        this.connector = iStreamingConnector;
        this.cubeName = str;
        this.cubeSegmentManager = streamingSegmentManager;
        this.stopCondition = iStopConsumptionCondition;
    }

    public void setStopCondition(IStopConsumptionCondition iStopConsumptionCondition) {
        this.stopCondition = iStopConsumptionCondition;
        this.stopCondition.init(this.connector.getConsumePartitions());
    }

    public void setMinAcceptEventTime(long j) {
        this.minAcceptEventTime = j;
    }

    public void start() {
        this.consumerThread = new Thread(this, this.cubeName + "_channel");
        this.consumerThread.setPriority(10);
        this.connector.open();
        this.consumerThread.start();
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.stopped) {
            if (this.paused) {
                try {
                    if (this.pauseLatch != null) {
                        this.pauseLatch.countDown();
                    }
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    logger.warn("interrupted!");
                    this.stopped = true;
                }
            } else {
                StreamingMessage streamingMessage = null;
                try {
                    streamingMessage = this.connector.nextEvent();
                    if (streamingMessage == null) {
                        Thread.sleep(100L);
                    } else {
                        this.incomingEventCnt.incrementAndGet();
                        recordConsumeMetric(Integer.valueOf(streamingMessage.getSourcePosition().getPartition()), streamingMessage.getParams());
                        if (this.stopCondition.isSatisfied(streamingMessage)) {
                            logger.warn("The latest event trigger stopCondition, event = " + streamingMessage);
                            this.stopped = true;
                        } else if (!isFilter(streamingMessage)) {
                            this.cubeSegmentManager.addEvent(streamingMessage);
                        }
                    }
                } catch (InterruptedException e2) {
                    logger.warn("interrupted!");
                    this.stopped = true;
                } catch (MessageFormatException e3) {
                    if (this.parseEventErrorCnt.incrementAndGet() % 1000 < 3) {
                        logger.error(e3.getMessage(), e3);
                    }
                } catch (Exception e4) {
                    if (this.addEventErrorCnt.incrementAndGet() % 1000 < 3) {
                        logger.error("error happens when save event:" + streamingMessage, e4);
                    }
                }
            }
        }
        this.hasStoppedConsuming = true;
        logger.warn("Exit from main event loop, start to close cubeSegmentManager.");
        try {
            this.cubeSegmentManager.close();
            removeMetrics();
            this.connector.close();
            this.stopLatch.countDown();
        } catch (Throwable th) {
            this.connector.close();
            this.stopLatch.countDown();
            throw th;
        }
    }

    private void removeMetrics() {
        Iterator<Map.Entry<Integer, Meter>> it = this.eventConsumeMeters.entrySet().iterator();
        while (it.hasNext()) {
            StreamingMetrics.getInstance().getMetricRegistry().remove(MetricRegistry.name(StreamingMetrics.CONSUME_RATE_PFX, new String[]{this.cubeName, String.valueOf(it.next().getKey())}));
        }
    }

    protected void recordConsumeMetric(Integer num, Map<String, Object> map) {
        Meter meter = this.eventConsumeMeters.get(num);
        if (meter == null) {
            meter = StreamingMetrics.newMeter(MetricRegistry.name(StreamingMetrics.CONSUME_RATE_PFX, new String[]{this.cubeName, String.valueOf(num)}));
            this.eventConsumeMeters.put(num, meter);
        }
        meter.mark();
    }

    private boolean isFilter(StreamingMessage streamingMessage) {
        if (this.minAcceptEventTime == 0 || streamingMessage.getTimestamp() >= this.minAcceptEventTime) {
            return streamingMessage.isFiltered();
        }
        if (this.dropEventCnt.incrementAndGet() % 1000 > 1) {
            return true;
        }
        logger.warn("event dropped, event time {}, min event accept time {}", Long.valueOf(streamingMessage.getTimestamp()), Long.valueOf(this.minAcceptEventTime));
        return true;
    }

    public void stop(long j) {
        this.stopped = true;
        waitConsumerStop(j);
    }

    private void waitConsumerStop(long j) {
        try {
            if (this.stopLatch.await(j, TimeUnit.MILLISECONDS)) {
                return;
            }
            if (!this.hasStoppedConsuming) {
                logger.warn("Consumer not stopped normally, close it forcedly, but the thread may not be stopped correctly");
                this.connector.wakeup();
            }
            if (this.stopLatch.await(j, TimeUnit.MILLISECONDS)) {
                return;
            }
            if (this.hasStoppedConsuming) {
                logger.warn("Consumer has been stopped, but cube data store is not closed");
            } else {
                logger.warn("Consumer is still not stopped");
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted!", e);
            Thread.interrupted();
        } catch (Exception e2) {
            logger.error("Exception throws when wait consumer stopped", e2);
        }
    }

    public void pause(boolean z) {
        this.paused = true;
        if (z) {
            this.pauseLatch = new CountDownLatch(1);
            try {
                this.pauseLatch.await(10000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.warn("Interrupted!", e);
                Thread.interrupted();
            }
        }
    }

    public void resumeToStopCondition(IStopConsumptionCondition iStopConsumptionCondition) {
        this.paused = false;
        if (iStopConsumptionCondition != IStopConsumptionCondition.NEVER_STOP) {
            setStopCondition(iStopConsumptionCondition);
            try {
                if (this.stopLatch.await(120000L, TimeUnit.MILLISECONDS)) {
                } else {
                    throw new StreamingException("consumer stop failed for stopCondition:" + iStopConsumptionCondition);
                }
            } catch (InterruptedException e) {
                logger.warn("Interrupted!", e);
            }
        }
    }

    public void resume() {
        this.paused = false;
    }

    public boolean isStopped() {
        return this.stopped;
    }

    public boolean isPaused() {
        return this.paused;
    }

    public String getSourceConsumeInfo() {
        return this.cubeSegmentManager.getConsumePositionStr();
    }

    public ConsumerStats getConsumerStats() {
        Map<Integer, Long> calConsumeLag = this.connector.getSource().calConsumeLag(this.cubeName, this.cubeSegmentManager.getConsumePosition());
        long j = 0;
        HashMap newHashMap = Maps.newHashMap();
        for (Map.Entry<Integer, Meter> entry : this.eventConsumeMeters.entrySet()) {
            Meter value = entry.getValue();
            PartitionConsumeStats partitionConsumeStats = new PartitionConsumeStats();
            partitionConsumeStats.setAvgRate(value.getMeanRate());
            partitionConsumeStats.setOneMinRate(value.getOneMinuteRate());
            partitionConsumeStats.setFiveMinRate(value.getFiveMinuteRate());
            partitionConsumeStats.setFifteenMinRate(value.getFifteenMinuteRate());
            partitionConsumeStats.setTotalConsume(value.getCount());
            long longValue = calConsumeLag.getOrDefault(entry.getKey(), 0L).longValue();
            j += longValue;
            partitionConsumeStats.setConsumeLag(longValue);
            newHashMap.put(entry.getKey(), partitionConsumeStats);
        }
        for (Partition partition : getConsumePartitions()) {
            if (!newHashMap.containsKey(Integer.valueOf(partition.getPartitionId()))) {
                newHashMap.put(Integer.valueOf(partition.getPartitionId()), new PartitionConsumeStats());
            }
        }
        ConsumerStats consumerStats = new ConsumerStats();
        consumerStats.setStopped(this.stopped);
        consumerStats.setPaused(this.paused);
        consumerStats.setTotalIncomingEvents(this.incomingEventCnt.get());
        consumerStats.setTotalExceptionEvents(this.parseEventErrorCnt.get() + this.addEventErrorCnt.get());
        consumerStats.setPartitionConsumeStatsMap(newHashMap);
        consumerStats.setConsumeOffsetInfo(getSourceConsumeInfo());
        consumerStats.setConsumeLag(j);
        return consumerStats;
    }

    public IStreamingConnector getConnector() {
        return this.connector;
    }

    public List<Partition> getConsumePartitions() {
        return getConnector().getConsumePartitions();
    }
}
