package org.apache.streampipes.messaging.nats;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Subscription;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.grounding.NatsTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.nats.NatsConfig;

/* loaded from: input_file:org/apache/streampipes/messaging/nats/NatsConsumer.class */
public class NatsConsumer extends AbstractNatsConnector implements EventConsumer<NatsTransportProtocol> {
    private Dispatcher dispatcher;
    private Subscription subscription;

    public void connect(NatsConfig natsConfig, InternalEventProcessor<byte[]> internalEventProcessor) throws IOException, InterruptedException {
        makeBrokerConnection(natsConfig);
        createSubscription(internalEventProcessor);
    }

    public void connect(NatsTransportProtocol natsTransportProtocol, InternalEventProcessor<byte[]> internalEventProcessor) throws SpRuntimeException {
        try {
            makeBrokerConnection(natsTransportProtocol);
            createSubscription(internalEventProcessor);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override // org.apache.streampipes.messaging.nats.AbstractNatsConnector
    public void disconnect() throws SpRuntimeException {
        try {
            this.dispatcher.unsubscribe(this.subscription);
            super.disconnect();
        } catch (InterruptedException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    public boolean isConnected() {
        return this.natsConnection != null && this.natsConnection.getStatus() == Connection.Status.CONNECTED;
    }

    private void createSubscription(InternalEventProcessor<byte[]> internalEventProcessor) {
        this.dispatcher = this.natsConnection.createDispatcher(message -> {
        });
        this.subscription = this.dispatcher.subscribe(this.subject, message2 -> {
            internalEventProcessor.onEvent(message2.getData());
        });
    }

    public /* bridge */ /* synthetic */ void connect(TransportProtocol transportProtocol, InternalEventProcessor internalEventProcessor) throws SpRuntimeException {
        connect((NatsTransportProtocol) transportProtocol, (InternalEventProcessor<byte[]>) internalEventProcessor);
    }
}
