package co.cask.cdap.metrics.process;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricType;
import co.cask.cdap.api.metrics.MetricValue;
import co.cask.cdap.api.metrics.MetricValues;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.io.BinaryDecoder;
import co.cask.cdap.internal.io.DatumReader;
import co.cask.common.io.ByteBufferInputStream;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/metrics/process/MetricsMessageCallback.class */
public final class MetricsMessageCallback implements KafkaConsumer.MessageCallback {
    private static final Logger LOG = LoggerFactory.getLogger(MetricsMessageCallback.class);
    private final DatumReader<MetricValues> recordReader;
    private final Schema recordSchema;
    private final MetricStore metricStore;
    private final Map<String, String> metricsContext;
    private long lastLoggedMillis;
    private long recordsProcessed;

    public MetricsMessageCallback(DatumReader<MetricValues> datumReader, Schema schema, MetricStore metricStore, @Nullable MetricsContext metricsContext) {
        this.recordReader = datumReader;
        this.recordSchema = schema;
        this.metricStore = metricStore;
        this.metricsContext = metricsContext == null ? Collections.emptyMap() : metricsContext.getTags();
    }

    public void onReceived(Iterator<FetchedMessage> it) {
        ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream((ByteBuffer) null);
        ArrayList newArrayList = Lists.newArrayList();
        while (it.hasNext()) {
            try {
                newArrayList.add((MetricValues) this.recordReader.read(new BinaryDecoder(byteBufferInputStream.reset(it.next().getPayload())), this.recordSchema));
            } catch (IOException e) {
                LOG.info("Failed to decode message to MetricValue. Skipped. {}", e.getMessage());
            }
        }
        if (newArrayList.isEmpty()) {
            LOG.info("No records to process.");
            return;
        }
        try {
            addProcessingStats(newArrayList);
            this.metricStore.add(newArrayList);
            this.recordsProcessed += newArrayList.size();
            if (System.currentTimeMillis() > this.lastLoggedMillis + TimeUnit.MINUTES.toMillis(1L)) {
                this.lastLoggedMillis = System.currentTimeMillis();
                LOG.info("{} metrics records processed. Last record time: {}.", Long.valueOf(this.recordsProcessed), Long.valueOf(newArrayList.get(newArrayList.size() - 1).getTimestamp()));
            }
        } catch (Exception e2) {
            throw new RuntimeException("Failed to add metrics data to a store", e2);
        }
    }

    private void addProcessingStats(List<MetricValues> list) {
        if (list.isEmpty()) {
            return;
        }
        int size = list.size();
        long currentTimeMillis = System.currentTimeMillis();
        list.add(new MetricValues(this.metricsContext, TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis), ImmutableList.of(new MetricValue("metrics.process.count", MetricType.COUNTER, size), new MetricValue("metrics.process.delay.ms", MetricType.GAUGE, currentTimeMillis - TimeUnit.SECONDS.toMillis(list.get(list.size() - 1).getTimestamp())))));
    }

    public void finished() {
        LOG.info("Metrics MessageCallback completed.");
    }
}
