package io.mantisrx.publish;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.core.Subscription;
import io.mantisrx.publish.core.SubscriptionFactory;
import io.mantisrx.publish.internal.discovery.MantisJobDiscovery;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.proto.MantisServerSubscription;
import io.mantisrx.publish.proto.MantisServerSubscriptionEnvelope;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:WEB-INF/lib/mantis-publish-netty-1.3.50.jar:io/mantisrx/publish/AbstractSubscriptionTracker.class
 */
/* loaded from: input_file:WEB-INF/lib/mantis-publish-core-1.3.50.jar:io/mantisrx/publish/AbstractSubscriptionTracker.class */
/**
 * Skeleton {@link SubscriptionTracker} that reconciles the subscriptions reported by the
 * server side with the subscriptions currently held by the {@link StreamManager}.
 *
 * <p>Subclasses supply the transport by implementing {@link #fetchSubscriptions(String)}.
 * Each call to {@link #refreshSubscriptions()} fetches the subscriptions of every relevant job
 * cluster, remembers them in an in-memory cache, and then adds or removes stream subscriptions
 * so the {@link StreamManager} matches what was fetched. Subscriptions belonging to job
 * clusters whose fetch failed are kept alive from the cache until they expire.
 */
public abstract class AbstractSubscriptionTracker implements SubscriptionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscriptionTracker.class);

    /** Stream key whose job cluster is always treated as relevant, registered or not. */
    private static final String DEFAULT_STREAM_KEY = "__default__";

    /** Declared as {@code long} so that second-to-millisecond conversions never overflow {@code int}. */
    private static final long MILLIS_PER_SECOND = 1000L;

    /** Cache entries are evicted once they are older than this many expiry intervals. */
    private static final long CACHE_EVICTION_MULTIPLIER = 10L;

    private final MrePublishConfiguration mrePublishConfiguration;
    private final Registry registry;
    private final MantisJobDiscovery jobDiscovery;
    private final StreamManager streamManager;
    private final Counter refreshSubscriptionInvokedCount;
    private final Counter refreshSubscriptionSuccessCount;
    private final Counter refreshSubscriptionFailedCount;
    // NOTE(review): this counter is registered but never incremented anywhere in this class.
    private final Counter staleSubscriptionRemovedCount;

    /** Last successfully fetched subscription per subscription id. */
    private final ConcurrentHashMap<String, SubscriptionCacheEntry> subscriptionCache = new ConcurrentHashMap<>();

    /**
     * Immutable record of a subscription as it was last fetched, together with the fetch time
     * and the job cluster it was fetched from.
     */
    public class SubscriptionCacheEntry {
        /** Value of {@link System#currentTimeMillis()} taken by the refresh that stored this entry. */
        public final long timestamp;
        /** Job cluster the subscription was fetched from. */
        public final String sourceJob;
        /** The fetched subscription itself. */
        public final MantisServerSubscription sub;

        /**
         * @param timestamp fetch time in epoch milliseconds
         * @param sourceJob job cluster the subscription was fetched from
         * @param sub       the fetched subscription
         */
        public SubscriptionCacheEntry(long timestamp, String sourceJob, MantisServerSubscription sub) {
            this.timestamp = timestamp;
            this.sourceJob = sourceJob;
            this.sub = sub;
        }
    }

    /**
     * Stores the collaborators and registers the refresh/stale counters with {@code registry}.
     *
     * @param mrePublishConfiguration source of the client-enabled flag, application name and
     *                                subscription expiry interval
     * @param registry                registry the counters are registered with
     * @param mantisJobDiscovery      provides the stream name to job cluster mapping
     * @param streamManager           holder of registered streams and their subscriptions
     */
    public AbstractSubscriptionTracker(MrePublishConfiguration mrePublishConfiguration, Registry registry, MantisJobDiscovery mantisJobDiscovery, StreamManager streamManager) {
        this.mrePublishConfiguration = mrePublishConfiguration;
        this.registry = registry;
        this.jobDiscovery = mantisJobDiscovery;
        this.streamManager = streamManager;
        this.refreshSubscriptionInvokedCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionInvokedCount");
        this.refreshSubscriptionSuccessCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionSuccessCount");
        this.refreshSubscriptionFailedCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionFailedCount");
        this.staleSubscriptionRemovedCount = SpectatorUtils.buildAndRegisterCounter(registry, "staleSubscriptionRemovedCount");
    }

    /**
     * Brings the {@link StreamManager} in line with the given subscriptions.
     *
     * <p>Every subscription in {@code fetchedSubscriptions} whose id is not currently held is
     * added. Every currently held subscription whose id appears in neither set is removed.
     * Failures on individual subscriptions are logged and do not stop the remaining work.
     *
     * @param fetchedSubscriptions  subscriptions to add when missing, and to keep
     * @param retainedSubscriptions subscriptions that are only kept (never added)
     */
    void propagateSubscriptionChanges(Set<MantisServerSubscription> fetchedSubscriptions, Set<MantisServerSubscription> retainedSubscriptions) {
        Set<Subscription> currentSubscriptions = getCurrentSubscriptions();

        // Computed once up front; the snapshot above does not change while we add below.
        Set<String> currentIds = currentSubscriptions.stream()
                .map(Subscription::getSubscriptionId)
                .collect(Collectors.toSet());

        for (MantisServerSubscription candidate : fetchedSubscriptions) {
            if (!currentIds.contains(candidate.getSubscriptionId())) {
                addSubscription(candidate);
            }
        }

        Set<String> idsToKeep = new HashSet<>();
        for (MantisServerSubscription fetched : fetchedSubscriptions) {
            idsToKeep.add(fetched.getSubscriptionId());
        }
        for (MantisServerSubscription retained : retainedSubscriptions) {
            idsToKeep.add(retained.getSubscriptionId());
        }

        for (Subscription active : currentSubscriptions) {
            if (!idsToKeep.contains(active.getSubscriptionId())) {
                removeSubscription(active);
            }
        }
    }

    /** Builds a client-side subscription from {@code serverSubscription} and registers it; best effort. */
    private void addSubscription(MantisServerSubscription serverSubscription) {
        try {
            Optional<Subscription> subscription = SubscriptionFactory.getSubscription(
                    serverSubscription.getSubscriptionId(), serverSubscription.getQuery());
            if (subscription.isPresent()) {
                this.streamManager.addStreamSubscription(subscription.get());
            } else {
                LOG.info("will not add invalid subscription {}", serverSubscription);
            }
        } catch (Throwable th) {
            // Deliberately broad: one bad subscription must not abort the whole refresh.
            LOG.debug("failed to add subscription {}", serverSubscription, th);
        }
    }

    /** Removes {@code subscription} from the stream manager; best effort. */
    private void removeSubscription(Subscription subscription) {
        try {
            this.streamManager.removeStreamSubscription(subscription.getSubscriptionId());
        } catch (Throwable th) {
            // The throwable is passed as the trailing argument so the cause is not lost.
            LOG.debug("failed to remove subscription {}", subscription.getSubscriptionId(), th);
        }
    }

    /**
     * Fetches the subscriptions currently known for one job cluster.
     *
     * @param jobCluster job cluster to query
     * @return the fetched envelope; an empty {@link Optional} is counted as a failed refresh
     *         for that job cluster by {@link #refreshSubscriptions()}
     */
    public abstract Optional<MantisServerSubscriptionEnvelope> fetchSubscriptions(String jobCluster);

    /**
     * Selects the job clusters whose stream is registered locally, plus the job cluster mapped
     * to the {@code "__default__"} stream key when present.
     *
     * @param streamToJobCluster stream name to job cluster mapping
     * @param registeredStreams  names of the locally registered streams
     * @return the distinct job clusters to fetch subscriptions from
     */
    private Set<String> getRelevantJobClusters(Map<String, String> streamToJobCluster, Set<String> registeredStreams) {
        Set<String> jobClusters = new HashSet<>();
        for (Map.Entry<String, String> mapping : streamToJobCluster.entrySet()) {
            String streamName = mapping.getKey();
            LOG.debug("processing stream {} and currently registered Streams {}", streamName, registeredStreams);
            if (registeredStreams.contains(streamName) || DEFAULT_STREAM_KEY.equals(streamName)) {
                jobClusters.add(mapping.getValue());
            } else {
                LOG.debug("No server side mappings found for one or more streams {} ", registeredStreams);
                LOG.debug("will not fetch subscriptions for un-registered stream {}", streamName);
            }
        }
        return jobClusters;
    }

    /**
     * Fetches subscriptions for all relevant job clusters and reconciles the stream manager.
     *
     * <p>The refresh is skipped when the client is disabled or no stream is registered.
     * Otherwise each relevant job cluster is fetched; successful results are cached and
     * propagated. For job clusters whose fetch failed (empty result or exception), cached
     * subscriptions younger than the configured expiry interval are retained. Finally, cache
     * entries older than ten expiry intervals are evicted.
     */
    @Override // io.mantisrx.publish.SubscriptionTracker
    public void refreshSubscriptions() {
        this.refreshSubscriptionInvokedCount.increment();
        boolean isMREClientEnabled = this.mrePublishConfiguration.isMREClientEnabled();
        Set<String> registeredStreams = this.streamManager.getRegisteredStreams();
        if (!isMREClientEnabled || registeredStreams.isEmpty()) {
            LOG.debug("subscription refresh skipped (client enabled {} registered streams {})", Boolean.valueOf(isMREClientEnabled), registeredStreams);
            return;
        }

        Map<String, String> streamToJobCluster =
                this.jobDiscovery.getStreamNameToJobClusterMapping(this.mrePublishConfiguration.appName());
        Set<String> relevantJobClusters = getRelevantJobClusters(streamToJobCluster, registeredStreams);

        Set<MantisServerSubscription> fetchedSubscriptions = new HashSet<>();
        Set<String> failedJobClusters = new HashSet<>();
        final long now = System.currentTimeMillis();

        for (String jobCluster : relevantJobClusters) {
            try {
                Optional<MantisServerSubscriptionEnvelope> envelope = fetchSubscriptions(jobCluster);
                if (envelope.isPresent()) {
                    Collection<MantisServerSubscription> subscriptions = envelope.get().getSubscriptions();
                    for (MantisServerSubscription subscription : subscriptions) {
                        this.subscriptionCache.put(subscription.getSubscriptionId(),
                                new SubscriptionCacheEntry(now, jobCluster, subscription));
                    }
                    fetchedSubscriptions.addAll(subscriptions);
                    this.refreshSubscriptionSuccessCount.increment();
                } else {
                    failedJobClusters.add(jobCluster);
                    this.refreshSubscriptionFailedCount.increment();
                }
            } catch (Exception e) {
                failedJobClusters.add(jobCluster);
                LOG.info("refresh subscriptions failed for {}", jobCluster, e);
                this.refreshSubscriptionFailedCount.increment();
            }
        }

        // Multiply in long: an int product would overflow for large expiry intervals.
        final long expiryMillis = this.mrePublishConfiguration.subscriptionExpiryIntervalSec() * MILLIS_PER_SECOND;

        // Keep not-yet-expired cached subscriptions of job clusters we could not reach this time.
        Set<MantisServerSubscription> retainedSubscriptions = this.subscriptionCache.values().stream()
                .filter(entry -> failedJobClusters.contains(entry.sourceJob))
                .filter(entry -> now - entry.timestamp < expiryMillis)
                .map(entry -> entry.sub)
                .collect(Collectors.toSet());

        propagateSubscriptionChanges(fetchedSubscriptions, retainedSubscriptions);
        evictStaleCacheEntries(now, expiryMillis * CACHE_EVICTION_MULTIPLIER);
    }

    /** Drops cache entries whose age at {@code now} exceeds {@code maxAgeMillis}. */
    private void evictStaleCacheEntries(long now, long maxAgeMillis) {
        this.subscriptionCache.values().removeIf(entry -> now - entry.timestamp > maxAgeMillis);
    }

    /**
     * Collects the subscriptions of every registered stream from the {@link StreamManager}.
     *
     * @return a newly collected set of the subscriptions held across all registered streams
     */
    protected Set<Subscription> getCurrentSubscriptions() {
        return this.streamManager.getRegisteredStreams().stream()
                .flatMap(streamName -> this.streamManager.getStreamSubscriptions(streamName).stream())
                .collect(Collectors.toSet());
    }
}
