package co.cask.cdap.logging.save;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.logging.LoggingConfiguration;
import co.cask.cdap.logging.appender.kafka.LoggingEventSerializer;
import co.cask.cdap.logging.kafka.KafkaLogEvent;
import co.cask.cdap.logging.write.AvroFileWriter;
import co.cask.cdap.logging.write.FileMetaDataManager;
import co.cask.cdap.logging.write.LogCleanup;
import co.cask.cdap.logging.write.LogFileWriter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.RowSortedTable;
import com.google.common.collect.TreeBasedTable;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/KafkaLogWriterPlugin.class */
/**
 * Kafka log processor that buffers incoming log events in an in-memory table and hands that table,
 * together with a checkpointing Avro file writer, to a periodically scheduled {@link LogWriter} task.
 *
 * <p>Events are bucketed by {@code eventTimestamp / eventBucketIntervalMs} (table row key) and by the
 * log path fragment of their logging context (table column key). {@link #doProcess(KafkaLogEvent)}
 * applies back-pressure: it does not insert an event whose bucket is more than
 * {@code maxNumberOfBucketsInTable} ahead of the oldest bucket still held in the table.
 *
 * <p>Every access to the message table made by this class is guarded by the table's own monitor.
 * NOTE(review): {@link LogWriter} is handed the same table instance and is presumably expected to use
 * the same lock - confirm against its implementation.
 */
