package co.cask.cdap.metrics.collect;

import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricValues;
import co.cask.cdap.common.conf.CConfiguration;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:co/cask/cdap/metrics/collect/LocalMetricsCollectionService.class */
public final class LocalMetricsCollectionService extends AggregatedMetricsCollectionService {
    private static final Logger LOG = LoggerFactory.getLogger(LocalMetricsCollectionService.class);
    public static final ImmutableMap<String, String> METRICS_PROCESSOR_CONTEXT = ImmutableMap.of("ns", "system", "cmp", "metrics.processor");
    private final CConfiguration cConf;
    private final MetricStore metricStore;
    private ScheduledExecutorService scheduler;

    @Inject
    public LocalMetricsCollectionService(CConfiguration cConfiguration, MetricStore metricStore) {
        this.cConf = cConfiguration;
        this.metricStore = metricStore;
        metricStore.setMetricsContext(getContext(METRICS_PROCESSOR_CONTEXT));
    }

    @Override // co.cask.cdap.metrics.collect.AggregatedMetricsCollectionService
    protected void publish(Iterator<MetricValues> it) throws Exception {
        while (it.hasNext()) {
            this.metricStore.add(it.next());
        }
    }

    protected void startUp() throws Exception {
        this.scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("metrics-cleanup"));
        this.scheduler.schedule(createCleanupTask(this.cConf.getLong("metrics.data.table.retention.resolution.1.seconds", 2L)), 1L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // co.cask.cdap.metrics.collect.AggregatedMetricsCollectionService
    public void shutDown() throws Exception {
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
        }
        super.shutDown();
    }

    private Runnable createCleanupTask(final long j) {
        return new Runnable() { // from class: co.cask.cdap.metrics.collect.LocalMetricsCollectionService.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    LocalMetricsCollectionService.this.metricStore.deleteBefore(TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) - j);
                    LocalMetricsCollectionService.this.scheduler.schedule(this, 1L, TimeUnit.HOURS);
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        };
    }
}
