package org.apache.nifi.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/storm/NiFiSpout.class */
/**
 * Storm spout that receives data packets from NiFi through a
 * {@link SiteToSiteClient} and emits one tuple per packet.
 *
 * <p>A background {@link NiFiSpoutReceiver} thread pulls packets inside NiFi
 * transactions ({@link TransferDirection#RECEIVE}) and places them on a bounded
 * in-memory queue; {@link #nextTuple()} drains that queue on the calling thread.
 *
 * <p>Each emitted tuple has the {@link NiFiDataPacket} in the
 * {@link #NIFI_DATA_PACKET} field, followed by one field per configured
 * attribute name, in the order the names were supplied.
 */
public class NiFiSpout extends BaseRichSpout {
    private static final long serialVersionUID = 3067274587595578836L;
    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class);
    public static final String NIFI_DATA_PACKET = "nifiDataPacket";

    /** Background thread pulling from NiFi; created in {@link #open}. */
    private NiFiSpoutReceiver spoutReceiver;
    /** Hand-off buffer between the receiver thread and {@link #nextTuple()}. */
    private LinkedBlockingQueue<NiFiDataPacket> queue;
    private SpoutOutputCollector spoutOutputCollector;
    private final SiteToSiteClientConfig clientConfig;
    private final List<String> attributeNames;

    /**
     * Thread that repeatedly opens a RECEIVE transaction against NiFi, reads every
     * packet the transaction offers, and hands the packets to the spout's queue.
     */
    class NiFiSpoutReceiver extends Thread {
        // volatile: written by the thread calling shutdown(), read by the receiver loop.
        private volatile boolean shutdown = false;

        NiFiSpoutReceiver() {
        }

        /** Asks the receive loop to stop after its current iteration. */
        public void shutdown() {
            this.shutdown = true;
        }

        /**
         * Runs the receive loop until {@link #shutdown()} is called, the thread is
         * interrupted while idling, or an {@link IOException} occurs. The client is
         * always closed on the way out.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                final SiteToSiteClient client =
                        new SiteToSiteClient.Builder().fromConfig(NiFiSpout.this.clientConfig).build();
                try {
                    while (!this.shutdown) {
                        final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
                        if (transaction == null) {
                            // The client could not create a transaction right now; back off and retry
                            // instead of dying with a NullPointerException.
                            NiFiSpout.LOGGER.warn("Unable to create a NiFi transaction; will retry");
                            if (!idle()) {
                                break;
                            }
                            continue;
                        }

                        DataPacket dataPacket = transaction.receive();
                        if (dataPacket == null) {
                            // Nothing available: finish the empty transaction and wait before polling again.
                            transaction.confirm();
                            transaction.complete();
                            if (!idle()) {
                                break;
                            }
                            continue;
                        }

                        final List<NiFiDataPacket> received = new ArrayList<>();
                        do {
                            received.add(readPacket(dataPacket));
                            dataPacket = transaction.receive();
                        } while (dataPacket != null);

                        transaction.confirm();

                        // The queue is bounded; offer() does not block, so count what does not fit
                        // and report it rather than losing data silently.
                        int dropped = 0;
                        for (final NiFiDataPacket packet : received) {
                            if (!NiFiSpout.this.queue.offer(packet)) {
                                dropped++;
                            }
                        }
                        if (dropped > 0) {
                            NiFiSpout.LOGGER.warn(
                                    "Dropped {} of {} NiFi data packets because the spout queue is full",
                                    dropped, received.size());
                        }

                        transaction.complete();
                    }
                } finally {
                    try {
                        client.close();
                    } catch (IOException e) {
                        NiFiSpout.LOGGER.error("Failed to close client", e);
                    }
                }
            } catch (IOException e) {
                NiFiSpout.LOGGER.error("Failed to receive data from NiFi", e);
            }
        }

        /**
         * Copies the full content of a NiFi packet into memory and wraps it, together
         * with the packet's attributes, in a {@link StandardNiFiDataPacket}.
         *
         * @throws IOException if the content cannot be read or its reported size does
         *                     not fit in a byte array
         */
        private NiFiDataPacket readPacket(final DataPacket dataPacket) throws IOException {
            final long size = dataPacket.getSize();
            if (size < 0 || size > Integer.MAX_VALUE) {
                throw new IOException("Cannot buffer NiFi data packet of size " + size + " bytes in memory");
            }
            final InputStream content = dataPacket.getData();
            final byte[] buffer = new byte[(int) size];
            StreamUtils.fillBuffer(content, buffer);
            return new StandardNiFiDataPacket(buffer, dataPacket.getAttributes());
        }

        /**
         * Sleeps for one second between polls.
         *
         * @return {@code true} if the sleep completed; {@code false} if the thread was
         *         interrupted, in which case the interrupt flag is restored and the
         *         caller should stop looping
         */
        private boolean idle() {
            try {
                Thread.sleep(1000L);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /**
     * Creates a spout that emits only the data packet field.
     *
     * @param siteToSiteClientConfig configuration used to build the site-to-site client
     */
    public NiFiSpout(SiteToSiteClientConfig siteToSiteClientConfig) {
        this(siteToSiteClientConfig, null);
    }

    /**
     * Creates a spout that emits the data packet plus the named attributes.
     *
     * @param siteToSiteClientConfig configuration used to build the site-to-site client
     * @param list                   attribute names to expose as additional tuple fields;
     *                               {@code null} is treated as an empty list
     */
    public NiFiSpout(SiteToSiteClientConfig siteToSiteClientConfig, List<String> list) {
        this.clientConfig = siteToSiteClientConfig;
        // Defensive copy so later changes to the caller's list cannot alter the declared fields.
        this.attributeNames = list == null ? new ArrayList<String>() : new ArrayList<>(list);
    }

    /**
     * Stores the collector, creates the hand-off queue (capacity 1000) and starts
     * the daemon receiver thread.
     */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
        this.queue = new LinkedBlockingQueue<>(1000);
        this.spoutReceiver = new NiFiSpoutReceiver();
        this.spoutReceiver.setDaemon(true);
        this.spoutReceiver.setName("NiFi Spout Receiver");
        this.spoutReceiver.start();
    }

    /**
     * Emits the next queued packet, or sleeps for 50 ms when the queue is empty.
     *
     * <p>The tuple always carries one value per declared field: an attribute that
     * the packet does not have is emitted as {@code null}, so the values stay
     * aligned with the fields from {@link #declareOutputFields}.
     */
    public void nextTuple() {
        final NiFiDataPacket packet = this.queue.poll();
        if (packet == null) {
            Utils.sleep(50L);
            return;
        }
        final Values values = new Values(packet);
        final Map<String, String> attributes = packet.getAttributes();
        for (final String attributeName : this.attributeNames) {
            // get() yields null for a missing key, which keeps the value positions stable.
            values.add(attributes == null ? null : attributes.get(attributeName));
        }
        this.spoutOutputCollector.emit(values);
    }

    /**
     * Declares {@link #NIFI_DATA_PACKET} followed by every configured attribute name.
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        final List<String> fieldNames = new ArrayList<>(this.attributeNames.size() + 1);
        fieldNames.add(NIFI_DATA_PACKET);
        fieldNames.addAll(this.attributeNames);
        outputFieldsDeclarer.declare(new Fields(fieldNames));
    }

    /**
     * Signals the receiver thread to stop. Safe to call even if {@link #open} was
     * never invoked.
     */
    public void close() {
        super.close();
        if (this.spoutReceiver != null) {
            this.spoutReceiver.shutdown();
        }
    }
}
