package co.cask.cdap.data.stream.service;

import co.cask.cdap.api.data.stream.StreamSpecification;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.stream.notification.StreamSizeNotification;
import co.cask.cdap.common.zookeeper.coordination.BalancedAssignmentStrategy;
import co.cask.cdap.common.zookeeper.coordination.PartitionReplica;
import co.cask.cdap.common.zookeeper.coordination.ResourceCoordinator;
import co.cask.cdap.common.zookeeper.coordination.ResourceCoordinatorClient;
import co.cask.cdap.common.zookeeper.coordination.ResourceHandler;
import co.cask.cdap.common.zookeeper.coordination.ResourceModifier;
import co.cask.cdap.common.zookeeper.coordination.ResourceRequirement;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data.stream.StreamLeaderListener;
import co.cask.cdap.data.stream.StreamPropertyListener;
import co.cask.cdap.data.stream.service.heartbeat.HeartbeatPublisher;
import co.cask.cdap.data.stream.service.heartbeat.StreamWriterHeartbeat;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.notifications.feeds.NotificationFeedException;
import co.cask.cdap.notifications.feeds.NotificationFeedManager;
import co.cask.cdap.notifications.feeds.NotificationFeedNotFoundException;
import co.cask.cdap.notifications.service.NotificationContext;
import co.cask.cdap.notifications.service.NotificationHandler;
import co.cask.cdap.notifications.service.NotificationService;
import co.cask.cdap.proto.Id;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.Inject;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.zookeeper.LeaderElection;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/stream/service/DistributedStreamService.class */
public class DistributedStreamService extends AbstractStreamService {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedStreamService.class);
    private static final String STREAMS_COORDINATOR = "streams.coordinator";
    private final ZKClient zkClient;
    private final StreamAdmin streamAdmin;
    private final DiscoveryServiceClient discoveryServiceClient;
    private final StreamWriterSizeCollector streamWriterSizeCollector;
    private final HeartbeatPublisher heartbeatPublisher;
    private final StreamMetaStore streamMetaStore;
    private final ResourceCoordinatorClient resourceCoordinatorClient;
    private final NotificationService notificationService;
    private final NotificationFeedManager feedManager;
    private final Set<StreamLeaderListener> leaderListeners;
    private final int instanceId;
    private Cancellable leaderListenerCancellable;
    private final ConcurrentMap<Id.Stream, StreamSizeAggregator> aggregators;
    private Cancellable heartbeatsSubscription;
    private Supplier<Discoverable> discoverableSupplier;
    private LeaderElection leaderElection;
    private ResourceCoordinator resourceCoordinator;
    private Cancellable coordinationSubscription;
    private ExecutorService heartbeatsSubscriptionExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/DistributedStreamService$StreamSizeAggregator.class */
    public final class StreamSizeAggregator implements Cancellable {
        private final Map<Integer, Long> streamWriterSizes = Maps.newHashMap();
        private final Id.NotificationFeed streamFeed;
        private final AtomicLong streamBaseCount;
        private final long streamInitSize;
        private final AtomicInteger streamThresholdMB;
        private final Cancellable cancellable;
        private final Id.Stream streamId;

        protected StreamSizeAggregator(Id.Stream stream, long j, int i, Cancellable cancellable) {
            this.streamBaseCount = new AtomicLong(j);
            this.streamInitSize = j;
            this.streamThresholdMB = new AtomicInteger(i);
            this.cancellable = cancellable;
            this.streamId = stream;
            this.streamFeed = new Id.NotificationFeed.Builder().setNamespaceId(stream.getNamespaceId()).setCategory("stream").setName(String.format("%sSize", stream.getId())).build();
        }

        public void init() {
            publishNotification(this.streamInitSize);
        }

        public void cancel() {
            this.cancellable.cancel();
        }

        public void setStreamThresholdMB(int i) {
            DistributedStreamService.LOG.debug("Updating threshold of size aggregator for stream {}: {}MB", this.streamId, Integer.valueOf(i));
            this.streamThresholdMB.set(i);
        }

        public void bytesReceived(int i, long j) {
            DistributedStreamService.LOG.trace("Bytes received from instanceId {}: {}B", Integer.valueOf(i), Long.valueOf(j));
            this.streamWriterSizes.put(Integer.valueOf(i), Long.valueOf(j));
            checkSendNotification();
        }

        private void checkSendNotification() {
            long j = this.streamInitSize;
            Iterator<Long> it = this.streamWriterSizes.values().iterator();
            while (it.hasNext()) {
                j += it.next().longValue();
            }
            DistributedStreamService.LOG.trace("Check notification publishing: sum is {}, baseCount is {}", Long.valueOf(j), this.streamBaseCount);
            if (j - this.streamBaseCount.get() > toBytes(this.streamThresholdMB.get())) {
                try {
                    publishNotification(j);
                    this.streamBaseCount.set(j);
                } catch (Throwable th) {
                    this.streamBaseCount.set(j);
                    throw th;
                }
            }
        }

        private long toBytes(int i) {
            return i * 1024 * 1024;
        }

        private void publishNotification(long j) {
            try {
                DistributedStreamService.this.notificationService.publish(this.streamFeed, new StreamSizeNotification(System.currentTimeMillis(), j)).get();
            } catch (NotificationFeedException e) {
                DistributedStreamService.LOG.warn("Error with notification feed {}", this.streamFeed, e);
            } catch (Throwable th) {
                DistributedStreamService.LOG.warn("Could not publish notification on feed {}", this.streamFeed.getFeedId(), th);
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/data/stream/service/DistributedStreamService$StreamsLeaderHandler.class */
    private final class StreamsLeaderHandler extends ResourceHandler {
        protected StreamsLeaderHandler() {
            super((Discoverable) DistributedStreamService.this.discoverableSupplier.get());
        }

        public void onChange(Collection<PartitionReplica> collection) {
            DistributedStreamService.LOG.info("Stream leader requirement has changed to {}", collection);
            DistributedStreamService.this.invokeLeaderListeners(ImmutableSet.copyOf(ImmutableSet.copyOf(Iterables.transform(collection, new Function<PartitionReplica, Id.Stream>() { // from class: co.cask.cdap.data.stream.service.DistributedStreamService.StreamsLeaderHandler.1
                @Nullable
                public Id.Stream apply(@Nullable PartitionReplica partitionReplica) {
                    if (partitionReplica != null) {
                        return Id.Stream.fromString(partitionReplica.getName(), Id.Stream.class);
                    }
                    return null;
                }
            }))));
        }

        public void finished(Throwable th) {
            if (th != null) {
                DistributedStreamService.LOG.error("Finished with failure for Stream handler instance {}", ((Discoverable) DistributedStreamService.this.discoverableSupplier.get()).getName(), th);
            }
        }
    }

    @Inject
    public DistributedStreamService(CConfiguration cConfiguration, StreamAdmin streamAdmin, StreamCoordinatorClient streamCoordinatorClient, StreamFileJanitorService streamFileJanitorService, ZKClient zKClient, DiscoveryServiceClient discoveryServiceClient, StreamMetaStore streamMetaStore, Supplier<Discoverable> supplier, StreamWriterSizeCollector streamWriterSizeCollector, HeartbeatPublisher heartbeatPublisher, NotificationFeedManager notificationFeedManager, NotificationService notificationService, MetricStore metricStore) {
        super(streamCoordinatorClient, streamFileJanitorService, streamWriterSizeCollector, metricStore);
        this.zkClient = zKClient;
        this.streamAdmin = streamAdmin;
        this.notificationService = notificationService;
        this.discoveryServiceClient = discoveryServiceClient;
        this.streamMetaStore = streamMetaStore;
        this.discoverableSupplier = supplier;
        this.feedManager = notificationFeedManager;
        this.streamWriterSizeCollector = streamWriterSizeCollector;
        this.heartbeatPublisher = heartbeatPublisher;
        this.resourceCoordinatorClient = new ResourceCoordinatorClient(getCoordinatorZKClient());
        this.leaderListeners = Sets.newHashSet();
        this.instanceId = cConfiguration.getInt("stream.container.instance.id");
        this.aggregators = Maps.newConcurrentMap();
    }

    @Override // co.cask.cdap.data.stream.service.AbstractStreamService
    protected void initialize() throws Exception {
        LOG.info("Initializing DistributedStreamService.");
        createHeartbeatsFeed();
        this.heartbeatPublisher.startAndWait();
        this.resourceCoordinatorClient.startAndWait();
        this.coordinationSubscription = this.resourceCoordinatorClient.subscribe(((Discoverable) this.discoverableSupplier.get()).getName(), new StreamsLeaderHandler());
        this.heartbeatsSubscriptionExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("heartbeats-subscription-executor"));
        this.heartbeatsSubscription = subscribeToHeartbeatsFeed();
        this.leaderListenerCancellable = addLeaderListener(new StreamLeaderListener() { // from class: co.cask.cdap.data.stream.service.DistributedStreamService.1
            @Override // co.cask.cdap.data.stream.StreamLeaderListener
            public void leaderOf(Set<Id.Stream> set) {
                DistributedStreamService.this.aggregate(set);
            }
        });
        performLeaderElection();
        LOG.info("DistributedStreamService initialized.");
    }

    @Override // co.cask.cdap.data.stream.service.AbstractStreamService
    protected void doShutdown() throws Exception {
        Iterator<StreamSizeAggregator> it = this.aggregators.values().iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
        if (this.leaderListenerCancellable != null) {
            this.leaderListenerCancellable.cancel();
        }
        if (this.heartbeatsSubscription != null) {
            this.heartbeatsSubscription.cancel();
        }
        if (this.heartbeatsSubscriptionExecutor != null) {
            this.heartbeatsSubscriptionExecutor.shutdownNow();
        }
        this.heartbeatPublisher.stopAndWait();
        if (this.leaderElection != null) {
            Uninterruptibles.getUninterruptibly(this.leaderElection.stop(), 5L, TimeUnit.SECONDS);
        }
        if (this.coordinationSubscription != null) {
            this.coordinationSubscription.cancel();
        }
        if (this.resourceCoordinatorClient != null) {
            this.resourceCoordinatorClient.stopAndWait();
        }
    }

    protected void runOneIteration() throws Exception {
        LOG.trace("Performing heartbeat publishing in Stream service instance {}", Integer.valueOf(this.instanceId));
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (Map.Entry<Id.Stream, AtomicLong> entry : this.streamWriterSizeCollector.getStreamSizes().entrySet()) {
            builder.put(entry.getKey(), Long.valueOf(entry.getValue().get()));
        }
        StreamWriterHeartbeat streamWriterHeartbeat = new StreamWriterHeartbeat(System.currentTimeMillis(), this.instanceId, builder.build());
        LOG.trace("Publishing heartbeat {}", streamWriterHeartbeat);
        this.heartbeatPublisher.sendHeartbeat(streamWriterHeartbeat);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void aggregate(Set<Id.Stream> set) {
        HashSet<Id.Stream> newHashSet = Sets.newHashSet(this.aggregators.keySet());
        for (Id.Stream stream : set) {
            if (newHashSet.remove(stream)) {
            }
            while (true) {
                try {
                    break;
                } catch (Exception e) {
                    LOG.info("Could not compute sizes of files for stream {}. Retrying in 1 sec.", stream);
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        throw Throwables.propagate(e2);
                    }
                }
            }
            if (this.streamAdmin.exists(stream)) {
                int notificationThresholdMB = this.streamAdmin.getConfig(stream).getNotificationThresholdMB();
                long streamEventsSize = getStreamEventsSize(stream);
                createSizeAggregator(stream, streamEventsSize, notificationThresholdMB);
                LOG.debug("Size of the events ingested in stream {}: {}", stream, Long.valueOf(streamEventsSize));
            }
        }
        for (Id.Stream stream2 : newHashSet) {
            StreamSizeAggregator streamSizeAggregator = this.aggregators.get(stream2);
            if (streamSizeAggregator != null) {
                streamSizeAggregator.cancel();
            }
            this.aggregators.remove(stream2);
        }
    }

    private StreamSizeAggregator createSizeAggregator(Id.Stream stream, long j, int i) {
        LOG.debug("Creating size aggregator for stream {} with baseCount {} and threshold {}", new Object[]{stream, Long.valueOf(j), Integer.valueOf(i)});
        StreamSizeAggregator streamSizeAggregator = new StreamSizeAggregator(stream, j, i, getStreamCoordinatorClient().addListener(stream, new StreamPropertyListener() { // from class: co.cask.cdap.data.stream.service.DistributedStreamService.2
            @Override // co.cask.cdap.data.stream.StreamPropertyListener
            public void thresholdChanged(Id.Stream stream2, int i2) {
                Object obj = DistributedStreamService.this.aggregators.get(stream2);
                while (true) {
                    StreamSizeAggregator streamSizeAggregator2 = (StreamSizeAggregator) obj;
                    if (streamSizeAggregator2 != null) {
                        streamSizeAggregator2.setStreamThresholdMB(i2);
                        return;
                    } else {
                        Thread.yield();
                        obj = DistributedStreamService.this.aggregators.get(stream2);
                    }
                }
            }
        }));
        streamSizeAggregator.init();
        this.aggregators.put(stream, streamSizeAggregator);
        return streamSizeAggregator;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ZKClient getCoordinatorZKClient() {
        return ZKClients.namespace(this.zkClient, Constants.Stream.STREAM_ZK_COORDINATION_NAMESPACE);
    }

    private Cancellable subscribeToHeartbeatsFeed() throws NotificationFeedNotFoundException {
        LOG.debug("Subscribing to stream heartbeats notification feed");
        Id.NotificationFeed build = new Id.NotificationFeed.Builder().setNamespaceId(Id.Namespace.SYSTEM.getId()).setCategory("streamInternal").setName("heartbeat").build();
        boolean z = false;
        while (true) {
            try {
                return this.notificationService.subscribe(build, new NotificationHandler<StreamWriterHeartbeat>() { // from class: co.cask.cdap.data.stream.service.DistributedStreamService.3
                    public Type getNotificationType() {
                        return StreamWriterHeartbeat.class;
                    }

                    public void received(StreamWriterHeartbeat streamWriterHeartbeat, NotificationContext notificationContext) {
                        DistributedStreamService.LOG.trace("Received heartbeat {}", streamWriterHeartbeat);
                        for (Map.Entry<Id.Stream, Long> entry : streamWriterHeartbeat.getStreamsSizes().entrySet()) {
                            StreamSizeAggregator streamSizeAggregator = (StreamSizeAggregator) DistributedStreamService.this.aggregators.get(entry.getKey());
                            if (streamSizeAggregator == null) {
                                DistributedStreamService.LOG.trace("Aggregator for stream {} is null", entry.getKey());
                            } else {
                                streamSizeAggregator.bytesReceived(streamWriterHeartbeat.getInstanceId(), entry.getValue().longValue());
                            }
                        }
                    }
                }, this.heartbeatsSubscriptionExecutor);
            } catch (NotificationFeedException e) {
                if (z) {
                    LOG.debug("Unable to subscribe to HeartbeatsFeed. Will retry until successfully subscribed. ", e);
                } else {
                    LOG.warn("Unable to subscribe to HeartbeatsFeed. Will retry until successfully subscribed. Retry failures will be logged at debug level.", e);
                }
                z = true;
                waitBeforeRetryHeartbeatsFeedOperation();
            }
        }
    }

    private Cancellable addLeaderListener(final StreamLeaderListener streamLeaderListener) {
        synchronized (this) {
            this.leaderListeners.add(streamLeaderListener);
        }
        return new Cancellable() { // from class: co.cask.cdap.data.stream.service.DistributedStreamService.4
            public void cancel() {
                synchronized (DistributedStreamService.this) {
                    DistributedStreamService.this.leaderListeners.remove(streamLeaderListener);
                }
            }
        };
    }

    private void createHeartbeatsFeed() throws NotificationFeedException {
        Id.NotificationFeed build = new Id.NotificationFeed.Builder().setNamespaceId(Id.Namespace.SYSTEM.getId()).setCategory("streamInternal").setName("heartbeat").setDescription("Stream heartbeats feed.").build();
        LOG.debug("Ensuring Stream HeartbeatsFeed exists.");
        boolean z = false;
        while (true) {
            try {
                this.feedManager.getFeed(build);
                LOG.debug("Stream HeartbeatsFeed exists.");
                return;
            } catch (NotificationFeedException e) {
                if (z) {
                    LOG.debug("Could not ensure existence of HeartbeatsFeed. Will retry until successful.", e);
                } else {
                    LOG.warn("Could not ensure existence of HeartbeatsFeed. Will retry until successful. Retry failures will be logged at debug level.", e);
                }
                z = true;
                waitBeforeRetryHeartbeatsFeedOperation();
            } catch (NotificationFeedNotFoundException e2) {
                if (!z) {
                    LOG.debug("Creating Stream HeartbeatsFeed.");
                }
                this.feedManager.createFeed(build);
                LOG.info("Stream HeartbeatsFeed created.");
                return;
            }
        }
    }

    private void waitBeforeRetryHeartbeatsFeedOperation() {
        try {
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        }
    }

    private void performLeaderElection() {
        this.leaderElection = new LeaderElection(this.zkClient, "/election/streams.coordinator", new ElectionHandler() { // from class: co.cask.cdap.data.stream.service.DistributedStreamService.5
            public void leader() {
                DistributedStreamService.LOG.info("Became Stream handler leader. Starting resource coordinator.");
                DistributedStreamService.this.resourceCoordinator = new ResourceCoordinator(DistributedStreamService.this.getCoordinatorZKClient(), DistributedStreamService.this.discoveryServiceClient, new BalancedAssignmentStrategy());
                DistributedStreamService.this.resourceCoordinator.startAndWait();
                DistributedStreamService.this.updateRequirement();
            }

            public void follower() {
                DistributedStreamService.LOG.info("Became Stream handler follower.");
                if (DistributedStreamService.this.resourceCoordinator != null) {
                    DistributedStreamService.this.resourceCoordinator.stopAndWait();
                }
            }
        });
        this.leaderElection.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateRequirement() {
        final ResourceModifier createRequirementModifier = createRequirementModifier();
        Futures.addCallback(this.resourceCoordinatorClient.modifyRequirement("streams", createRequirementModifier), new FutureCallback<ResourceRequirement>() { // from class: co.cask.cdap.data.stream.service.DistributedStreamService.6
            public void onSuccess(ResourceRequirement resourceRequirement) {
                DistributedStreamService.LOG.info("Stream resource requirement updated to {}", resourceRequirement);
            }

            public void onFailure(Throwable th) {
                DistributedStreamService.LOG.warn("Failed to update stream resource requirement: {}", th.getMessage());
                DistributedStreamService.LOG.debug("Failed to update stream resource requirement.", th);
                if (DistributedStreamService.this.isRunning()) {
                    Thread thread = new Thread("stream-resource-update") { // from class: co.cask.cdap.data.stream.service.DistributedStreamService.6.1
                        @Override // java.lang.Thread, java.lang.Runnable
                        public void run() {
                            try {
                                TimeUnit.SECONDS.sleep(2L);
                                DistributedStreamService.LOG.info("Retrying update stream resource requirement");
                                Futures.addCallback(DistributedStreamService.this.resourceCoordinatorClient.modifyRequirement("streams", createRequirementModifier), this);
                            } catch (InterruptedException e) {
                                DistributedStreamService.LOG.warn("Stream resource retry thread interrupted", e);
                            }
                        }
                    };
                    thread.setDaemon(true);
                    thread.start();
                }
            }
        });
    }

    private ResourceModifier createRequirementModifier() {
        return new ResourceModifier() { // from class: co.cask.cdap.data.stream.service.DistributedStreamService.7
            @Nullable
            public ResourceRequirement apply(@Nullable ResourceRequirement resourceRequirement) {
                try {
                    ResourceRequirement.Builder builder = ResourceRequirement.builder("streams");
                    for (Map.Entry entry : DistributedStreamService.this.streamMetaStore.listStreams().entries()) {
                        Id.Stream from = Id.Stream.from((Id.Namespace) entry.getKey(), ((StreamSpecification) entry.getValue()).getName());
                        DistributedStreamService.LOG.debug("Adding {} stream as a resource to the coordinator to manager streams leaders.", from);
                        builder.addPartition(new ResourceRequirement.Partition(from.toString(), 1));
                    }
                    return builder.build();
                } catch (Throwable th) {
                    DistributedStreamService.LOG.warn("Could not create requirement for coordinator in Stream handler leader: " + th.getMessage());
                    DistributedStreamService.LOG.debug("Could not create requirement for coordinator in Stream handler leader", th);
                    throw Throwables.propagate(th);
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void invokeLeaderListeners(Set<Id.Stream> set) {
        ImmutableSet copyOf;
        LOG.debug("Stream writer is the leader of streams: {}", set);
        synchronized (this) {
            copyOf = ImmutableSet.copyOf(this.leaderListeners);
        }
        Iterator it = copyOf.iterator();
        while (it.hasNext()) {
            ((StreamLeaderListener) it.next()).leaderOf(set);
        }
    }
}
