package co.cask.cdap.logging.save;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.RootLocationFactory;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.common.namespace.NamespaceQueryAdmin;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.common.security.Impersonator;
import co.cask.cdap.logging.LoggingConfiguration;
import co.cask.cdap.logging.appender.kafka.LoggingEventSerializer;
import co.cask.cdap.logging.kafka.KafkaLogEvent;
import co.cask.cdap.logging.write.AvroFileWriter;
import co.cask.cdap.logging.write.FileMetaDataManager;
import co.cask.cdap.logging.write.LogCleanup;
import co.cask.cdap.logging.write.LogFileWriter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.RowSortedTable;
import com.google.common.collect.TreeBasedTable;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/KafkaLogWriterPlugin.class */
public class KafkaLogWriterPlugin extends AbstractKafkaLogProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaLogWriterPlugin.class);
    public static final int CHECKPOINT_ROW_KEY_PREFIX = 100;
    private static final long SLEEP_TIME_MS = 100;
    private final String logBaseDir;
    private final LogFileWriter<KafkaLogEvent> logFileWriter;
    private final long eventBucketIntervalMs;
    private final int logCleanupIntervalMins;
    private final long maxNumberOfBucketsInTable;
    private final LogCleanup logCleanup;
    private final CheckpointManager checkpointManager;
    private ListeningScheduledExecutorService scheduledExecutor;
    private CountDownLatch countDownLatch;
    private int partition;
    private final LoggingEventSerializer serializer = new LoggingEventSerializer();
    private final RowSortedTable<Long, String, Map.Entry<Long, List<KafkaLogEvent>>> messageTable = TreeBasedTable.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaLogWriterPlugin(CConfiguration cConfiguration, FileMetaDataManager fileMetaDataManager, CheckpointManagerFactory checkpointManagerFactory, RootLocationFactory rootLocationFactory, NamespaceQueryAdmin namespaceQueryAdmin, NamespacedLocationFactory namespacedLocationFactory, Impersonator impersonator) throws Exception {
        this.logBaseDir = cConfiguration.get(LoggingConfiguration.LOG_BASE_DIR);
        Preconditions.checkNotNull(this.logBaseDir, "Log base dir cannot be null");
        LOG.debug(String.format("Log base dir is %s", this.logBaseDir));
        long j = cConfiguration.getLong(LoggingConfiguration.LOG_RETENTION_DURATION_DAYS, 30L);
        Preconditions.checkArgument(j > 0, "Log file retention duration is invalid: %s", new Object[]{Long.valueOf(j)});
        long j2 = cConfiguration.getLong(LoggingConfiguration.LOG_MAX_FILE_SIZE_BYTES, 100000000L);
        Preconditions.checkArgument(j2 > 0, "Max log file size is invalid: %s", new Object[]{Long.valueOf(j2)});
        int i = cConfiguration.getInt(LoggingConfiguration.LOG_FILE_SYNC_INTERVAL_BYTES, 10000000);
        Preconditions.checkArgument(i > 0, "Log file sync interval is invalid: %s", new Object[]{Integer.valueOf(i)});
        long j3 = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_CHECKPOINT_INTERVAL_MS, LoggingConfiguration.DEFAULT_LOG_SAVER_CHECKPOINT_INTERVAL_MS);
        Preconditions.checkArgument(j3 > 0, "Checkpoint interval is invalid: %s", new Object[]{Long.valueOf(j3)});
        long j4 = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_MAX_FILE_LIFETIME, LoggingConfiguration.DEFAULT_LOG_SAVER_MAX_FILE_LIFETIME_MS);
        Preconditions.checkArgument(j4 > 0, "Max file lifetime is invalid: %s", new Object[]{Long.valueOf(j4)});
        if (cConfiguration.get(LoggingConfiguration.LOG_SAVER_INACTIVE_FILE_INTERVAL_MS) != null) {
            LOG.warn("Parameter '{}' is no longer supported. Instead, use '{}'.", LoggingConfiguration.LOG_SAVER_INACTIVE_FILE_INTERVAL_MS, LoggingConfiguration.LOG_SAVER_MAX_FILE_LIFETIME);
        }
        this.eventBucketIntervalMs = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_EVENT_BUCKET_INTERVAL_MS, 1000L);
        Preconditions.checkArgument(this.eventBucketIntervalMs > 0, "Event bucket interval is invalid: %s", new Object[]{Long.valueOf(this.eventBucketIntervalMs)});
        this.maxNumberOfBucketsInTable = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_MAXIMUM_INMEMORY_EVENT_BUCKETS, 4L);
        Preconditions.checkArgument(this.maxNumberOfBucketsInTable > 0, "Maximum number of event buckets in memory is invalid: %s", new Object[]{Long.valueOf(this.maxNumberOfBucketsInTable)});
        long j5 = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_TOPIC_WAIT_SLEEP_MS, LoggingConfiguration.DEFAULT_LOG_SAVER_TOPIC_WAIT_SLEEP_MS);
        Preconditions.checkArgument(j5 > 0, "Topic creation wait sleep is invalid: %s", new Object[]{Long.valueOf(j5)});
        this.logCleanupIntervalMins = cConfiguration.getInt(LoggingConfiguration.LOG_CLEANUP_RUN_INTERVAL_MINS, LoggingConfiguration.DEFAULT_LOG_CLEANUP_RUN_INTERVAL_MINS);
        Preconditions.checkArgument(this.logCleanupIntervalMins > 0, "Log cleanup run interval is invalid: %s", new Object[]{Integer.valueOf(this.logCleanupIntervalMins)});
        AvroFileWriter avroFileWriter = new AvroFileWriter(fileMetaDataManager, namespacedLocationFactory, this.logBaseDir, this.serializer.getAvroSchema(), j2, i, j4, impersonator);
        this.checkpointManager = checkpointManagerFactory.create(cConfiguration.get("log.kafka.topic"), 100);
        this.logFileWriter = new CheckpointingLogFileWriter(avroFileWriter, this.checkpointManager, j3);
        this.logCleanup = new LogCleanup(fileMetaDataManager, rootLocationFactory, namespaceQueryAdmin, namespacedLocationFactory, this.logBaseDir, TimeUnit.MILLISECONDS.convert(j, TimeUnit.DAYS), cConfiguration, impersonator);
    }

    @Override // co.cask.cdap.logging.save.KafkaLogProcessor
    public void init(int i) throws Exception {
        this.partition = i;
        super.init(this.checkpointManager.getCheckpoint(i));
        if (i == 0) {
            this.scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(2, Threads.createDaemonThreadFactory("log-saver-log-processor-" + i)));
            LOG.info("Scheduling cleanup task");
            this.scheduledExecutor.scheduleAtFixedRate(this.logCleanup, 10L, this.logCleanupIntervalMins, TimeUnit.MINUTES);
        } else {
            this.scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("log-saver-log-processor-" + i)));
        }
        this.countDownLatch = new CountDownLatch(1);
        this.scheduledExecutor.execute(new LogWriter(this.logFileWriter, this.messageTable, this.eventBucketIntervalMs, this.maxNumberOfBucketsInTable, this.countDownLatch));
    }

    @Override // co.cask.cdap.logging.save.AbstractKafkaLogProcessor
    public void doProcess(Iterator<KafkaLogEvent> it) {
        List list;
        PeekingIterator peekingIterator = Iterators.peekingIterator(it);
        KafkaLogEvent kafkaLogEvent = (KafkaLogEvent) peekingIterator.peek();
        try {
            long timeStamp = kafkaLogEvent.getLogEvent().getTimeStamp() / this.eventBucketIntervalMs;
            do {
                synchronized (this.messageTable) {
                    SortedSet rowKeySet = this.messageTable.rowKeySet();
                    int size = rowKeySet.size();
                    long currentTimeMillis = size == 0 ? System.currentTimeMillis() : ((Long) rowKeySet.first()).longValue();
                    long longValue = size == 0 ? currentTimeMillis : ((Long) rowKeySet.last()).longValue();
                    if (size < this.maxNumberOfBucketsInTable || timeStamp <= longValue) {
                        while (peekingIterator.hasNext()) {
                            KafkaLogEvent kafkaLogEvent2 = (KafkaLogEvent) peekingIterator.next();
                            LoggingContext loggingContext = kafkaLogEvent2.getLoggingContext();
                            long timeStamp2 = kafkaLogEvent2.getLogEvent().getTimeStamp() / this.eventBucketIntervalMs;
                            if (((Map.Entry) this.messageTable.get(Long.valueOf(timeStamp2), loggingContext.getLogPathFragment(this.logBaseDir))) == null) {
                                long currentTimeMillis2 = System.currentTimeMillis() / this.eventBucketIntervalMs;
                                list = Lists.newArrayList();
                                this.messageTable.put(Long.valueOf(timeStamp2), loggingContext.getLogPathFragment(this.logBaseDir), new AbstractMap.SimpleEntry(Long.valueOf(currentTimeMillis2), list));
                            } else {
                                list = (List) ((Map.Entry) this.messageTable.get(Long.valueOf(timeStamp2), loggingContext.getLogPathFragment(this.logBaseDir))).getValue();
                            }
                            list.add(new KafkaLogEvent(kafkaLogEvent2.getGenericRecord(), kafkaLogEvent2.getLogEvent(), loggingContext, kafkaLogEvent2.getPartition(), kafkaLogEvent2.getNextOffset()));
                        }
                        return;
                    }
                    LOG.trace("key={}, oldestBucketKey={}, maxNumberOfBucketsInTable={}, buckets={}. Sleeping for {} ms.", new Object[]{Long.valueOf(timeStamp), Long.valueOf(currentTimeMillis), Long.valueOf(this.maxNumberOfBucketsInTable), Integer.valueOf(size), Long.valueOf(SLEEP_TIME_MS)});
                }
            } while (!this.countDownLatch.await(SLEEP_TIME_MS, TimeUnit.MILLISECONDS));
            LOG.debug("Returning since callback is cancelled");
        } catch (Throwable th) {
            LOG.warn("Exception while processing message with nextOffset {}. Skipping it.", Long.valueOf(kafkaLogEvent.getNextOffset()), th);
        }
    }

    @Override // co.cask.cdap.logging.save.KafkaLogProcessor
    public void stop() {
        try {
            LOG.info("Stopping log writer plugin for partition {}", Integer.valueOf(this.partition));
            if (this.countDownLatch != null) {
                this.countDownLatch.countDown();
            }
            if (this.scheduledExecutor != null) {
                this.scheduledExecutor.shutdown();
                if (!this.scheduledExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
                    this.scheduledExecutor.shutdownNow();
                }
            }
            this.logFileWriter.flush();
            this.logFileWriter.close();
        } catch (Exception e) {
            LOG.error("Caught exception while closing logWriter {}", e.getMessage(), e);
        }
        this.messageTable.clear();
    }

    @Override // co.cask.cdap.logging.save.KafkaLogProcessor
    public Checkpoint getCheckpoint() {
        try {
            return this.checkpointManager.getCheckpoint(this.partition);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    @VisibleForTesting
    CheckpointManager getCheckPointManager() {
        return this.checkpointManager;
    }
}
