package de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.inputmanagement.forwarder;

import de.unistuttgart.isw.sfsc.commonjava.util.Handle;
import de.unistuttgart.isw.sfsc.commonjava.util.Listeners;
import de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable;
import de.unistuttgart.isw.sfsc.commonjava.util.QueueConnector;
import de.unistuttgart.isw.sfsc.commonjava.zmq.reactor.ReactiveSocket;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/* loaded from: input_file:de/unistuttgart/isw/sfsc/commonjava/zmq/pubsubsocketpair/inputmanagement/forwarder/ForwardingInbox.class */
public class ForwardingInbox implements Forwarder, NotThrowingAutoCloseable {
    private final Listeners<Consumer<List<byte[]>>> listeners = new Listeners<>();
    private final QueueConnector<List<byte[]>> queueConnector;

    ForwardingInbox(ReactiveSocket.Inbox inbox) {
        Objects.requireNonNull(inbox);
        this.queueConnector = new QueueConnector<>(inbox::take);
    }

    public static ForwardingInbox create(ReactiveSocket.Inbox inbox) {
        return new ForwardingInbox(inbox);
    }

    public void start() {
        this.queueConnector.start(this::accept);
    }

    @Override // de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.inputmanagement.forwarder.Forwarder
    public Handle addListener(Consumer<List<byte[]>> consumer) {
        return this.listeners.add(consumer);
    }

    @Override // de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable, java.lang.AutoCloseable
    public void close() {
        this.queueConnector.close();
    }

    void accept(List<byte[]> list) {
        this.listeners.forEach(consumer -> {
            consumer.accept(list);
        });
    }
}
