package org.aanguita.jacuzzi.event.hub;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.aanguita.jacuzzi.concurrency.ThreadExecutor;
import org.aanguita.jacuzzi.event.hub.EventHubFactory;
import org.aanguita.jacuzzi.event.hub.SubscriberData;
import org.aanguita.jacuzzi.objects.ObjectMapPool;
import org.aanguita.jacuzzi.queues.ConsumerQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/aanguita/jacuzzi/event/hub/AbstractEventHub.class */
public abstract class AbstractEventHub implements EventHub {
    private static final Logger logger = LoggerFactory.getLogger(AbstractEventHub.class);
    private final String name;
    private final String threadExecutorClientId;
    private final Map<String, SubscriberData> subscribers = new HashMap();
    private final Cache channelCache = new Cache();
    private final PublicationRepository publicationRepository = new PublicationRepository();
    private final ConsumerQueue<PublicationRequest> queuedPublications = new ConsumerQueue<>(publicationRequest -> {
        publish(publicationRequest.keepMillis, publicationRequest.parsedChannel, publicationRequest.timestamp, publicationRequest.messages);
    });
    private final ObjectMapPool<String, ConsumerQueue<PublicationRequest>> channelQueuedPublications = new ObjectMapPool<>(str -> {
        return new ConsumerQueue(publicationRequest -> {
            publish(publicationRequest.keepMillis, publicationRequest.parsedChannel, publicationRequest.timestamp, publicationRequest.messages);
        });
    });
    private final AtomicBoolean alive = new AtomicBoolean(true);
    private boolean running = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/aanguita/jacuzzi/event/hub/AbstractEventHub$PublicationRequest.class */
    public static class PublicationRequest {
        private final Long keepMillis;
        private final Channel parsedChannel;
        private final long timestamp;
        private final Object[] messages;

