package org.apache.nifi.spark;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Map;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

/* loaded from: input_file:org/apache/nifi/spark/NiFiReceiver.class */
public class NiFiReceiver extends Receiver<NiFiDataPacket> {
    private static final long serialVersionUID = 3067274587595578836L;
    private final SiteToSiteClientConfig clientConfig;

    /* loaded from: input_file:org/apache/nifi/spark/NiFiReceiver$ReceiveRunnable.class */
    class ReceiveRunnable implements Runnable {
        public ReceiveRunnable() {
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            try {
                SiteToSiteClient build = new SiteToSiteClient.Builder().fromConfig(NiFiReceiver.this.clientConfig).build();
                while (!NiFiReceiver.this.isStopped()) {
                    try {
                        Transaction createTransaction = build.createTransaction(TransferDirection.RECEIVE);
                        DataPacket receive = createTransaction.receive();
                        if (receive == null) {
                            createTransaction.confirm();
                            createTransaction.complete();
                            try {
                                Thread.sleep(1000L);
                            } catch (InterruptedException e) {
                            }
                        } else {
                            ArrayList arrayList = new ArrayList();
                            do {
                                InputStream data = receive.getData();
                                final byte[] bArr = new byte[(int) receive.getSize()];
                                StreamUtils.fillBuffer(data, bArr);
                                final Map attributes = receive.getAttributes();
                                arrayList.add(new NiFiDataPacket() { // from class: org.apache.nifi.spark.NiFiReceiver.ReceiveRunnable.1
                                    @Override // org.apache.nifi.spark.NiFiDataPacket
                                    public byte[] getContent() {
                                        return bArr;
                                    }

                                    @Override // org.apache.nifi.spark.NiFiDataPacket
                                    public Map<String, String> getAttributes() {
                                        return attributes;
                                    }
                                });
                                receive = createTransaction.receive();
                            } while (receive != null);
                            createTransaction.confirm();
                            NiFiReceiver.this.store(arrayList.iterator());
                            createTransaction.complete();
                        }
                    } catch (Throwable th) {
                        try {
                            build.close();
                        } catch (IOException e2) {
                            NiFiReceiver.this.reportError("Failed to close client", e2);
                        }
                        throw th;
                    }
                }
                try {
                    build.close();
                } catch (IOException e3) {
                    NiFiReceiver.this.reportError("Failed to close client", e3);
                }
            } catch (IOException e4) {
                NiFiReceiver.this.restart("Failed to receive data from NiFi", e4);
            }
        }
    }

    public NiFiReceiver(SiteToSiteClientConfig siteToSiteClientConfig, StorageLevel storageLevel) {
        super(storageLevel);
        this.clientConfig = siteToSiteClientConfig;
    }

    public void onStart() {
        Thread thread = new Thread(new ReceiveRunnable());
        thread.setDaemon(true);
        thread.setName("NiFi Receiver");
        thread.start();
    }

    public void onStop() {
    }
}
