package co.cask.cdap.metrics.process;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricValues;
import co.cask.cdap.internal.io.DatumReader;
import co.cask.cdap.internal.io.DatumReaderFactory;
import co.cask.cdap.internal.io.SchemaGenerator;
import com.google.common.base.Throwables;
import com.google.common.reflect.TypeToken;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.apache.twill.kafka.client.KafkaConsumer;

/* loaded from: input_file:co/cask/cdap/metrics/process/MetricsMessageCallbackFactory.class */
public final class MetricsMessageCallbackFactory implements MessageCallbackFactory {
    private final DatumReader<MetricValues> datumReader;
    private final Schema recordSchema;
    private final MetricStore metricStore;
    private final int persistThreshold;

    @Inject
    public MetricsMessageCallbackFactory(SchemaGenerator schemaGenerator, DatumReaderFactory datumReaderFactory, MetricStore metricStore, @Named("metrics.kafka.consumer.persist.threshold") int i) {
        try {
            this.recordSchema = schemaGenerator.generate(MetricValues.class);
            this.datumReader = datumReaderFactory.create(TypeToken.of(MetricValues.class), this.recordSchema);
            this.metricStore = metricStore;
            this.persistThreshold = i;
        } catch (UnsupportedTypeException e) {
            throw Throwables.propagate(e);
        }
    }

    @Override // co.cask.cdap.metrics.process.MessageCallbackFactory
    public KafkaConsumer.MessageCallback create(KafkaConsumerMetaTable kafkaConsumerMetaTable) {
        return new PersistedMessageCallback(new MetricsMessageCallback(this.datumReader, this.recordSchema, this.metricStore), kafkaConsumerMetaTable, this.persistThreshold);
    }
}
