package co.cask.cdap.logging.save;

import ch.qos.logback.classic.spi.ILoggingEvent;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.logging.appender.kafka.LoggingEventSerializer;
import co.cask.cdap.logging.context.LoggingContextHelper;
import co.cask.cdap.logging.kafka.KafkaLogEvent;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericRecord;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/KafkaMessageCallback.class */
public class KafkaMessageCallback implements KafkaConsumer.MessageCallback {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageCallback.class);
    private final int partition;
    private final Set<KafkaLogProcessor> kafkaLogProcessors;
    private final LoggingEventSerializer serializer = new LoggingEventSerializer();
    private final CountDownLatch stopLatch;
    private final MetricsContext metricsContext;
    private final String delayMetric;

    public KafkaMessageCallback(int i, CountDownLatch countDownLatch, Set<KafkaLogProcessor> set, MetricsContext metricsContext) throws Exception {
        this.partition = i;
        this.kafkaLogProcessors = set;
        this.stopLatch = countDownLatch;
        this.metricsContext = metricsContext;
        this.delayMetric = "log.process.delay." + i;
    }

    public void onReceived(Iterator<FetchedMessage> it) {
        try {
            if (this.stopLatch.await(1L, TimeUnit.NANOSECONDS)) {
                LOG.debug("Returning since callback is cancelled.");
                return;
            }
            long j = Long.MAX_VALUE;
            ArrayList newArrayList = Lists.newArrayList();
            while (it.hasNext()) {
                FetchedMessage next = it.next();
                try {
                    GenericRecord genericRecord = this.serializer.toGenericRecord(next.getPayload());
                    ILoggingEvent fromGenericRecord = this.serializer.fromGenericRecord(genericRecord);
                    LOG.trace("Got event {} for partition {}", fromGenericRecord, Integer.valueOf(this.partition));
                    newArrayList.add(new KafkaLogEvent(genericRecord, fromGenericRecord, LoggingContextHelper.getLoggingContext(fromGenericRecord.getMDCPropertyMap()), next.getTopicPartition().getPartition(), next.getNextOffset()));
                    if (fromGenericRecord.getTimeStamp() < j) {
                        j = fromGenericRecord.getTimeStamp();
                    }
                } catch (Throwable th) {
                    LOG.warn("Message with next offset {} ignored due to exception for topic {} parition {}", new Object[]{Long.valueOf(next.getNextOffset()), next.getTopicPartition().getTopic(), Integer.valueOf(next.getTopicPartition().getPartition()), th});
                }
            }
            int size = newArrayList.size();
            if (!newArrayList.isEmpty()) {
                for (KafkaLogProcessor kafkaLogProcessor : this.kafkaLogProcessors) {
                    try {
                        kafkaLogProcessor.process(newArrayList.iterator());
                    } catch (Throwable th2) {
                        LOG.warn("Exception processing {} kafka log events in processor {}, ignoring the exception", new Object[]{Integer.valueOf(newArrayList.size()), kafkaLogProcessor.getClass().getSimpleName(), th2});
                    }
                }
                this.metricsContext.gauge(this.delayMetric, System.currentTimeMillis() - j);
                this.metricsContext.increment("log.process.message.count", size);
            }
            LOG.trace("Got {} messages from kafka", Integer.valueOf(size));
        } catch (InterruptedException e) {
            LOG.error("Exception: ", e);
            Thread.currentThread().interrupt();
        }
    }

    public void finished() {
        LOG.info("KafkaMessageCallback finished for partition {}.", Integer.valueOf(this.partition));
    }
}
