package org.apache.flink.streaming.connectors.nifi;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/nifi/NiFiSource.class */
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
    private static final long DEFAULT_WAIT_TIME_MS = 1000;
    private final SiteToSiteClientConfig clientConfig;
    private final long waitTimeMs;
    private transient SiteToSiteClient client;
    private volatile boolean isRunning;

    public NiFiSource(SiteToSiteClientConfig siteToSiteClientConfig) {
        this(siteToSiteClientConfig, DEFAULT_WAIT_TIME_MS);
    }

    public NiFiSource(SiteToSiteClientConfig siteToSiteClientConfig, long j) {
        this.isRunning = true;
        this.clientConfig = siteToSiteClientConfig;
        this.waitTimeMs = j;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.client = new SiteToSiteClient.Builder().fromConfig(this.clientConfig).build();
    }

    public void run(SourceFunction.SourceContext<NiFiDataPacket> sourceContext) throws Exception {
        while (this.isRunning) {
            Transaction createTransaction = this.client.createTransaction(TransferDirection.RECEIVE);
            if (createTransaction == null) {
                LOG.warn("A transaction could not be created, waiting and will try again...");
                try {
                    Thread.sleep(this.waitTimeMs);
                } catch (InterruptedException e) {
                }
            } else {
                DataPacket receive = createTransaction.receive();
                if (receive == null) {
                    createTransaction.confirm();
                    createTransaction.complete();
                    LOG.debug("No data available to pull, waiting and will try again...");
                    try {
                        Thread.sleep(this.waitTimeMs);
                    } catch (InterruptedException e2) {
                    }
                } else {
                    ArrayList arrayList = new ArrayList();
                    do {
                        InputStream data = receive.getData();
                        byte[] bArr = new byte[(int) receive.getSize()];
                        StreamUtils.fillBuffer(data, bArr);
                        arrayList.add(new StandardNiFiDataPacket(bArr, receive.getAttributes()));
                        receive = createTransaction.receive();
                    } while (receive != null);
                    createTransaction.confirm();
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        sourceContext.collect((NiFiDataPacket) it.next());
                    }
                    createTransaction.complete();
                }
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
    }

    public void close() throws Exception {
        super.close();
        this.client.close();
    }
}
