package de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.inputmanagement.subscription;

import com.google.protobuf.ByteString;
import de.unistuttgart.isw.sfsc.commonjava.protocol.pubsub.SubProtocol;
import de.unistuttgart.isw.sfsc.commonjava.util.Handle;
import de.unistuttgart.isw.sfsc.commonjava.util.LateComer;
import de.unistuttgart.isw.sfsc.commonjava.util.Listeners;
import de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable;
import de.unistuttgart.isw.sfsc.commonjava.util.OneShotRunnable;
import de.unistuttgart.isw.sfsc.commonjava.util.QueueConnector;
import de.unistuttgart.isw.sfsc.commonjava.util.ReplayingListener;
import de.unistuttgart.isw.sfsc.commonjava.util.StoreEvent;
import de.unistuttgart.isw.sfsc.commonjava.zmq.reactor.ReactiveSocket;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/unistuttgart/isw/sfsc/commonjava/zmq/pubsubsocketpair/inputmanagement/subscription/SubscriptionTrackingInbox.class */
public class SubscriptionTrackingInbox implements SubscriptionTracker, NotThrowingAutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionTrackingInbox.class);
    private final Listeners<Consumer<StoreEvent<ByteString>>> listeners = new Listeners<>();
    private final Set<ByteString> subscriptions = new HashSet();
    private final QueueConnector<List<byte[]>> queueConnector;

    SubscriptionTrackingInbox(ReactiveSocket.Inbox inbox) {
        Objects.requireNonNull(inbox);
        this.queueConnector = new QueueConnector<>(inbox::take);
    }

    public static SubscriptionTrackingInbox create(ReactiveSocket.Inbox inbox) {
        return new SubscriptionTrackingInbox(inbox);
    }

    public void start() {
        this.queueConnector.start(this::accept);
    }

    @Override // de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.inputmanagement.subscription.SubscriptionTracker
    public Set<ByteString> getSubscriptions() {
        return Collections.unmodifiableSet(this.subscriptions);
    }

    @Override // de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.inputmanagement.subscription.SubscriptionTracker
    public Handle addListener(Consumer<StoreEvent<ByteString>> consumer) {
        ReplayingListener replayingListener = new ReplayingListener(consumer);
        Handle add = this.listeners.add(replayingListener);
        replayingListener.prepend(getSubscriptions());
        replayingListener.start();
        return add;
    }

    @Override // de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.inputmanagement.subscription.SubscriptionTracker
    public Handle addOneShotListener(Predicate<StoreEvent<ByteString>> predicate, Runnable runnable) {
        LateComer lateComer = new LateComer();
        Consumer<StoreEvent<ByteString>> consumer = storeEvent -> {
            if (predicate.test(storeEvent)) {
                lateComer.run();
            }
        };
        Handle add = this.listeners.add(consumer);
        lateComer.set(new OneShotRunnable(() -> {
            runnable.run();
            add.close();
        }));
        StoreEvent.toStoreEventSet(getSubscriptions()).forEach(consumer);
        return add;
    }

    void accept(List<byte[]> list) {
        if (!SubProtocol.isValid(list)) {
            logger.warn("Received invalid subscription message");
            return;
        }
        SubProtocol.SubscriptionType subscriptionType = SubProtocol.getSubscriptionType(list);
        ByteString copyFrom = ByteString.copyFrom(SubProtocol.getTopic(list));
        switch (subscriptionType) {
            case SUBSCRIPTION:
                this.subscriptions.add(copyFrom);
                StoreEvent storeEvent = new StoreEvent(StoreEvent.StoreEventType.CREATE, copyFrom);
                logger.debug("Received subscription on topic {}", copyFrom.toStringUtf8());
                this.listeners.forEach(consumer -> {
                    consumer.accept(storeEvent);
                });
                return;
            case UNSUBSCRIPTION:
                this.subscriptions.remove(copyFrom);
                StoreEvent storeEvent2 = new StoreEvent(StoreEvent.StoreEventType.DELETE, copyFrom);
                logger.debug("Received unsubscription on topic {}", copyFrom.toStringUtf8());
                this.listeners.forEach(consumer2 -> {
                    consumer2.accept(storeEvent2);
                });
                return;
            default:
                logger.warn("Received unsupported message type {}", subscriptionType);
                return;
        }
    }

    @Override // de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable, java.lang.AutoCloseable
    public void close() {
        this.queueConnector.close();
    }
}
