package org.apache.pulsar.io.nsq;

import com.sproutsocial.nsq.Client;
import com.sproutsocial.nsq.Subscriber;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pulsar IO source that subscribes to an NSQ topic/channel and hands every received payload to
 * {@code consume(...)} wrapped in an {@link NSQRecord}.
 *
 * <p>{@link #open} starts a dedicated runner thread that registers the NSQ subscription and then
 * parks until {@link #close} is called, at which point the thread stops the subscriber and the
 * default NSQ client.
 */
@Connector(name = "nsq", type = IOType.SOURCE, help = "A Simple connector moving messages from an NSQ topic to a Pulsar Topic", configClass = NSQSourceConfig.class)
public class NSQSource extends PushSource<byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(NSQSource.class);

    /**
     * Released by {@link #close()} to let the runner thread shut down. A latch (rather than a bare
     * wait/notify pair) remembers a close request that arrives before the runner thread starts
     * waiting and is immune to spurious wake-ups. {@code null} until {@link #open} has been called.
     * Volatile because it is written by {@code open} and read by {@code close}, which this class
     * does not otherwise synchronize.
     */
    private volatile CountDownLatch stopLatch;

    /** Record carrying the raw bytes of a single NSQ message. */
    private static class NSQRecord implements Record<byte[]> {
        private final byte[] value;

        /**
         * @param bArr the message payload; stored by reference, not copied
         */
        public NSQRecord(byte[] bArr) {
            this.value = bArr;
        }

        /** Returns the payload array that was passed to the constructor. */
        @Override
        public byte[] getValue() {
            return this.value;
        }

        /** Two records are equal when their payloads have identical contents. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof NSQRecord)) {
                return false;
            }
            NSQRecord other = (NSQRecord) obj;
            return other.canEqual(this) && Arrays.equals(getValue(), other.getValue());
        }

        /** Hook letting subclasses opt out of equality with this type. */
        protected boolean canEqual(Object obj) {
            return obj instanceof NSQRecord;
        }

        /** Content-based hash, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return 59 + Arrays.hashCode(getValue());
        }

        @Override
        public String toString() {
            return "NSQSource.NSQRecord(value=" + Arrays.toString(getValue()) + ")";
        }
    }

    /**
     * Loads and validates the connector configuration, then starts the NSQ runner thread.
     *
     * @param map raw connector configuration
     * @param sourceContext context passed to the configuration loader (used for secrets)
     * @throws IOException declared by the source contract
     */
    @Override
    public void open(Map<String, Object> map, SourceContext sourceContext) throws IOException {
        NSQSourceConfig config = (NSQSourceConfig) IOConfigUtils.loadWithSecrets(map, NSQSourceConfig.class, sourceContext);
        config.validate();
        CountDownLatch latch = new CountDownLatch(1);
        this.stopLatch = latch;
        startThread(config, latch);
    }

    /**
     * Signals the runner thread to stop the NSQ subscriber. Safe to call when {@link #open} was
     * never invoked; in that case there is nothing to stop.
     */
    @Override
    public void close() throws Exception {
        stopThread();
    }

    /**
     * Creates the NSQ subscriber and starts the thread that subscribes, waits for the stop signal
     * and finally shuts the subscriber and default client down.
     *
     * @param nSQSourceConfig validated connector configuration
     * @param latch stop signal for this particular runner thread
     */
    private void startThread(NSQSourceConfig nSQSourceConfig, CountDownLatch latch) {
        String[] lookupHosts = nSQSourceConfig.getLookupds().toArray(new String[0]);
        // Captured locally so the thread always stops the subscriber it created itself.
        Subscriber nsqSubscriber = new Subscriber(lookupHosts);
        Thread thread = new Thread(() -> {
            try {
                // Explicit parameter type keeps the lambda unambiguous in case subscribe is
                // overloaded on the handler type (NOTE(review): confirm against the nsq-j API).
                nsqSubscriber.subscribe(nSQSourceConfig.getTopic(), nSQSourceConfig.getChannel(), (byte[] data) -> {
                    consume(new NSQRecord(data));
                });
                LOG.info("NSQ Consumer started for topic {} with channel {}", nSQSourceConfig.getTopic(), nSQSourceConfig.getChannel());
                // Returns immediately if close() already ran, so a stop request is never lost.
                latch.await();
            } catch (InterruptedException e) {
                // Restore the flag for any code further up the stack and fall through to shutdown.
                Thread.currentThread().interrupt();
                LOG.warn("NSQ Runner Thread interrupted while waiting for close", e);
            } finally {
                // Runs on normal stop, on interrupt and when subscribe itself fails.
                LOG.debug("Closing the NSQ connection");
                nsqSubscriber.stop();
                Client.getDefaultClient().stop();
                LOG.info("NSQ subscriber stopped");
                LOG.info("NSQ Runner Thread ending");
            }
        }, "NSQSubscriberRunner");
        thread.start();
    }

    /** Releases the runner thread, if one was started, so that it can shut the subscriber down. */
    private void stopThread() {
        LOG.info("Source closed");
        CountDownLatch latch = this.stopLatch;
        if (latch != null) {
            latch.countDown();
        }
    }
}
