package co.cask.cdap.data.stream.service;

import co.cask.cdap.api.metrics.MetricDataQuery;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricTimeSeries;
import co.cask.cdap.api.metrics.MetricType;
import co.cask.cdap.api.metrics.TimeValue;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.proto.Id;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractScheduledService;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;

/* loaded from: input_file:co/cask/cdap/data/stream/service/AbstractStreamService.class */
public abstract class AbstractStreamService extends AbstractScheduledService implements StreamService {
    private final StreamCoordinatorClient streamCoordinatorClient;
    private final StreamFileJanitorService janitorService;
    private final StreamWriterSizeCollector sizeCollector;
    private final MetricStore metricStore;
    private ScheduledExecutorService executor;

    protected abstract void initialize() throws Exception;

    protected abstract void doShutdown() throws Exception;

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamCoordinatorClient getStreamCoordinatorClient() {
        return this.streamCoordinatorClient;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractStreamService(StreamCoordinatorClient streamCoordinatorClient, StreamFileJanitorService streamFileJanitorService, StreamWriterSizeCollector streamWriterSizeCollector, MetricStore metricStore) {
        this.streamCoordinatorClient = streamCoordinatorClient;
        this.janitorService = streamFileJanitorService;
        this.sizeCollector = streamWriterSizeCollector;
        this.metricStore = metricStore;
    }

    protected final void startUp() throws Exception {
        this.streamCoordinatorClient.startAndWait();
        this.janitorService.startAndWait();
        this.sizeCollector.startAndWait();
        initialize();
    }

    protected final void shutDown() throws Exception {
        doShutdown();
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
        this.sizeCollector.stopAndWait();
        this.janitorService.stopAndWait();
        this.streamCoordinatorClient.stopAndWait();
    }

    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedRateSchedule(0L, 2L, TimeUnit.SECONDS);
    }

    protected ScheduledExecutorService executor() {
        this.executor = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("heartbeats-scheduler"));
        return this.executor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getStreamEventsSize(Id.Stream stream) throws IOException {
        try {
            Collection query = this.metricStore.query(new MetricDataQuery(0L, TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), Integer.MAX_VALUE, "system.collect.bytes", MetricType.COUNTER, ImmutableMap.of("ns", stream.getNamespaceId(), "str", stream.getName()), ImmutableList.of()));
            if (query == null || query.isEmpty()) {
                return 0L;
            }
            List timeValues = ((MetricTimeSeries) query.iterator().next()).getTimeValues();
            if (timeValues == null || timeValues.size() != 1) {
                throw new IOException("Should collect exactly one time value");
            }
            return ((TimeValue) timeValues.get(0)).getValue();
        } catch (Exception e) {
            Throwables.propagateIfInstanceOf(e, IOException.class);
            throw new IOException(e);
        }
    }
}
