package de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.inputmanagement.data;

import com.google.protobuf.ByteString;
import de.unistuttgart.isw.sfsc.commonjava.protocol.pubsub.DataProtocol;
import de.unistuttgart.isw.sfsc.commonjava.util.Handle;
import de.unistuttgart.isw.sfsc.commonjava.util.Listeners;
import de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable;
import de.unistuttgart.isw.sfsc.commonjava.util.QueueConnector;
import de.unistuttgart.isw.sfsc.commonjava.zmq.reactor.ReactiveSocket;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fans incoming data messages out to registered topic listeners.
 *
 * <p>Messages are obtained from a {@link ReactiveSocket.Inbox} through a {@link QueueConnector}
 * (constructed with {@code inbox::take} as its source) and handed to {@link #accept(List)} once
 * {@link #start()} has been called. Each valid message is split into topic and payload and
 * offered to every registered listener whose filter matches the topic.
 *
 * <p>NOTE(review): the threading model is defined by {@code QueueConnector} and
 * {@code Listeners}, neither of which is visible here; presumably {@code accept} runs on a
 * connector-owned thread - confirm before relying on it.
 */
public class DataMultiplexingInbox implements DataMultiplexer, NotThrowingAutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(DataMultiplexingInbox.class);
    private final Listeners<TopicListener> listeners = new Listeners<>();
    private final QueueConnector<List<byte[]>> queueConnector;

    /** Immutable pairing of a topic filter with the handler to invoke for matching messages. */
    public static class TopicListener {
        private final Predicate<ByteString> filter;
        private final BiConsumer<ByteString, ByteString> messageHandler;

        /**
         * @param predicate topic filter; must not be {@code null}
         * @param biConsumer handler receiving {@code (topic, data)}; must not be {@code null}
         * @throws NullPointerException if either argument is {@code null}
         */
        TopicListener(Predicate<ByteString> predicate, BiConsumer<ByteString, ByteString> biConsumer) {
            // Fail at registration time rather than with an anonymous NPE on every
            // dispatched message later on.
            this.filter = Objects.requireNonNull(predicate, "predicate");
            this.messageHandler = Objects.requireNonNull(biConsumer, "biConsumer");
        }
    }

    /**
     * @param inbox source of raw multi-frame messages; must not be {@code null}
     * @throws NullPointerException if {@code inbox} is {@code null}
     */
    DataMultiplexingInbox(ReactiveSocket.Inbox inbox) {
        Objects.requireNonNull(inbox, "inbox");
        this.queueConnector = new QueueConnector<>(inbox::take);
    }

    /**
     * Creates a multiplexer reading from the given inbox. Dispatch does not begin until
     * {@link #start()} is called.
     *
     * @param inbox source of raw multi-frame messages; must not be {@code null}
     * @return a new, not yet started instance
     * @throws NullPointerException if {@code inbox} is {@code null}
     */
    public static DataMultiplexingInbox create(ReactiveSocket.Inbox inbox) {
        return new DataMultiplexingInbox(inbox);
    }

    /** Starts the queue connector with {@link #accept(List)} as the message sink. */
    public void start() {
        this.queueConnector.start(this::accept);
    }

    /**
     * Registers a listener for messages whose topic satisfies {@code predicate}.
     *
     * @param predicate topic filter; must not be {@code null}
     * @param biConsumer handler invoked with {@code (topic, data)} for each matching message;
     *     must not be {@code null}
     * @return the handle produced by {@code Listeners.add} (presumably used to deregister the
     *     listener - confirm against {@code Handle})
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public Handle add(Predicate<ByteString> predicate, BiConsumer<ByteString, ByteString> biConsumer) {
        return this.listeners.add(new TopicListener(predicate, biConsumer));
    }

    /**
     * Validates one raw message and delivers it to every listener whose filter accepts its topic.
     * Invalid messages are logged and dropped.
     *
     * @param list raw message frames as taken from the inbox
     */
    void accept(List<byte[]> list) {
        if (!DataProtocol.isValid(list)) {
            logger.warn("Received invalid data message");
            return;
        }
        // Convert once up front; the same ByteString instances are shared by all listeners.
        final ByteString topic = ByteString.copyFrom(DataProtocol.getTopic(list));
        final ByteString data = ByteString.copyFrom(DataProtocol.getData(list));
        this.listeners.forEach(topicListener -> {
            // Filters and handlers are caller-supplied code. Contain their failures per listener
            // so one misbehaving subscriber cannot keep the others from seeing the message or
            // leak an exception into the dispatching connector.
            try {
                if (topicListener.filter.test(topic)) {
                    topicListener.messageHandler.accept(topic, data);
                }
            } catch (RuntimeException e) {
                logger.warn("Listener threw while handling data message", e);
            }
        });
    }

    /** Closes the underlying queue connector. */
    @Override
    public void close() {
        this.queueConnector.close();
    }
}
