package co.cask.cdap.logging.save;

import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.logging.LoggingConfiguration;
import co.cask.cdap.logging.appender.kafka.KafkaTopic;
import co.cask.cdap.logging.context.LoggingContextHelper;
import co.cask.cdap.logging.kafka.KafkaLogEvent;
import co.cask.cdap.proto.Id;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/LogMetricsPlugin.class */
public class LogMetricsPlugin extends AbstractKafkaLogProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(LogMetricsPlugin.class);
    private static final int ROW_KEY_PREFIX = 101;
    private static final String SYSTEM_METRIC_PREFIX = "services.log";
    private static final String APP_METRIC_PREFIX = "app.log";
    private final CheckpointManager checkpointManager;
    private final MetricsCollectionService metricsCollectionService;
    private final CConfiguration cConfig;
    private final Map<Integer, Checkpoint> partitionCheckpoints = Maps.newConcurrentMap();
    private ListeningScheduledExecutorService scheduledExecutor;
    private CheckPointWriter checkPointWriter;
    private int partition;

    public LogMetricsPlugin(MetricsCollectionService metricsCollectionService, CheckpointManagerFactory checkpointManagerFactory, CConfiguration cConfiguration) {
        this.metricsCollectionService = metricsCollectionService;
        this.cConfig = cConfiguration;
        this.checkpointManager = checkpointManagerFactory.create(KafkaTopic.getTopic(), ROW_KEY_PREFIX);
    }

    @Override // co.cask.cdap.logging.save.KafkaLogProcessor
    public void init(int i) throws Exception {
        long j = this.cConfig.getLong(LoggingConfiguration.LOG_SAVER_CHECKPOINT_INTERVAL_MS, LoggingConfiguration.DEFAULT_LOG_SAVER_CHECKPOINT_INTERVAL_MS);
        Preconditions.checkArgument(j > 0, "Checkpoint interval is invalid: %s", new Object[]{Long.valueOf(j)});
        Checkpoint checkpoint = this.checkpointManager.getCheckpoint(i);
        super.init(checkpoint);
        this.partition = i;
        this.scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("log-saver-metrics-plugin")));
        this.partitionCheckpoints.clear();
        this.partitionCheckpoints.put(Integer.valueOf(i), checkpoint);
        this.checkPointWriter = new CheckPointWriter(this.checkpointManager, this.partitionCheckpoints);
        this.scheduledExecutor.scheduleWithFixedDelay(this.checkPointWriter, j, j, TimeUnit.MILLISECONDS);
    }

    @Override // co.cask.cdap.logging.save.AbstractKafkaLogProcessor
    public void doProcess(Iterator<KafkaLogEvent> it) {
        while (it.hasNext()) {
            KafkaLogEvent next = it.next();
            Map<String, String> metricsTags = LoggingContextHelper.getMetricsTags(next.getLoggingContext());
            MetricsContext context = this.metricsCollectionService.getContext(metricsTags);
            String metricName = getMetricName(metricsTags.get("ns"), next.getLogEvent().getLevel().toString().toLowerCase());
            if (!metricsTags.containsKey("cmp") || !metricsTags.get("cmp").equals("metrics.processor")) {
                context.increment(metricName, 1L);
            }
            this.partitionCheckpoints.put(Integer.valueOf(next.getPartition()), new Checkpoint(next.getNextOffset(), next.getLogEvent().getTimeStamp()));
        }
    }

    private String getMetricName(String str, String str2) {
        return str.equals(Id.Namespace.SYSTEM.getId()) ? String.format("%s.%s", SYSTEM_METRIC_PREFIX, str2) : String.format("%s.%s", APP_METRIC_PREFIX, str2);
    }

    @Override // co.cask.cdap.logging.save.KafkaLogProcessor
    public void stop() {
        try {
            if (this.scheduledExecutor != null) {
                this.scheduledExecutor.shutdown();
                this.scheduledExecutor.awaitTermination(5L, TimeUnit.MINUTES);
            }
            if (this.checkPointWriter != null) {
                this.checkPointWriter.flush();
            }
        } catch (Throwable th) {
            LOG.error("Caught exception while closing Log metrics plugin {}", th.getMessage(), th);
        }
    }

    @Override // co.cask.cdap.logging.save.KafkaLogProcessor
    public Checkpoint getCheckpoint() {
        try {
            return this.checkpointManager.getCheckpoint(this.partition);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    @VisibleForTesting
    CheckpointManager getCheckPointManager() {
        return this.checkpointManager;
    }
}
