package co.cask.cdap.data.stream.service;

import co.cask.cdap.api.data.stream.StreamSpecification;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.common.stream.notification.StreamSizeNotification;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data.stream.StreamPropertyListener;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.notifications.feeds.NotificationFeedException;
import co.cask.cdap.notifications.service.NotificationService;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.NotificationFeedId;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import java.io.FileNotFoundException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.twill.common.Cancellable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/stream/service/LocalStreamService.class */
public class LocalStreamService extends AbstractStreamService {
    private static final Logger LOG = LoggerFactory.getLogger(LocalStreamService.class);
    private final NotificationService notificationService;
    private final StreamAdmin streamAdmin;
    private final StreamWriterSizeCollector streamWriterSizeCollector;
    private final StreamMetaStore streamMetaStore;
    private final ConcurrentMap<StreamId, StreamSizeAggregator> aggregators;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/LocalStreamService$StreamSizeAggregator.class */
    public final class StreamSizeAggregator implements Cancellable {
        private final long streamInitSize;
        private final NotificationFeedId streamFeed;
        private final StreamId streamId;
        private final AtomicLong streamBaseCount;
        private final AtomicInteger streamThresholdMB;
        private final Cancellable cancellable;
        private boolean published;

        protected StreamSizeAggregator(StreamId streamId, long j, int i, Cancellable cancellable) {
            this.streamId = streamId;
            this.streamInitSize = j;
            this.streamBaseCount = new AtomicLong(j);
            this.cancellable = cancellable;
            this.streamFeed = new NotificationFeedId(streamId.getNamespace(), "stream", String.format("%sSize", streamId.getEntityName()));
            this.streamThresholdMB = new AtomicInteger(i);
        }

        public void cancel() {
            this.cancellable.cancel();
        }

        public void setStreamThresholdMB(int i) {
            this.streamThresholdMB.set(i);
        }

        public void checkAggregatedSize() {
            long totalCollected = this.streamInitSize + LocalStreamService.this.streamWriterSizeCollector.getTotalCollected(this.streamId);
            if (!this.published || totalCollected - this.streamBaseCount.get() > toBytes(this.streamThresholdMB.get())) {
                try {
                    publishNotification(totalCollected);
                    this.streamBaseCount.set(totalCollected);
                } catch (Throwable th) {
                    this.streamBaseCount.set(totalCollected);
                    throw th;
                }
            }
            this.published = true;
        }

        private long toBytes(int i) {
            return i * 1024 * 1024;
        }

        private void publishNotification(long j) {
            try {
                LocalStreamService.this.notificationService.publish(this.streamFeed, new StreamSizeNotification(System.currentTimeMillis(), j)).get();
            } catch (NotificationFeedException e) {
                LocalStreamService.LOG.warn("Error with notification feed {}", this.streamFeed, e);
            } catch (Throwable th) {
                LocalStreamService.LOG.debug("Could not publish notification on feed {}", this.streamFeed, th);
            }
        }
    }

    /**
     * Creates the service with its collaborators and an empty aggregator registry.
     *
     * @param streamCoordinatorClient   passed to the superclass
     * @param streamFileJanitorService  passed to the superclass
     * @param streamMetaStore           source of the list of known streams
     * @param streamAdmin               used to read each stream's configuration
     * @param streamWriterSizeCollector source of the collected stream sizes; also passed to the superclass
     * @param notificationService       used to publish stream size notifications
     * @param metricStore               passed to the superclass
     */
    @Inject
    public LocalStreamService(StreamCoordinatorClient streamCoordinatorClient,
                              StreamFileJanitorService streamFileJanitorService,
                              StreamMetaStore streamMetaStore,
                              StreamAdmin streamAdmin,
                              StreamWriterSizeCollector streamWriterSizeCollector,
                              NotificationService notificationService,
                              MetricStore metricStore) {
        super(streamCoordinatorClient, streamFileJanitorService, streamWriterSizeCollector, metricStore);
        this.notificationService = notificationService;
        this.streamWriterSizeCollector = streamWriterSizeCollector;
        this.streamMetaStore = streamMetaStore;
        this.streamAdmin = streamAdmin;
        this.aggregators = Maps.newConcurrentMap();
    }

