package org.apache.nifi.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TupleUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.Validate;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/storm/NiFiBolt.class */
public class NiFiBolt extends BaseRichBolt {
    private static final long serialVersionUID = 3067274587595578836L;
    public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
    private final SiteToSiteClientConfig clientConfig;
    private final NiFiDataPacketBuilder builder;
    private final int tickFrequencySeconds;
    private SiteToSiteClient client;
    private OutputCollector collector;
    private BlockingQueue<Tuple> queue = new LinkedBlockingQueue();
    private int batchSize = 10;
    private int batchIntervalInSec = 10;
    private long lastBatchProcessTimeSeconds = 0;

    public NiFiBolt(SiteToSiteClientConfig siteToSiteClientConfig, NiFiDataPacketBuilder niFiDataPacketBuilder, int i) {
        Validate.notNull(siteToSiteClientConfig);
        Validate.notNull(niFiDataPacketBuilder);
        Validate.isTrue(i > 0);
        this.clientConfig = siteToSiteClientConfig;
        this.builder = niFiDataPacketBuilder;
        this.tickFrequencySeconds = i;
    }

    public NiFiBolt withBatchSize(int i) {
        Validate.isTrue(i > 0);
        this.batchSize = i;
        return this;
    }

    public NiFiBolt withBatchInterval(int i) {
        Validate.isTrue(i > 0);
        this.batchIntervalInSec = i;
        return this;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.client = createSiteToSiteClient();
        this.collector = outputCollector;
        this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
        LOGGER.info("Bolt is prepared with Batch Size " + this.batchSize + ", Batch Interval " + this.batchIntervalInSec + ", Tick Frequency is " + this.tickFrequencySeconds);
    }

    protected SiteToSiteClient createSiteToSiteClient() {
        return new SiteToSiteClient.Builder().fromConfig(this.clientConfig).build();
    }

    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            if ((System.currentTimeMillis() / 1000) - this.lastBatchProcessTimeSeconds < this.batchIntervalInSec) {
                LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
                return;
            } else {
                LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
                finishBatch();
                return;
            }
        }
        this.queue.add(tuple);
        int size = this.queue.size();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Current queue size is " + size + ", and batch size is " + this.batchSize);
        }
        if (size >= this.batchSize) {
            LOGGER.debug("Queue Size is greater than or equal to batch size, executing batch");
            finishBatch();
        }
    }

    private void finishBatch() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Finishing batch of size " + this.queue.size());
        }
        this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
        ArrayList arrayList = new ArrayList();
        this.queue.drainTo(arrayList);
        if (arrayList.size() == 0) {
            LOGGER.debug("Finishing batch, but no tuples so returning...");
            return;
        }
        try {
            Transaction createTransaction = this.client.createTransaction(TransferDirection.SEND);
            if (createTransaction == null) {
                throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                NiFiDataPacket createNiFiDataPacket = this.builder.createNiFiDataPacket((Tuple) it.next());
                createTransaction.send(createNiFiDataPacket.getContent(), createNiFiDataPacket.getAttributes());
            }
            createTransaction.confirm();
            createTransaction.complete();
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                this.collector.ack((Tuple) it2.next());
            }
        } catch (Exception e) {
            LOGGER.warn("Unable to process tuples due to: " + e.getMessage(), e);
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                this.collector.fail((Tuple) it3.next());
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    public void cleanup() {
        super.cleanup();
        if (this.client != null) {
            try {
                this.client.close();
            } catch (IOException e) {
                LOGGER.error("Failed to close client", e);
            }
        }
    }

    public Map<String, Object> getComponentConfiguration() {
        HashMap hashMap = new HashMap();
        hashMap.put("topology.tick.tuple.freq.secs", Integer.valueOf(this.tickFrequencySeconds));
        return hashMap;
    }
}
