package de.unistuttgart.isw.sfsc.commonjava.zmq.util;

import com.google.protobuf.ByteString;
import de.unistuttgart.isw.sfsc.commonjava.util.Handle;
import de.unistuttgart.isw.sfsc.commonjava.util.scheduling.Scheduler;
import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.PubSubConnection;
import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.inputmanagement.data.DataMultiplexer;
import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.outputmanagement.SubscriptionManager;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:de/unistuttgart/isw/sfsc/commonjava/zmq/util/SubscriptionAgent.class */
/**
 * Couples a {@link DataMultiplexer} with a {@link SubscriptionManager} so that a consumer can be
 * registered and its subscriptions issued in a single call.
 *
 * <p>Every {@code addSubscriber} overload returns one {@link Handle}; closing that handle closes
 * every subscription handle that was obtained for the call and then the multiplexer registration.
 */
public class SubscriptionAgent {
    private final DataMultiplexer dataMultiplexer;
    private final SubscriptionManager subscriptionManager;

    /**
     * Package-private constructor; external callers go through {@link #create(PubSubConnection)}.
     *
     * @param dataMultiplexer     multiplexer on which consumers are registered
     * @param subscriptionManager manager through which subscriptions are issued
     */
    SubscriptionAgent(DataMultiplexer dataMultiplexer, SubscriptionManager subscriptionManager) {
        this.dataMultiplexer = dataMultiplexer;
        this.subscriptionManager = subscriptionManager;
    }

    /**
     * Creates an agent backed by the data multiplexer and subscription manager of the given
     * connection.
     *
     * @param pubSubConnection connection that supplies both collaborators
     * @return a new agent bound to {@code pubSubConnection}
     */
    public static SubscriptionAgent create(PubSubConnection pubSubConnection) {
        return new SubscriptionAgent(pubSubConnection.dataMultiplexer(), pubSubConnection.subscriptionManager());
    }

    /**
     * Registers {@code biConsumer} on the data multiplexer under {@code predicate} and subscribes
     * to every element of {@code set}.
     *
     * <p>The consumer is never invoked directly by the multiplexer callback: each invocation is
     * wrapped in a task and handed to {@code scheduler}.
     *
     * <p>NOTE(review): presumably the first {@code ByteString} passed to the consumer is the topic
     * and the second the payload, and {@code predicate} is evaluated against the topic — confirm
     * against {@code DataMultiplexer}.
     *
     * @param set        values to subscribe to; copied defensively, so later changes to the
     *                   caller's set have no effect
     * @param predicate  filter passed to the data multiplexer together with the consumer
     * @param biConsumer callback receiving the two {@code ByteString} arguments supplied by the
     *                   data multiplexer
     * @param scheduler  executes each {@code biConsumer} invocation
     * @return a handle that closes all subscription handles and then the multiplexer registration
     * @throws NullPointerException if {@code set} is {@code null} or contains {@code null}
     */
    public Handle addSubscriber(Set<ByteString> set, Predicate<ByteString> predicate, BiConsumer<ByteString, ByteString> biConsumer, Scheduler scheduler) {
        Set<ByteString> subscriptions = Set.copyOf(set);

        // Register the consumer before subscribing, as the original code does.
        Handle multiplexerHandle = this.dataMultiplexer.add(predicate, (first, second) -> {
            scheduler.execute(() -> biConsumer.accept(first, second));
        });

        Set<Handle> subscriptionHandles = subscriptions.stream()
                .map(this.subscriptionManager::subscribe)
                .collect(Collectors.toUnmodifiableSet());

        return () -> {
            subscriptionHandles.forEach(Handle::close);
            multiplexerHandle.close();
        };
    }

    /**
     * Single-value convenience overload of
     * {@link #addSubscriber(Set, Predicate, BiConsumer, Scheduler)}.
     *
     * @param byteString value to subscribe to
     * @param predicate  filter passed to the data multiplexer together with the consumer
     * @param biConsumer callback receiving the two {@code ByteString} arguments supplied by the
     *                   data multiplexer
     * @param scheduler  executes each {@code biConsumer} invocation
     * @return a handle that undoes the subscription and the multiplexer registration
     * @throws NullPointerException if {@code byteString} is {@code null}
     */
    public Handle addSubscriber(ByteString byteString, Predicate<ByteString> predicate, BiConsumer<ByteString, ByteString> biConsumer, Scheduler scheduler) {
        return addSubscriber(Set.of(byteString), predicate, biConsumer, scheduler);
    }

    /**
     * Subscribes to {@code byteString} and uses equality with that same value as the multiplexer
     * filter.
     *
     * @param byteString value to subscribe to and to match with {@code equals}
     * @param biConsumer callback receiving the two {@code ByteString} arguments supplied by the
     *                   data multiplexer
     * @param scheduler  executes each {@code biConsumer} invocation
     * @return a handle that undoes the subscription and the multiplexer registration
     * @throws NullPointerException if {@code byteString} is {@code null}
     */
    public Handle addSubscriber(ByteString byteString, BiConsumer<ByteString, ByteString> biConsumer, Scheduler scheduler) {
        Objects.requireNonNull(byteString);
        return addSubscriber(byteString, byteString::equals, biConsumer, scheduler);
    }
}