public class KafkaLogWriterPlugin extends AbstractKafkaLogProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaLogWriterPlugin.class);

    /** Row key prefix passed to the checkpoint manager factory for this plugin's checkpoints. */
    public static final int CHECKPOINT_ROW_KEY_PREFIX = 100;

    /** How long {@link #doProcess(KafkaLogEvent)} waits between checks while the table window is full. */
    private static final long SLEEP_TIME_MS = 100;

    /** Initial delay before the first run of the {@link LogWriter} task. */
    private static final long LOG_WRITER_INITIAL_DELAY_MS = 100;

    /** Delay between the end of one {@link LogWriter} run and the start of the next. */
    private static final long LOG_WRITER_RUN_DELAY_MS = 200;

    /** Initial delay before the first log cleanup run. */
    private static final long LOG_CLEANUP_INITIAL_DELAY_MINS = 10;

    /** Maximum time {@link #stop()} waits for the scheduled tasks to finish. */
    private static final long EXECUTOR_SHUTDOWN_TIMEOUT_MINS = 5;

    private final String logBaseDir;
    private final LogFileWriter<KafkaLogEvent> logFileWriter;
    private final long eventBucketIntervalMs;
    private final int logCleanupIntervalMins;
    private final long maxNumberOfBucketsInTable;
    private final LogCleanup logCleanup;
    private final CheckpointManager checkpointManager;
    private ListeningScheduledExecutorService scheduledExecutor;
    private CountDownLatch countDownLatch;
    private final LoggingEventSerializer serializer = new LoggingEventSerializer();

    // row key: event time bucket; column key: log path fragment;
    // value: (bucket of the arrival time of the first event in the cell, events of the cell)
    private final RowSortedTable<Long, String, Map.Entry<Long, List<KafkaLogEvent>>> messageTable = TreeBasedTable.create();

    /**
     * Reads and validates the log saver settings and wires up the file writer, the checkpoint manager
     * and the cleanup task. Nothing is scheduled until {@link #init(Set)} is called.
     *
     * @param cConfiguration configuration to read the log saver settings from
     * @param fileMetaDataManager metadata manager handed to the Avro writer and to the cleanup task
     * @param locationFactory factory used to obtain the root location for log files
     * @param checkpointManagerFactory factory for the checkpoint manager of the configured Kafka topic
     * @throws NullPointerException if the log base directory is not configured
     * @throws IllegalArgumentException if any of the numeric settings is not strictly positive
     * @throws Exception declared for the collaborators constructed here
     */
    @Inject
    KafkaLogWriterPlugin(CConfiguration cConfiguration, FileMetaDataManager fileMetaDataManager, LocationFactory locationFactory, CheckpointManagerFactory checkpointManagerFactory) throws Exception {
        this.logBaseDir = cConfiguration.get(LoggingConfiguration.LOG_BASE_DIR);
        Preconditions.checkNotNull(this.logBaseDir, "Log base dir cannot be null");
        LOG.info("Log base dir is {}", this.logBaseDir);

        long retentionDurationDays = cConfiguration.getLong(LoggingConfiguration.LOG_RETENTION_DURATION_DAYS, 30L);
        Preconditions.checkArgument(retentionDurationDays > 0, "Log file retention duration is invalid: %s", retentionDurationDays);

        long maxLogFileSizeBytes = cConfiguration.getLong(LoggingConfiguration.LOG_MAX_FILE_SIZE_BYTES, 20971520L);
        Preconditions.checkArgument(maxLogFileSizeBytes > 0, "Max log file size is invalid: %s", maxLogFileSizeBytes);

        int syncIntervalBytes = cConfiguration.getInt(LoggingConfiguration.LOG_FILE_SYNC_INTERVAL_BYTES, 51200);
        Preconditions.checkArgument(syncIntervalBytes > 0, "Log file sync interval is invalid: %s", syncIntervalBytes);

        long checkpointIntervalMs = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_CHECKPOINT_INTERVAL_MS, LoggingConfiguration.DEFAULT_LOG_SAVER_CHECKPOINT_INTERVAL_MS);
        Preconditions.checkArgument(checkpointIntervalMs > 0, "Checkpoint interval is invalid: %s", checkpointIntervalMs);

        long inactiveIntervalMs = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_INACTIVE_FILE_INTERVAL_MS, LoggingConfiguration.DEFAULT_LOG_SAVER_INACTIVE_FILE_INTERVAL_MS);
        Preconditions.checkArgument(inactiveIntervalMs > 0, "Inactive interval is invalid: %s", inactiveIntervalMs);

        this.eventBucketIntervalMs = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_EVENT_BUCKET_INTERVAL_MS, 1000L);
        Preconditions.checkArgument(this.eventBucketIntervalMs > 0, "Event bucket interval is invalid: %s", this.eventBucketIntervalMs);

        this.maxNumberOfBucketsInTable = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_MAXIMUM_INMEMORY_EVENT_BUCKETS, 8L);
        Preconditions.checkArgument(this.maxNumberOfBucketsInTable > 0, "Maximum number of event buckets in memory is invalid: %s", this.maxNumberOfBucketsInTable);

        // This value is not used by this class; it is read only so that an invalid setting fails fast here.
        long topicCreationSleepMs = cConfiguration.getLong(LoggingConfiguration.LOG_SAVER_TOPIC_WAIT_SLEEP_MS, LoggingConfiguration.DEFAULT_LOG_SAVER_TOPIC_WAIT_SLEEP_MS);
        Preconditions.checkArgument(topicCreationSleepMs > 0, "Topic creation wait sleep is invalid: %s", topicCreationSleepMs);

        this.logCleanupIntervalMins = cConfiguration.getInt(LoggingConfiguration.LOG_CLEANUP_RUN_INTERVAL_MINS, LoggingConfiguration.DEFAULT_LOG_CLEANUP_RUN_INTERVAL_MINS);
        Preconditions.checkArgument(this.logCleanupIntervalMins > 0, "Log cleanup run interval is invalid: %s", this.logCleanupIntervalMins);

        AvroFileWriter avroFileWriter = new AvroFileWriter(fileMetaDataManager, cConfiguration, locationFactory.create(""), this.logBaseDir, this.serializer.getAvroSchema(), maxLogFileSizeBytes, syncIntervalBytes, inactiveIntervalMs);
        this.checkpointManager = checkpointManagerFactory.create(cConfiguration.get("log.kafka.topic"), CHECKPOINT_ROW_KEY_PREFIX);
        this.logFileWriter = new CheckpointingLogFileWriter(avroFileWriter, this.checkpointManager, checkpointIntervalMs);
        this.logCleanup = new LogCleanup(fileMetaDataManager, locationFactory.create(""), cConfiguration.get("namespaces.dir"), TimeUnit.MILLISECONDS.convert(retentionDurationDays, TimeUnit.DAYS));
    }

    /**
     * Initializes the processor for the given partitions and starts the background tasks: the
     * {@link LogWriter} task always, and the log cleanup task only when partition 0 is among the
     * given partitions.
     *
     * @param set the partitions this processor is initialized with
     */
    @Override
    public void init(Set<Integer> set) {
        super.init(set, this.checkpointManager);
        this.scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("log-saver-log-processor")));
        this.scheduledExecutor.scheduleWithFixedDelay(new LogWriter(this.logFileWriter, this.messageTable, this.eventBucketIntervalMs, this.maxNumberOfBucketsInTable), LOG_WRITER_INITIAL_DELAY_MS, LOG_WRITER_RUN_DELAY_MS, TimeUnit.MILLISECONDS);
        this.countDownLatch = new CountDownLatch(1);
        if (set.contains(0)) {
            LOG.info("Scheduling cleanup task");
            this.scheduledExecutor.scheduleAtFixedRate(this.logCleanup, LOG_CLEANUP_INITIAL_DELAY_MINS, this.logCleanupIntervalMins, TimeUnit.MINUTES);
        }
    }

    /**
     * Adds the event to the in-memory table, waiting first while the event's time bucket is too far
     * ahead of the oldest bucket still in the table. Returns without adding the event if
     * {@link #stop()} is called while waiting. Any failure is logged and the event is skipped; no
     * exception escapes this method.
     *
     * @param kafkaLogEvent the event to buffer
     */
    @Override
    public void doProcess(KafkaLogEvent kafkaLogEvent) {
        LoggingContext loggingContext = kafkaLogEvent.getLoggingContext();
        try {
            // Bucket number of the current event.
            long bucketKey = kafkaLogEvent.getLogEvent().getTimeStamp() / this.eventBucketIntervalMs;
            if (!awaitRoomForBucket(bucketKey)) {
                LOG.debug("Returning since callback is cancelled");
                return;
            }
            addToBucket(bucketKey, loggingContext, kafkaLogEvent);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently swallowing it.
            Thread.currentThread().interrupt();
            LOG.warn("Exception while processing message with nextOffset {}. Skipping it.", kafkaLogEvent.getNextOffset(), e);
        } catch (Throwable th) {
            LOG.warn("Exception while processing message with nextOffset {}. Skipping it.", kafkaLogEvent.getNextOffset(), th);
        }
    }

    /**
     * Blocks until the given bucket fits in the window
     * {@code [oldestBucket, oldestBucket + maxNumberOfBucketsInTable]} or the table is empty.
     *
     * @return {@code true} if the event may be added, {@code false} if the stop latch was released
     *     while waiting
     */
    private boolean awaitRoomForBucket(long bucketKey) throws InterruptedException {
        while (true) {
            long oldestBucketKey;
            synchronized (this.messageTable) {
                SortedSet<Long> rowKeys = this.messageTable.rowKeySet();
                if (rowKeys.isEmpty()) {
                    // Nothing buffered, so there is no window to respect.
                    return true;
                }
                oldestBucketKey = rowKeys.first();
            }
            if (bucketKey <= oldestBucketKey + this.maxNumberOfBucketsInTable) {
                return true;
            }
            LOG.trace("key={}, oldestBucketKey={}, maxNumberOfBucketsInTable={}. Sleeping for {} ms.", bucketKey, oldestBucketKey, this.maxNumberOfBucketsInTable, SLEEP_TIME_MS);
            // Wait outside the lock so the table can be modified by other threads in the meantime.
            if (this.countDownLatch.await(SLEEP_TIME_MS, TimeUnit.MILLISECONDS)) {
                return false;
            }
        }
    }

    /** Appends a copy of the event to the table cell for its bucket and log path, creating the cell if needed. */
    private void addToBucket(long bucketKey, LoggingContext loggingContext, KafkaLogEvent kafkaLogEvent) {
        String logPathFragment = loggingContext.getLogPathFragment(this.logBaseDir);
        synchronized (this.messageTable) {
            Map.Entry<Long, List<KafkaLogEvent>> cell = this.messageTable.get(bucketKey, logPathFragment);
            if (cell == null) {
                // Record the bucket of the arrival (wall clock) time alongside the events of the new cell.
                long arrivalBucketKey = System.currentTimeMillis() / this.eventBucketIntervalMs;
                List<KafkaLogEvent> events = Lists.newArrayList();
                cell = new AbstractMap.SimpleEntry<Long, List<KafkaLogEvent>>(arrivalBucketKey, events);
                this.messageTable.put(bucketKey, logPathFragment, cell);
            }
            cell.getValue().add(new KafkaLogEvent(kafkaLogEvent.getGenericRecord(), kafkaLogEvent.getLogEvent(), loggingContext, kafkaLogEvent.getPartition(), kafkaLogEvent.getNextOffset()));
        }
    }

    /**
     * Stops the plugin: releases any thread waiting in {@link #doProcess(KafkaLogEvent)}, shuts down
     * the scheduled tasks, flushes and closes the log file writer, and clears the in-memory table.
     * Failures are logged rather than propagated, and the writer is closed even if the executor
     * shutdown is interrupted or the flush fails.
     */
    @Override
    public void stop() {
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }

        boolean interrupted = false;
        try {
            if (this.scheduledExecutor != null) {
                this.scheduledExecutor.shutdown();
                this.scheduledExecutor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_MINS, TimeUnit.MINUTES);
            }
        } catch (InterruptedException e) {
            // Remember the interrupt and restore it only after the writer is closed.
            interrupted = true;
            LOG.error("Caught exception while closing logWriter {}", e.getMessage(), e);
        } catch (Exception e) {
            LOG.error("Caught exception while closing logWriter {}", e.getMessage(), e);
        }

        try {
            try {
                this.logFileWriter.flush();
            } finally {
                // Always release the writer, even when the flush failed.
                this.logFileWriter.close();
            }
        } catch (Exception e) {
            LOG.error("Caught exception while closing logWriter {}", e.getMessage(), e);
        }

        // doProcess may still be running on another thread; use the same lock as every other table access.
        synchronized (this.messageTable) {
            this.messageTable.clear();
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the checkpoint stored for the given partition.
     *
     * @param i the partition to look up
     * @return the checkpoint reported by the checkpoint manager
     * @throws RuntimeException if the checkpoint manager fails; the failure is propagated via
     *     {@link Throwables#propagate(Throwable)}
     */
    @Override
    public Checkpoint getCheckpoint(int i) {
        try {
            return this.checkpointManager.getCheckpoint(i);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /** Exposes the checkpoint manager for tests. */
    @VisibleForTesting
    CheckpointManager getCheckPointManager() {
        return this.checkpointManager;
    }
}
