package co.cask.cdap.logging.save;

import ch.qos.logback.classic.spi.ILoggingEvent;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.logging.appender.kafka.LoggingEventSerializer;
import co.cask.cdap.logging.context.LoggingContextHelper;
import co.cask.cdap.logging.kafka.KafkaLogEvent;
import com.google.common.collect.Lists;
import com.google.common.collect.RowSortedTable;
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericRecord;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/LogCollectorCallback.class */
public class LogCollectorCallback implements KafkaConsumer.MessageCallback {
    private static final Logger LOG = LoggerFactory.getLogger(LogCollectorCallback.class);
    private final RowSortedTable<Long, String, Map.Entry<Long, List<KafkaLogEvent>>> messageTable;
    private final LoggingEventSerializer serializer;
    private final long eventBucketIntervalMs;
    private final long maxNumberOfBucketsInTable;
    private final CountDownLatch kafkaCancelCallbackLatch;
    private static final long SLEEP_TIME_MS = 100;
    private final String logBaseDir;

    public LogCollectorCallback(RowSortedTable<Long, String, Map.Entry<Long, List<KafkaLogEvent>>> rowSortedTable, LoggingEventSerializer loggingEventSerializer, long j, long j2, CountDownLatch countDownLatch, String str) {
        this.messageTable = rowSortedTable;
        this.serializer = loggingEventSerializer;
        this.eventBucketIntervalMs = j;
        this.maxNumberOfBucketsInTable = j2;
        this.kafkaCancelCallbackLatch = countDownLatch;
        this.logBaseDir = str;
    }

    public void onReceived(Iterator<FetchedMessage> it) {
        GenericRecord genericRecord;
        ILoggingEvent fromGenericRecord;
        LoggingContext loggingContext;
        long timeStamp;
        List list;
        try {
            if (this.kafkaCancelCallbackLatch.await(50L, TimeUnit.MICROSECONDS)) {
                LOG.info("Returning since callback is cancelled.");
                return;
            }
            int i = 0;
            while (it.hasNext()) {
                FetchedMessage next = it.next();
                try {
                    genericRecord = this.serializer.toGenericRecord(next.getPayload());
                    fromGenericRecord = this.serializer.fromGenericRecord(genericRecord);
                    loggingContext = LoggingContextHelper.getLoggingContext(fromGenericRecord.getMDCPropertyMap());
                    timeStamp = fromGenericRecord.getTimeStamp() / this.eventBucketIntervalMs;
                } catch (Throwable th) {
                    LOG.warn("Exception while processing message with nextOffset {}. Skipping it.", Long.valueOf(next.getNextOffset()), th);
                }
                do {
                    synchronized (this.messageTable) {
                        SortedSet rowKeySet = this.messageTable.rowKeySet();
                        if (!rowKeySet.isEmpty()) {
                            long longValue = ((Long) rowKeySet.first()).longValue();
                            if (timeStamp > longValue + this.maxNumberOfBucketsInTable) {
                                LOG.trace("key={}, oldestBucketKey={}, maxNumberOfBucketsInTable={}. Sleeping for {} ms.", new Object[]{Long.valueOf(timeStamp), Long.valueOf(longValue), Long.valueOf(this.maxNumberOfBucketsInTable), Long.valueOf(SLEEP_TIME_MS)});
                            }
                        }
                    }
                    synchronized (this.messageTable) {
                        if (((Map.Entry) this.messageTable.get(Long.valueOf(timeStamp), loggingContext.getLogPathFragment(this.logBaseDir))) == null) {
                            long currentTimeMillis = System.currentTimeMillis() / this.eventBucketIntervalMs;
                            list = Lists.newArrayList();
                            this.messageTable.put(Long.valueOf(timeStamp), loggingContext.getLogPathFragment(this.logBaseDir), new AbstractMap.SimpleEntry(Long.valueOf(currentTimeMillis), list));
                        } else {
                            list = (List) ((Map.Entry) this.messageTable.get(Long.valueOf(timeStamp), loggingContext.getLogPathFragment(this.logBaseDir))).getValue();
                        }
                        list.add(new KafkaLogEvent(genericRecord, fromGenericRecord, loggingContext, next.getTopicPartition().getPartition(), next.getNextOffset()));
                    }
                    i++;
                } while (!this.kafkaCancelCallbackLatch.await(SLEEP_TIME_MS, TimeUnit.MILLISECONDS));
                LOG.info("Returning since callback is cancelled");
                return;
            }
            LOG.trace("Got {} messages from kafka", Integer.valueOf(i));
        } catch (InterruptedException e) {
            LOG.error("Exception: ", e);
            Thread.currentThread().interrupt();
        }
    }

    public void finished() {
        LOG.info("LogCollectorCallback finished.");
    }
}
