package co.cask.cdap.logging.pipeline.kafka;

import ch.qos.logback.classic.spi.ILoggingEvent;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.logging.LogSampler;
import co.cask.cdap.common.logging.LogSamplers;
import co.cask.cdap.common.logging.Loggers;
import co.cask.cdap.logging.meta.Checkpoint;
import co.cask.cdap.logging.meta.CheckpointManager;
import co.cask.cdap.logging.pipeline.LogProcessorPipelineContext;
import co.cask.cdap.logging.pipeline.TimeEventQueue;
import co.cask.cdap.logging.serialize.LoggingEventSerializer;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import it.unimi.dsi.fastutil.ints.Int2LongMap;
import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.twill.common.Threads;
import org.apache.twill.kafka.client.BrokerInfo;
import org.apache.twill.kafka.client.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/pipeline/kafka/KafkaLogProcessorPipeline.class */
public final class KafkaLogProcessorPipeline extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaLogProcessorPipeline.class);

    /**
     * Sampled view of {@link #LOG}, used for warnings that can repeat on every loop iteration.
     * Each distinct message gets its own sampler built from {@code LogSamplers.limitRate(60000L)}
     * (presumably at most one emission per 60 seconds; confirm against {@code LogSamplers}).
     */
    private static final Logger OUTAGE_LOG = Loggers.sampling(LOG, LogSamplers.perMessage(new Supplier<LogSampler>() {
        @Override
        public LogSampler get() {
            return LogSamplers.limitRate(60000L);
        }
    }));

    /** Timeout value handed to every {@link KafkaSimpleConsumer} created by this pipeline. */
    private static final int KAFKA_SO_TIMEOUT = 3000;

    /** Fraction of the maximum buffer size the event queue is drained down to on a forced append. */
    private static final double MIN_FREE_FACTOR = 0.5d;

    /** Pipeline name, taken from the pipeline context; used in log messages, thread names and client ids. */
    private final String name;
    private final LogProcessorPipelineContext context;
    private final CheckpointManager checkpointManager;
    private final BrokerService brokerService;
    private final KafkaPipelineConfig config;

    /** Buffer of decoded events, tagged per partition with the Kafka offset they came from. */
    private final TimeEventQueue<ILoggingEvent, OffsetTime> eventQueue;
    private final MetricsContext metricsContext;
    private final KafkaOffsetResolver offsetResolver;

    /** Executor running the per-partition Kafka fetches; created in {@code startUp()}. */
    private ExecutorService fetchExecutor;

    /** Thread executing {@code run()}; interrupted by {@code triggerShutdown()}. */
    private volatile Thread runThread;

    /** Set by {@code triggerShutdown()} to make the processing loop exit. */
    private volatile boolean stopped;

    /** Wall-clock time (ms) of the last successful sync-and-checkpoint. */
    private long lastCheckpointTime;

    /** Number of events appended since the last successful sync. */
    private int unSyncedEvents;

    /** Partition to next Kafka offset to fetch from. */
    private final Int2LongMap offsets = new Int2LongOpenHashMap();

    /** Partition to checkpoint that is updated as events are appended and later persisted. */
    private final Int2ObjectMap<MutableCheckpoint> checkpoints = new Int2ObjectOpenHashMap<>();
    private final LoggingEventSerializer serializer = new LoggingEventSerializer();

    /** Cache of Kafka consumers keyed by the broker they talk to. */
    private final Map<BrokerInfo, KafkaSimpleConsumer> kafkaConsumers = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/logging/pipeline/kafka/KafkaLogProcessorPipeline$KafkaSimpleConsumer.class */
    /**
     * A {@link SimpleConsumer} that remembers which broker it was created for, so the
     * owning pipeline can find its cache entry again from the consumer instance.
     */
    public static final class KafkaSimpleConsumer extends SimpleConsumer {
        /** Broker whose host and port were handed to the superclass. */
        private final BrokerInfo brokerInfo;

        /**
         * @param broker      broker to talk to; its host and port are passed to {@link SimpleConsumer}
         * @param soTimeout   third argument of the {@link SimpleConsumer} constructor
         * @param bufferSize  fourth argument of the {@link SimpleConsumer} constructor
         * @param clientId    fifth argument of the {@link SimpleConsumer} constructor
         */
        KafkaSimpleConsumer(BrokerInfo broker, int soTimeout, int bufferSize, String clientId) {
            super(broker.getHost(), broker.getPort(), soTimeout, bufferSize, clientId);
            brokerInfo = broker;
        }

        /** @return the broker this consumer was created for */
        BrokerInfo getBrokerInfo() {
            return brokerInfo;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/logging/pipeline/kafka/KafkaLogProcessorPipeline$MutableCheckpoint.class */
    /**
     * A {@link Checkpoint} whose three values can be updated in place.
     *
     * <p>The superclass receives the initial values through its constructor; this class keeps
     * its own copies and overrides the getters so that values set later are the ones returned.
     * Setters return {@code this} so updates can be chained.
     */
    public static final class MutableCheckpoint extends Checkpoint {
        private long nextOffset;
        private long nextEventTime;
        private long maxEventTime;

        /** Creates a mutable copy of the given checkpoint. */
        MutableCheckpoint(Checkpoint checkpoint) {
            this(checkpoint.getNextOffset(), checkpoint.getNextEventTime(), checkpoint.getMaxEventTime());
        }

        /**
         * @param nextOffset    initial next offset
         * @param nextEventTime initial next event time
         * @param maxEventTime  initial max event time
         */
        MutableCheckpoint(long nextOffset, long nextEventTime, long maxEventTime) {
            super(nextOffset, nextEventTime, maxEventTime);
            this.nextOffset = nextOffset;
            this.nextEventTime = nextEventTime;
            this.maxEventTime = maxEventTime;
        }

        @Override // co.cask.cdap.logging.meta.Checkpoint
        public long getNextOffset() {
            return this.nextOffset;
        }

        MutableCheckpoint setNextOffset(long nextOffset) {
            this.nextOffset = nextOffset;
            return this;
        }

        @Override // co.cask.cdap.logging.meta.Checkpoint
        public long getNextEventTime() {
            return this.nextEventTime;
        }

        // The method name (missing 't') is kept as-is because the enclosing class calls it.
        MutableCheckpoint setNextEvenTime(long nextEventTime) {
            this.nextEventTime = nextEventTime;
            return this;
        }

        @Override // co.cask.cdap.logging.meta.Checkpoint
        public long getMaxEventTime() {
            return this.maxEventTime;
        }

        MutableCheckpoint setMaxEventTime(long maxEventTime) {
            this.maxEventTime = maxEventTime;
            return this;
        }

        /**
         * Renders all three tracked values. {@code nextEventTime} is included so that logged
         * checkpoints show everything that gets persisted.
         */
        @Override // co.cask.cdap.logging.meta.Checkpoint
        public String toString() {
            return "Checkpoint{nextOffset=" + this.nextOffset
                + ", nextEventTime=" + this.nextEventTime
                + ", maxEventTime=" + this.maxEventTime + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/logging/pipeline/kafka/KafkaLogProcessorPipeline$OffsetTime.class */
    /**
     * Immutable pair of a Kafka offset and an event timestamp.
     * Ordering considers the offset only; the event time does not take part in comparison.
     */
    public static final class OffsetTime implements Comparable<OffsetTime> {
        private final long offset;
        private final long eventTime;

        /**
         * @param offset    Kafka offset associated with the event
         * @param eventTime timestamp of the event
         */
        OffsetTime(long offset, long eventTime) {
            this.offset = offset;
            this.eventTime = eventTime;
        }

        /** Orders by offset: negative, zero or positive as this offset is lower, equal or higher. */
        @Override // java.lang.Comparable
        public int compareTo(OffsetTime other) {
            if (offset < other.offset) {
                return -1;
            }
            return offset == other.offset ? 0 : 1;
        }

        long getOffset() {
            return offset;
        }

        long getEventTime() {
            return eventTime;
        }
    }

    /**
     * Creates a pipeline that reads log events from Kafka and hands them to the given context.
     *
     * @param logProcessorPipelineContext supplies the pipeline name and loggers; also used as the metrics sink
     * @param checkpointManager           loads and saves per-partition checkpoints
     * @param brokerService               used to look up partition leaders
     * @param kafkaPipelineConfig         topic, partitions and buffer/timing settings
     */
    public KafkaLogProcessorPipeline(LogProcessorPipelineContext logProcessorPipelineContext, CheckpointManager checkpointManager, BrokerService brokerService, KafkaPipelineConfig kafkaPipelineConfig) {
        // The context doubles as the metrics sink.
        context = logProcessorPipelineContext;
        metricsContext = logProcessorPipelineContext;
        name = logProcessorPipelineContext.getName();

        this.checkpointManager = checkpointManager;
        this.brokerService = brokerService;
        config = kafkaPipelineConfig;

        eventQueue = new TimeEventQueue<>(kafkaPipelineConfig.getPartitions());
        offsetResolver = new KafkaOffsetResolver(brokerService, kafkaPipelineConfig);
    }

    /**
     * Loads the persisted checkpoints, starts the pipeline context and creates the fetch
     * executor with one thread per configured partition.
     *
     * @throws Exception if loading checkpoints or starting the context fails
     */
    protected void startUp() throws Exception {
        LOG.debug("Starting log processor pipeline for {} with configurations {}", name, config);

        final Set<Integer> configuredPartitions = config.getPartitions();
        for (Map.Entry<Integer, Checkpoint> saved : checkpointManager.getCheckpoint(configuredPartitions).entrySet()) {
            final Checkpoint candidate = saved.getValue();
            // A checkpoint with any negative component is ignored.
            boolean hasNegativeField = candidate.getNextOffset() < 0
                || candidate.getNextEventTime() < 0
                || candidate.getMaxEventTime() < 0;
            if (hasNegativeField) {
                continue;
            }
            checkpoints.put(saved.getKey(), new MutableCheckpoint(candidate));
        }

        context.start();

        final String threadNameFormat = "fetcher-" + name + "-%d";
        fetchExecutor = Executors.newFixedThreadPool(configuredPartitions.size(), Threads.createDaemonThreadFactory(threadNameFormat));

        emitConfigMetrics();
        LOG.info("Log processor pipeline for {} with config {} started with checkpoint {}", new Object[]{name, config, checkpoints});
    }

    /**
     * Main processing loop. Resolves the starting offsets, then until stopped: fetches from
     * every partition, buffers the decoded events, appends the events that are due, and
     * syncs/persists checkpoints. When an iteration read nothing it sleeps before the next one.
     */
    protected void run() {
        runThread = Thread.currentThread();
        try {
            initializeOffsets();
            LOG.info("Kafka offsets initialize for pipeline {} as {}", name, offsets);

            // Reused across iterations; fetchAll overwrites the entry of every partition.
            final Map<Integer, Future<Iterable<MessageAndOffset>>> pendingFetches = new HashMap<>();
            final String topic = config.getTopic();
            lastCheckpointTime = System.currentTimeMillis();

            while (!stopped) {
                boolean receivedAny = false;
                for (Map.Entry<Integer, Future<Iterable<MessageAndOffset>>> fetch : fetchAll(offsets, pendingFetches).entrySet()) {
                    final int partition = fetch.getKey();
                    try {
                        receivedAny |= processMessages(topic, partition, fetch.getValue());
                    } catch (IOException | KafkaException e) {
                        OUTAGE_LOG.warn("Failed to fetch or process messages from {}:{}. Will be retried in next iteration.", new Object[]{topic, partition, e});
                    }
                }

                final long now = System.currentTimeMillis();
                unSyncedEvents += appendEvents(now, false);
                final long checkpointDelay = trySyncAndPersistCheckpoints(now);

                if (receivedAny) {
                    // There may be more to read right away; do not sleep.
                    continue;
                }

                // Nothing read: sleep until the oldest buffered event becomes due or the
                // value returned by the checkpoint step elapses, whichever is sooner.
                long sleepMillis = config.getEventDelayMillis();
                if (!eventQueue.isEmpty()) {
                    sleepMillis += eventQueue.first().getTimeStamp() - now;
                }
                sleepMillis = Math.min(sleepMillis, checkpointDelay);
                if (sleepMillis > 0) {
                    TimeUnit.MILLISECONDS.sleep(sleepMillis);
                }
            }
        } catch (InterruptedException ignored) {
            // triggerShutdown() interrupts this thread to stop the loop; nothing else to do here.
        }
    }

    /**
     * Requests the processing loop to stop: raises the stop flag and, if the run thread has
     * already been recorded, interrupts it so that a sleep or blocking wait returns.
     */
    protected void triggerShutdown() {
        stopped = true;
        final Thread runner = runThread;
        if (runner == null) {
            return;
        }
        runner.interrupt();
    }

    /**
     * Releases everything the pipeline holds: stops the fetch executor, stops the context and
     * persists checkpoints, then closes every cached Kafka consumer. Failures are logged and
     * do not abort the remaining steps.
     */
    protected void shutDown() throws Exception {
        LOG.debug("Shutting down log processor pipeline for {}", name);
        fetchExecutor.shutdownNow();

        try {
            // Checkpoints are only persisted when the context stopped without an exception.
            context.stop();
            persistCheckpoints();
        } catch (Exception stopFailure) {
            LOG.warn("Exception raised when stopping pipeline {}", name, stopFailure);
        }

        for (Iterator<KafkaSimpleConsumer> consumers = kafkaConsumers.values().iterator(); consumers.hasNext(); ) {
            try {
                consumers.next().close();
            } catch (Exception closeFailure) {
                LOG.warn("Exception raised when closing Kafka consumer.", closeFailure);
            }
        }

        LOG.info("Log processor pipeline for {} stopped with latest checkpoints {}", name, checkpoints);
    }

    /** @return the service name: the literal {@code "LogPipeline-"} followed by the pipeline name */
    protected String getServiceName() {
        return new StringBuilder("LogPipeline-").append(name).toString();
    }

    /**
     * Resolves the Kafka offset to start consuming from for every configured partition and
     * stores it in {@code offsets}.
     *
     * <p>A partition with a checkpoint whose next offset is positive is resolved through the
     * offset resolver; any other partition starts from the earliest offset available in Kafka.
     * Partitions whose lookup fails stay pending and are retried after a two-second pause,
     * until all are resolved or the pipeline is stopped.
     *
     * @throws InterruptedException if interrupted while pausing between retries
     */
    private void initializeOffsets() throws InterruptedException {
        Set<Integer> pending = new HashSet<>(this.config.getPartitions());
        while (!pending.isEmpty() && !this.stopped) {
            boolean failed = false;
            Iterator<Integer> iterator = pending.iterator();
            while (iterator.hasNext()) {
                int partition = iterator.next();
                Checkpoint checkpoint = this.checkpoints.get(partition);
                try {
                    long startOffset;
                    if (checkpoint == null || checkpoint.getNextOffset() <= 0) {
                        // No usable checkpoint: start from the beginning of the partition.
                        startOffset = getLastOffset(partition, OffsetRequest.EarliestTime());
                    } else {
                        startOffset = this.offsetResolver.getStartOffset(checkpoint, partition);
                    }
                    this.offsets.put(partition, startOffset);
                    // Resolved exactly once; drop it so later rounds only retry failures.
                    iterator.remove();
                } catch (Exception e) {
                    // Both lookups are inside the try so a Kafka failure triggers a retry
                    // instead of terminating the run thread.
                    OUTAGE_LOG.warn("Failed to get a valid offset from Kafka to start consumption for {}:{}", new Object[]{this.config.getTopic(), partition, e});
                    failed = true;
                }
            }
            if (failed && !this.stopped) {
                TimeUnit.SECONDS.sleep(2L);
            }
        }
    }

    /**
     * Waits for one partition's fetch to complete, decodes the fetched messages into the event
     * queue and advances the partition's fetch offset.
     *
     * <p>When the queue has reached the configured maximum size, a forced append is attempted
     * first; if that frees nothing, the remaining messages of this fetch are left unread
     * (they are fetched again because the offset has not advanced past them). Messages that
     * cannot be decoded are skipped, but still advance the offset.
     *
     * @param topic     Kafka topic, used for logging only
     * @param partition partition the fetch belongs to
     * @param future    pending fetch result
     * @return {@code true} if at least one message was consumed
     * @throws InterruptedException if interrupted while waiting for the fetch
     * @throws KafkaException       if the fetch failed with a Kafka error other than offset-out-of-range
     * @throws IOException          if the fetch failed with an I/O error or any other cause
     */
    private boolean processMessages(String topic, int partition, Future<Iterable<MessageAndOffset>> future) throws InterruptedException, KafkaException, IOException {
        try {
            boolean processed = false;
            for (MessageAndOffset messageAndOffset : future.get()) {
                if (this.eventQueue.getEventSize() >= this.config.getMaxBufferSize()) {
                    OUTAGE_LOG.info("Maximum queue size {} reached for pipeline {}.", Long.valueOf(this.config.getMaxBufferSize()), this.name);
                    int appended = appendEvents(System.currentTimeMillis(), true);
                    if (appended <= 0) {
                        // Nothing could be drained; stop reading to avoid growing the queue further.
                        break;
                    }
                    this.unSyncedEvents += appended;
                }
                try {
                    this.metricsContext.increment("kafka.bytes.read", messageAndOffset.message().payloadSize());
                    ILoggingEvent event = this.serializer.fromBytes(messageAndOffset.message().payload());
                    this.eventQueue.add(event, event.getTimeStamp(), messageAndOffset.message().payloadSize(), partition, new OffsetTime(messageAndOffset.nextOffset(), event.getTimeStamp()));
                } catch (IOException e) {
                    LOG.trace("Fail to decode logging event from {}:{} at offset {}. Skipping it.", new Object[]{topic, Integer.valueOf(partition), Long.valueOf(messageAndOffset.offset()), e});
                }
                processed = true;
                this.offsets.put(partition, messageAndOffset.nextOffset());
            }
            return processed;
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // Must be tested before the general KafkaException case, which it is a subtype of.
            if (cause instanceof OffsetOutOfRangeException) {
                // The stored offset is no longer valid; restart from the earliest available one.
                this.offsets.put(partition, getLastOffset(partition, OffsetRequest.EarliestTime()));
                return false;
            }
            if (cause instanceof KafkaException) {
                throw (KafkaException) cause;
            }
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException(cause);
        }
    }

    /**
     * Submits one fetch task per configured partition to the fetch executor and records the
     * resulting futures in the supplied map, keyed by partition.
     *
     * @param startOffsets partition to offset to fetch from; read when the task is created
     * @param futures      map that receives the futures; an existing entry for a partition is replaced
     * @return the same {@code futures} map
     */
    private <T extends Map<Integer, Future<Iterable<MessageAndOffset>>>> T fetchAll(Int2LongMap startOffsets, T futures) {
        for (Integer configuredPartition : this.config.getPartitions()) {
            final int partition = configuredPartition.intValue();
            final long offset = startOffsets.get(partition);
            Callable<Iterable<MessageAndOffset>> fetchTask = new Callable<Iterable<MessageAndOffset>>() {
                @Override // java.util.concurrent.Callable
                public Iterable<MessageAndOffset> call() throws Exception {
                    return KafkaLogProcessorPipeline.this.fetchMessages(partition, offset);
                }
            };
            futures.put(Integer.valueOf(partition), this.fetchExecutor.submit(fetchTask));
        }
        return futures;
    }

    /**
     * Hands buffered events to their loggers' appenders, in queue order, and advances the
     * in-memory checkpoints accordingly.
     *
     * <p>Iteration stops at the first event that is newer than
     * {@code currentTimeMillis - eventDelayMillis} while the queue is within the retain size.
     * The retain size is unlimited unless {@code forced}, in which case it is
     * {@code maxBufferSize * MIN_FREE_FACTOR}. An event whose append throws stays in the queue.
     *
     * @param currentTimeMillis reference time used to decide which events are due
     * @param forced            whether to drain due to buffer pressure
     * @return number of events removed from the queue
     */
    private int appendEvents(long currentTimeMillis, boolean forced) {
        final long dueBefore = currentTimeMillis - this.config.getEventDelayMillis();
        final long retainSize;
        if (forced) {
            retainSize = (long) (this.config.getMaxBufferSize() * MIN_FREE_FACTOR);
        } else {
            retainSize = Long.MAX_VALUE;
        }

        TimeEventQueue.EventIterator<ILoggingEvent, OffsetTime> events = this.eventQueue.iterator();
        int appended = 0;
        long minDelay = Long.MAX_VALUE;
        long maxDelay = -1;

        while (events.hasNext()) {
            final ILoggingEvent event = events.next();
            final boolean withinRetainSize = this.eventQueue.getEventSize() <= retainSize;
            if (withinRetainSize && event.getTimeStamp() >= dueBefore) {
                break;
            }

            final long delay = System.currentTimeMillis() - event.getTimeStamp();
            minDelay = Math.min(minDelay, delay);
            maxDelay = Math.max(maxDelay, delay);

            try {
                ch.qos.logback.classic.Logger target = this.context.getEffectiveLogger(event.getLoggerName());
                if (event.getLevel().isGreaterOrEqual(target.getEffectiveLevel())) {
                    target.callAppenders(event);
                }

                // The checkpoint follows the smallest offset still buffered for the partition,
                // read before this event is removed.
                final int partition = events.getPartition();
                final MutableCheckpoint current = this.checkpoints.get(partition);
                final OffsetTime smallest = this.eventQueue.getSmallestOffset(partition);
                if (current != null) {
                    current.setNextOffset(smallest.getOffset());
                    current.setNextEvenTime(smallest.getEventTime());
                    current.setMaxEventTime(event.getTimeStamp());
                } else {
                    this.checkpoints.put(partition, new MutableCheckpoint(smallest.getOffset(), smallest.getEventTime(), event.getTimeStamp()));
                }

                events.remove();
                appended++;
            } catch (Exception e) {
                OUTAGE_LOG.warn("Failed to append log event in pipeline {}. Will be retried.", this.name, e);
            }
        }

        // For partitions with nothing left in the queue, move the checkpoint up to the fetch offset.
        for (Int2LongMap.Entry fetched : this.offsets.int2LongEntrySet()) {
            final int partition = fetched.getIntKey();
            if (!this.eventQueue.isEmpty(partition)) {
                continue;
            }
            final MutableCheckpoint current = this.checkpoints.get(partition);
            final long fetchOffset = fetched.getLongValue();
            if (current != null && fetchOffset > current.getNextOffset()) {
                current.setNextOffset(fetchOffset);
            }
        }

        if (appended > 0) {
            this.metricsContext.gauge("log.process.min.delay", minDelay);
            this.metricsContext.gauge("log.process.max.delay", maxDelay);
            this.metricsContext.increment("log.process.message.count", appended);
        }

        try {
            this.metricsContext.gauge("event.queue.size.bytes", this.eventQueue.getEventSize());
            this.context.flush();
        } catch (IOException flushFailure) {
            OUTAGE_LOG.warn("Failed to flush in pipeline {}. Will be retried.", this.name, flushFailure);
        }
        return appended;
    }

    /**
     * Syncs the context and persists checkpoints when there are unsynced events and the
     * checkpoint interval has elapsed since the last successful sync.
     *
     * @param currentTimeMillis current time in milliseconds
     * @return the time remaining until the interval elapses when a sync is pending but not
     *         yet due; otherwise the full checkpoint interval
     */
    private long trySyncAndPersistCheckpoints(long currentTimeMillis) {
        if (this.unSyncedEvents > 0) {
            final boolean intervalElapsed = currentTimeMillis - this.config.getCheckpointIntervalMillis() >= this.lastCheckpointTime;
            if (!intervalElapsed) {
                return (this.config.getCheckpointIntervalMillis() - currentTimeMillis) + this.lastCheckpointTime;
            }
            try {
                this.context.sync();
                persistCheckpoints();
                this.lastCheckpointTime = currentTimeMillis;
                this.metricsContext.gauge("last.checkpoint.time", this.lastCheckpointTime);
                this.unSyncedEvents = 0;
                LOG.debug("Events synced and checkpoint persisted for {}", this.name);
            } catch (Exception syncFailure) {
                // State is left untouched so the next call tries again.
                OUTAGE_LOG.warn("Failed to sync in pipeline {}. Will be retried.", this.name, syncFailure);
            }
        }
        return this.config.getCheckpointIntervalMillis();
    }

    /**
     * Saves the current in-memory checkpoints through the checkpoint manager.
     * Never throws: a failure is logged (sampled) and otherwise ignored.
     */
    private void persistCheckpoints() {
        final Int2ObjectMap<MutableCheckpoint> current = this.checkpoints;
        try {
            this.checkpointManager.saveCheckpoints(current);
            LOG.debug("Checkpoint persisted for {} with {}", this.name, current);
        } catch (Exception saveFailure) {
            OUTAGE_LOG.warn("Failed to persist checkpoints for pipeline {}.", this.name, saveFailure);
        }
    }

    /**
     * Returns the consumer for the current leader of the given topic partition, creating and
     * caching one on first use.
     *
     * <p>Access to the cache is synchronized on the cache map because this method is called
     * both from the run thread and from the fetch executor threads.
     *
     * @param topic     Kafka topic
     * @param partition partition within the topic
     * @return the cached or newly created consumer, or {@code null} if no leader is known
     */
    @Nullable
    private KafkaSimpleConsumer getKafkaConsumer(String topic, int partition) {
        BrokerInfo leader = this.brokerService.getLeader(topic, partition);
        if (leader == null) {
            return null;
        }
        synchronized (this.kafkaConsumers) {
            KafkaSimpleConsumer consumer = this.kafkaConsumers.get(leader);
            if (consumer == null) {
                consumer = new KafkaSimpleConsumer(leader, KAFKA_SO_TIMEOUT, this.config.getKafkaFetchBufferSize(), "client-" + this.name + "-" + partition);
                this.kafkaConsumers.put(leader, consumer);
            }
            return consumer;
        }
    }

    /**
     * Drops the given consumer from the cache and closes it, so its connection is released
     * instead of lingering until garbage collection. The cache entry is only removed when it
     * still refers to this instance, so a replacement created in the meantime is kept.
     */
    private void evictKafkaConsumer(KafkaSimpleConsumer consumer) {
        BrokerInfo brokerInfo = consumer.getBrokerInfo();
        synchronized (this.kafkaConsumers) {
            if (this.kafkaConsumers.get(brokerInfo) == consumer) {
                this.kafkaConsumers.remove(brokerInfo);
            }
        }
        try {
            consumer.close();
        } catch (Exception e) {
            LOG.warn("Exception raised when closing Kafka consumer.", e);
        }
    }

    /**
     * Asks the partition leader for the offset matching the given timestamp.
     *
     * @param partition partition to query
     * @param timestamp value forwarded to {@code KafkaUtil.getOffsetByTimestamp}; callers in
     *                  this class pass {@code OffsetRequest.EarliestTime()}
     * @return the offset reported by Kafka
     * @throws KafkaException if no leader is available or the lookup fails; on a failed lookup
     *                        the consumer is evicted so the next call creates a fresh one
     */
    private long getLastOffset(int partition, long timestamp) throws KafkaException {
        String topic = this.config.getTopic();
        KafkaSimpleConsumer consumer = getKafkaConsumer(topic, partition);
        if (consumer == null) {
            throw new LeaderNotAvailableException("No broker to fetch offsets for " + topic + ":" + partition);
        }
        try {
            return KafkaUtil.getOffsetByTimestamp(consumer, topic, partition, timestamp);
        } catch (KafkaException e) {
            evictKafkaConsumer(consumer);
            throw e;
        }
    }

    /**
     * Fetches messages for one partition starting at the given offset. Runs on a fetch
     * executor thread.
     *
     * @param partition partition to fetch from
     * @param offset    offset to start fetching at
     * @return the fetched message set
     * @throws KafkaException if no leader is available or the fetch fails
     */
    public Iterable<MessageAndOffset> fetchMessages(int partition, long offset) throws KafkaException {
        String topic = this.config.getTopic();
        KafkaSimpleConsumer consumer = getKafkaConsumer(topic, partition);
        if (consumer == null) {
            throw new LeaderNotAvailableException("No broker to fetch messages for " + topic + ":" + partition);
        }
        LOG.trace("Fetching messages from Kafka on {}:{} for pipeline {} with offset {}", new Object[]{topic, Integer.valueOf(partition), this.name, Long.valueOf(offset)});
        try {
            ByteBufferMessageSet messages = KafkaUtil.fetchMessages(consumer, topic, partition, this.config.getKafkaFetchBufferSize(), offset);
            LOG.trace("Fetched {} bytes from Kafka on {}:{} for pipeline {}", new Object[]{Integer.valueOf(messages.sizeInBytes()), topic, Integer.valueOf(partition), this.name});
            return messages;
        } catch (OffsetOutOfRangeException e) {
            // NOTE(review): only offset-out-of-range evicts the consumer here, while
            // getLastOffset evicts on any KafkaException - confirm this asymmetry is intended.
            evictKafkaConsumer(consumer);
            throw e;
        }
    }

    /** Publishes the pipeline's buffer, delay, fetch-size and checkpoint-interval settings as gauges. */
    private void emitConfigMetrics() {
        final MetricsContext metrics = this.metricsContext;
        final KafkaPipelineConfig settings = this.config;

        metrics.gauge("max.buffer.size", settings.getMaxBufferSize());
        metrics.gauge("event.delay.millis", settings.getEventDelayMillis());
        metrics.gauge("kafka.fetch.buffer.size", settings.getKafkaFetchBufferSize());
        metrics.gauge("checkpoint.interval.millis", settings.getCheckpointIntervalMillis());
    }
}
