package io.mantisrx.publish;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import io.mantisrx.publish.api.Event;
import io.mantisrx.publish.api.StreamType;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.core.Subscription;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.internal.metrics.StreamMetrics;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:WEB-INF/lib/mantis-publish-netty-1.3.5.jar:io/mantisrx/publish/StreamManager.class
 */
/* loaded from: input_file:WEB-INF/lib/mantis-publish-core-1.3.5.jar:io/mantisrx/publish/StreamManager.class */
public class StreamManager {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) StreamManager.class);
    private final Registry registry;
    private final MrePublishConfiguration config;
    private final Counter mantisStreamCreateFailed;
    private final ConcurrentMap<String, ConcurrentSkipListSet<Subscription>> streamSubscriptionsMap = new ConcurrentHashMap();
    private final ConcurrentMap<String, List<String>> subscriptionIdToStreamsMap = new ConcurrentHashMap();
    private final ConcurrentMap<String, BlockingQueue<Event>> streamQueuesMap = new ConcurrentHashMap();
    private final ConcurrentMap<String, StreamMetrics> streamMetricsMap = new ConcurrentHashMap();

    public StreamManager(Registry registry, MrePublishConfiguration mrePublishConfiguration) {
        this.registry = registry;
        this.config = mrePublishConfiguration;
        this.mantisStreamCreateFailed = SpectatorUtils.buildAndRegisterCounter(this.registry, "mantisStreamCreateFailed");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized Optional<BlockingQueue<Event>> registerStream(String str) {
        if (!this.streamQueuesMap.containsKey(str)) {
            cleanupInactiveStreamQueues();
            if (this.streamQueuesMap.keySet().size() >= this.config.maxNumStreams()) {
                LOG.debug("failed to create queue for stream {} (MAX_NUM_STREAMS {} exceeded)", str, Integer.valueOf(this.config.maxNumStreams()));
                this.mantisStreamCreateFailed.increment();
                return Optional.empty();
            }
            int streamQueueSize = this.config.streamQueueSize(str);
            LOG.info("creating queue for stream {} (size: {})", str, Integer.valueOf(streamQueueSize));
            this.streamQueuesMap.putIfAbsent(str, new LinkedBlockingQueue(streamQueueSize));
            this.streamMetricsMap.putIfAbsent(str, new StreamMetrics(this.registry, str));
        }
        return Optional.ofNullable(this.streamQueuesMap.get(str));
    }

    private boolean isStreamInactive(long j) {
        return TimeUnit.SECONDS.convert(System.nanoTime() - j, TimeUnit.NANOSECONDS) > this.config.streamInactiveDurationThreshold();
    }

    private void cleanupInactiveStreamQueues() {
        ArrayList arrayList = new ArrayList(5);
        this.streamQueuesMap.keySet().stream().forEach(str -> {
            getStreamMetrics(str).ifPresent(streamMetrics -> {
                long lastEventOnStreamTimestamp = streamMetrics.getLastEventOnStreamTimestamp();
                if (lastEventOnStreamTimestamp == 0 || !isStreamInactive(lastEventOnStreamTimestamp)) {
                    return;
                }
                arrayList.add(str);
            });
        });
        arrayList.stream().forEach(str2 -> {
            this.streamQueuesMap.remove(str2);
            this.streamMetricsMap.remove(str2);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<BlockingQueue<Event>> getQueueForStream(String str) {
        return Optional.ofNullable(this.streamQueuesMap.get(str));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<StreamMetrics> getStreamMetrics(String str) {
        return Optional.ofNullable(this.streamMetricsMap.get(str));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasSubscriptions(String str) {
        return ((Boolean) Optional.ofNullable(this.streamSubscriptionsMap.get(str)).map(concurrentSkipListSet -> {
            return Boolean.valueOf(!concurrentSkipListSet.isEmpty());
        }).orElse(false)).booleanValue();
    }

    private List<String> sanitizeStreamSubjects(List<String> list) {
        return (List) list.stream().map(str -> {
            return (str.toLowerCase().equals("observable") || str.toLowerCase().equals("stream")) ? StreamType.DEFAULT_EVENT_STREAM : str;
        }).collect(Collectors.toList());
    }

    private void handleDuplicateSubscriptionId(Subscription subscription) {
        String subscriptionId = subscription.getSubscriptionId();
        Optional.ofNullable(this.subscriptionIdToStreamsMap.get(subscriptionId)).ifPresent(list -> {
            removeSubscriptionId(list, subscriptionId);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void addStreamSubscription(Subscription subscription) {
        List<String> sanitizeStreamSubjects = sanitizeStreamSubjects(subscription.getSubjects());
        LOG.info("adding subscription {} with streams {}", subscription, sanitizeStreamSubjects);
        handleDuplicateSubscriptionId(subscription);
        for (String str : sanitizeStreamSubjects) {
            this.streamSubscriptionsMap.putIfAbsent(str, new ConcurrentSkipListSet<>());
            ConcurrentSkipListSet<Subscription> concurrentSkipListSet = this.streamSubscriptionsMap.get(str);
            int maxSubscriptions = this.config.maxSubscriptions(str);
            concurrentSkipListSet.removeIf(subscription2 -> {
                return subscription2.getSubscriptionId().equals(subscription.getSubscriptionId());
            });
            concurrentSkipListSet.add(subscription);
            int size = concurrentSkipListSet.size();
            if (size > maxSubscriptions) {
                removeStreamSubscription(subscription);
                LOG.warn("QUERY SUBSCRIPTION REJECTED: Number of subscriptions for stream {} exceeded max {}. Increase (default mre.publish.max.subscriptions.per.stream.default or  mre.publish.max.subscriptions.stream.<streamName>) to allow more queries. Removed {}", str, Integer.valueOf(maxSubscriptions), subscription);
                getStreamMetrics(str).ifPresent(streamMetrics -> {
                    streamMetrics.getMantisQueryRejectedCounter().increment();
                });
            }
            getStreamMetrics(str).ifPresent(streamMetrics2 -> {
                streamMetrics2.getMantisActiveQueryCountGauge().set(size);
            });
        }
        this.subscriptionIdToStreamsMap.put(subscription.getSubscriptionId(), sanitizeStreamSubjects);
    }

    private void removeSubscriptionId(List<String> list, String str) {
        ConcurrentSkipListSet<Subscription> concurrentSkipListSet;
        for (String str2 : list) {
            if (this.streamSubscriptionsMap.containsKey(str2) && (concurrentSkipListSet = this.streamSubscriptionsMap.get(str2)) != null) {
                concurrentSkipListSet.removeIf(subscription -> {
                    return subscription.getSubscriptionId().equals(str);
                });
                getStreamMetrics(str2).ifPresent(streamMetrics -> {
                    streamMetrics.getMantisActiveQueryCountGauge().set(concurrentSkipListSet.size());
                });
                if (concurrentSkipListSet.isEmpty()) {
                    this.streamSubscriptionsMap.remove(str2);
                }
            }
        }
    }

    synchronized boolean removeStreamSubscription(String str) {
        LOG.info("removing subscription {}", str);
        removeSubscriptionId(this.subscriptionIdToStreamsMap.getOrDefault(str, Collections.emptyList()), str);
        this.subscriptionIdToStreamsMap.remove(str);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean removeStreamSubscription(Subscription subscription) {
        LOG.info("removing subscription {}", subscription);
        removeSubscriptionId(sanitizeStreamSubjects(subscription.getSubjects()), subscription.getSubscriptionId());
        this.subscriptionIdToStreamsMap.remove(subscription.getSubscriptionId());
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<Subscription> getStreamSubscriptions(String str) {
        return this.streamSubscriptionsMap.getOrDefault(str, new ConcurrentSkipListSet<>());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<String> getRegisteredStreams() {
        return this.streamQueuesMap.keySet();
    }
}