        private PublicationRequest(Long l, Channel channel, long j, Object[] objArr) {
            this.keepMillis = l;
            this.parsedChannel = channel;
            this.timestamp = j;
            this.messages = objArr;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractEventHub(String str) {
        this.name = str;
        this.threadExecutorClientId = ThreadExecutor.registerClient(str + ".EventHub");
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public String getName() {
        return this.name;
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void start() {
        resume();
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void pause() {
        this.running = false;
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void resume() {
        this.running = true;
        this.queuedPublications.flush(false);
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void pause(String str) {
        this.channelQueuedPublications.getObject(str);
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void resume(String str) {
        if (this.channelQueuedPublications.containsKey(str)) {
            this.channelQueuedPublications.removeObject(str).flush(false);
        }
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void publish(String str, Object... objArr) {
        publish(0L, str, objArr);
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void publish(Long l, String str, Object... objArr) {
        if (this.alive.get()) {
            long currentTimeMillis = System.currentTimeMillis();
            Channel channel = new Channel(str);
            if (!this.running) {
                this.queuedPublications.add(new PublicationRequest(l, channel, currentTimeMillis, objArr));
            } else if (this.channelQueuedPublications.containsKey(str)) {
                this.channelQueuedPublications.getObject(str).add(new PublicationRequest(l, channel, currentTimeMillis, objArr));
            } else {
                publish(l, channel, currentTimeMillis, objArr);
            }
        }
    }

    private void publish(Long l, Channel channel, long j, Object... objArr) {
        Publication publication = new Publication(getName(), channel, j, objArr);
        List<MatchingSubscriber> findSubscribers = findSubscribers(channel);
        logger.trace("Found matching subscribers for publication: {}", findSubscribers);
        publish(findSubscribers, publication);
        this.publicationRepository.storePublication(publication, l);
    }

    private synchronized List<MatchingSubscriber> findSubscribers(Channel channel) {
        if (this.channelCache.containsChannel(channel)) {
            return this.channelCache.getSubscribersForExpression(channel);
        }
        ArrayList arrayList = new ArrayList();
        for (SubscriberData subscriberData : this.subscribers.values()) {
            Integer highestPriorityMatch = highestPriorityMatch(subscriberData.getChannels(), channel);
            if (highestPriorityMatch != null) {
                arrayList.add(new MatchingSubscriber(highestPriorityMatch.intValue(), subscriberData.getSubscriberProcessor()));
            }
        }
        this.channelCache.addChannel(channel, arrayList);
        return this.channelCache.getSubscribersForExpression(channel);
    }

    private static Integer highestPriorityMatch(Set<SubscriberData.ChannelWithPriority> set, Channel channel) {
        Optional<SubscriberData.ChannelWithPriority> max = set.stream().filter(channelWithPriority -> {
            return channelWithPriority.getChannel().matches(channel);
        }).max((channelWithPriority2, channelWithPriority3) -> {
            return channelWithPriority3.getPriority() - channelWithPriority2.getPriority();
        });
        if (max.isPresent()) {
            return Integer.valueOf(max.get().getPriority());
        }
        return null;
    }

    protected abstract void publish(List<MatchingSubscriber> list, Publication publication);

    /* JADX INFO: Access modifiers changed from: protected */
    public void invokeSubscribers(List<MatchingSubscriber> list, Publication publication) {
        Iterator<MatchingSubscriber> it = list.iterator();
        while (it.hasNext()) {
            it.next().publish(publication);
        }
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized Collection<String> getSubscribers() {
        return new HashSet(this.subscribers.keySet());
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void registerSubscriber(EventHubSubscriber eventHubSubscriber, EventHubFactory.Type type) {
        if (this.alive.get()) {
            if (this.subscribers.containsKey(getSubscriberId(eventHubSubscriber))) {
                throw new IllegalArgumentException("Subscriber id already registered: " + getSubscriberId(eventHubSubscriber));
            }
            this.subscribers.put(getSubscriberId(eventHubSubscriber), new SubscriberData(getSubscriberId(eventHubSubscriber), eventHubSubscriber, SubscriberProcessorFactory.createSubscriberProcessor(type, getSubscriberId(eventHubSubscriber), eventHubSubscriber)));
        }
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void subscribe(EventHubSubscriber eventHubSubscriber, String... strArr) {
        if (this.alive.get()) {
            subscribe(eventHubSubscriber, 0, strArr);
        }
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void subscribe(EventHubSubscriber eventHubSubscriber, int i, String... strArr) {
        if (this.alive.get()) {
            if (!this.subscribers.containsKey(getSubscriberId(eventHubSubscriber))) {
                throw new IllegalArgumentException("Attempting to subscribe an unregistered subscriber: " + getSubscriberId(eventHubSubscriber));
            }
            this.subscribers.get(getSubscriberId(eventHubSubscriber)).subscribe(i, strArr);
            this.channelCache.invalidate();
            ThreadExecutor.submit(() -> {
                this.publicationRepository.getStoredPublications(strArr).forEach(publication -> {
                    this.subscribers.get(getSubscriberId(eventHubSubscriber)).getSubscriber().event(publication);
                });
            });
        }
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void unsubscribe(EventHubSubscriber eventHubSubscriber, String str, String... strArr) {
        if (this.alive.get()) {
            if (this.subscribers.containsKey(getSubscriberId(eventHubSubscriber))) {
                this.subscribers.get(getSubscriberId(eventHubSubscriber)).unsubscribe(str);
                this.subscribers.get(getSubscriberId(eventHubSubscriber)).unsubscribe(strArr);
            }
            this.channelCache.invalidate();
        }
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void unsubscribeAll(EventHubSubscriber eventHubSubscriber) {
        if (this.alive.get()) {
            if (this.subscribers.containsKey(getSubscriberId(eventHubSubscriber))) {
                this.subscribers.get(getSubscriberId(eventHubSubscriber)).unsubscribeAll();
            }
            this.channelCache.invalidate();
        }
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized void unregisterSubscriber(EventHubSubscriber eventHubSubscriber) {
        if (this.alive.get()) {
            if (this.subscribers.containsKey(getSubscriberId(eventHubSubscriber))) {
                this.subscribers.remove(getSubscriberId(eventHubSubscriber));
            }
            this.channelCache.invalidate();
        }
    }

    private String getSubscriberId(EventHubSubscriber eventHubSubscriber) {
        return eventHubSubscriber.getId();
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized int getSubscribersCount() {
        return this.subscribers.size();
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized int getSubscribersCount(String str) {
        return findSubscribers(new Channel(str)).size();
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized boolean hasSubscribers() {
        return getSubscribersCount() > 0;
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public synchronized List<Publication> getStoredPublications(String... strArr) {
        return this.alive.get() ? this.publicationRepository.getStoredPublications(strArr) : new ArrayList();
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public Set<String> cachedChannels() {
        return this.channelCache.cachedChannels();
    }

    @Override // org.aanguita.jacuzzi.event.hub.EventHub
    public void close() {
        if (this.alive.getAndSet(false)) {
            this.channelCache.invalidate();
            this.publicationRepository.clear();
            this.subscribers.values().forEach((v0) -> {
                v0.close();
            });
            this.subscribers.clear();
            ThreadExecutor.unregisterClient(this.threadExecutorClientId);
            EventHubFactory.removeEventHub(getName());
        }
    }
}