    /**
     * Registers a size aggregator for every stream listed by the meta store, seeded with the
     * stream's current events size and its configured notification threshold.
     *
     * <p>A stream whose aggregator cannot be set up is logged at warn level and skipped; it does
     * not stop the remaining streams from being processed.
     *
     * @throws Exception if listing the streams fails
     */
    @Override
    protected void initialize() throws Exception {
        for (Map.Entry<?, ?> entry : this.streamMetaStore.listStreams().entries()) {
            NamespaceId namespace = (NamespaceId) entry.getKey();
            StreamSpecification spec = (StreamSpecification) entry.getValue();
            StreamId streamId = namespace.stream(spec.getName());
            try {
                long eventsSize = getStreamEventsSize(streamId);
                int thresholdMB = this.streamAdmin.getConfig(streamId).getNotificationThresholdMB();
                createSizeAggregator(streamId, eventsSize, thresholdMB);
            } catch (FileNotFoundException missingConfig) {
                LOG.warn("Inconsistent stream state: Stream '{}' exists in meta store but its configuration file does not exist", streamId);
            } catch (Exception unreadableConfig) {
                LOG.warn("Inconsistent stream state: Stream '{}' exists in meta store but its configuration cannot be read:", streamId, unreadableConfig);
            }
        }
    }

    /**
     * Cancels every aggregator currently registered with this service.
     */
    @Override
    protected void doShutdown() throws Exception {
        for (StreamSizeAggregator aggregator : this.aggregators.values()) {
            aggregator.cancel();
        }
    }

    /**
     * Checks the aggregated size of every stream listed by the meta store, creating an aggregator
     * (with an initial size of zero) for any stream that does not have one yet.
     *
     * <p>If an aggregator cannot be created, that stream is skipped for this iteration and the
     * remaining streams are still processed.
     *
     * @throws Exception if listing the streams or checking a stream's size fails
     */
    protected void runOneIteration() throws Exception {
        for (Map.Entry<?, ?> entry : this.streamMetaStore.listStreams().entries()) {
            StreamId stream = ((NamespaceId) entry.getKey()).stream(((StreamSpecification) entry.getValue()).getName());
            StreamSizeAggregator streamSizeAggregator = this.aggregators.get(stream);
            if (streamSizeAggregator == null) {
                try {
                    streamSizeAggregator = createSizeAggregator(stream, 0L, this.streamAdmin.getConfig(stream).getNotificationThresholdMB());
                } catch (FileNotFoundException ignored) {
                    // The stream has no configuration file; skip it quietly so that this
                    // periodically executed method does not log the same problem on every run.
                    continue;
                } catch (Exception e) {
                    LOG.warn("Exception in aggregating stream size for {}", stream, e);
                    // No aggregator could be created; there is nothing to check for this stream.
                    continue;
                }
            }
            streamSizeAggregator.checkAggregatedSize();
        }
    }

    /**
     * Builds an aggregator for the given stream, subscribes it to threshold changes through the
     * stream coordinator client, and stores it in the aggregator registry.
     *
     * <p>The listener is registered before the aggregator is stored, so a threshold change that
     * arrives in between spins (yielding the thread) until the aggregator shows up in the registry.
     *
     * @param streamId stream to aggregate
     * @param j        initial size of the stream
     * @param i        notification threshold, in megabytes
     * @return the newly registered aggregator
     */
    private StreamSizeAggregator createSizeAggregator(StreamId streamId, long j, int i) {
        StreamPropertyListener thresholdListener = new StreamPropertyListener() {
            @Override
            public void thresholdChanged(StreamId changedStream, int newThresholdMB) {
                StreamSizeAggregator target = LocalStreamService.this.aggregators.get(changedStream);
                while (target == null) {
                    Thread.yield();
                    target = LocalStreamService.this.aggregators.get(changedStream);
                }
                target.setStreamThresholdMB(newThresholdMB);
            }
        };
        Cancellable subscription = getStreamCoordinatorClient().addListener(streamId, thresholdListener);
        StreamSizeAggregator created = new StreamSizeAggregator(streamId, j, i, subscription);
        this.aggregators.put(streamId, created);
        return created;
    }
}
