/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kudu;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.processors.kudu.OperationType;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

public abstract class AbstractKudu
extends AbstractProcessor {
    protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder().name("Kudu Masters").description("List all kudu masters's ip with port (e.g. 7051), comma separated").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).build();
    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("Table Name").description("The name of the Kudu Table to put data into").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The service for reading records from incoming flow files.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder().name("Skip head line").description("Set it to true if your first line is the header line e.g. column names").allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder().name("Insert Operation").description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows").allowableValues((Enum[])OperationType.values()).defaultValue(OperationType.INSERT.toString()).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder().name("Flush Mode").description("Set the new flush mode for a kudu session.\nAUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\nAUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory operations but it may have to wait when the buffer is full and there's another buffer being flushed.\nMANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.").allowableValues((Enum[])SessionConfiguration.FlushMode.values()).defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString()).required(true).build();
    protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("Set the number of operations that can be buffered, between 2 - 100000. Depending on your memory size, and data size per row set an appropriate batch size. Gradually increase this number to find out the best one for best performances.").defaultValue("100").required(true).addValidator(StandardValidators.createLongValidator((long)2L, (long)100000L, (boolean)true)).expressionLanguageSupported(true).build();
    protected static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu").build();
    protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile is routed to this relationship if it cannot be sent to Kudu").build();
    public static final String RECORD_COUNT_ATTR = "record.count";
    protected String kuduMasters;
    protected String tableName;
    protected boolean skipHeadLine;
    protected OperationType operationType;
    protected SessionConfiguration.FlushMode flushMode;
    protected int batchSize = 100;
    protected KuduClient kuduClient;
    protected KuduTable kuduTable;

    @OnScheduled
    public void OnScheduled(ProcessContext context) {
        try {
            this.tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
            this.kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
            if (this.kuduClient == null) {
                this.getLogger().debug("Setting up Kudu connection...");
                this.kuduClient = this.getKuduConnection(this.kuduMasters);
                this.kuduTable = this.getKuduTable(this.kuduClient, this.tableName);
                this.getLogger().debug("Kudu connection successfully initialized");
            }
            this.operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
            this.batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
            this.flushMode = SessionConfiguration.FlushMode.valueOf((String)context.getProperty(FLUSH_MODE).getValue());
            this.skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
        }
        catch (KuduException ex) {
            this.getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), (Throwable)ex);
        }
    }

    @OnStopped
    public final void closeClient() throws KuduException {
        if (this.kuduClient != null) {
            this.getLogger().info("Closing KuduClient");
            this.kuduClient.close();
            this.kuduClient = null;
        }
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        try {
            if (flowFile == null) {
                return;
            }
            HashMap attributes = new HashMap();
            AtomicReference<Object> exceptionHolder = new AtomicReference<Object>(null);
            RecordReaderFactory recordReaderFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
            KuduSession kuduSession = this.getKuduSession(this.kuduClient);
            session.read(flowFile, rawIn -> {
                Record record;
                int numOfAddedRecord;
                RecordSet recordSet;
                List fieldNames;
                RecordReader recordReader = null;
                try {
                    BufferedInputStream in = new BufferedInputStream(rawIn);
                    Throwable throwable = null;
                    try {
                        try {
                            recordReader = recordReaderFactory.createRecordReader(flowFile, (InputStream)in, this.getLogger());
                        }
                        catch (Exception ex) {
                            RecordReaderFactoryException rrfe = new RecordReaderFactoryException("Unable to create RecordReader", (Throwable)ex);
                            exceptionHolder.set(rrfe);
                            if (in != null) {
                                if (throwable != null) {
                                    try {
                                        in.close();
                                    }
                                    catch (Throwable throwable2) {
                                        throwable.addSuppressed(throwable2);
                                    }
                                } else {
                                    in.close();
                                }
                            }
                            IOUtils.closeQuietly((Closeable)recordReader);
                            return;
                        }
                        fieldNames = recordReader.getSchema().getFieldNames();
                        recordSet = recordReader.createRecordSet();
                        if (this.skipHeadLine) {
                            recordSet.next();
                        }
                        numOfAddedRecord = 0;
                        record = recordSet.next();
                    }
                    catch (Throwable throwable3) {
                        throwable = throwable3;
                        throw throwable3;
                    }
                    catch (Throwable throwable4) {
                        throw throwable4;
                    }
                }
                catch (KuduException ex) {
                    this.getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), (Throwable)ex);
                    exceptionHolder.set(ex);
                    {
                        catch (Throwable throwable) {
                            throw throwable;
                        }
                    }
                    IOUtils.closeQuietly((Closeable)recordReader);
                    return;
                    catch (Exception e) {
                        exceptionHolder.set(e);
                        return;
                    }
                }
                while (record != null) {
                    Object oper = null;
                    oper = this.operationType == OperationType.UPSERT ? this.upsertRecordToKudu(this.kuduTable, record, fieldNames) : this.insertRecordToKudu(this.kuduTable, record, fieldNames);
                    kuduSession.apply((Operation)oper);
                    ++numOfAddedRecord;
                    record = recordSet.next();
                }
                this.getLogger().info("KUDU: number of inserted records: " + numOfAddedRecord);
                attributes.put(RECORD_COUNT_ATTR, String.valueOf(numOfAddedRecord));
                IOUtils.closeQuietly((Closeable)recordReader);
            });
            kuduSession.close();
            if (exceptionHolder.get() != null) {
                throw (Throwable)exceptionHolder.get();
            }
            session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, REL_SUCCESS);
            session.getProvenanceReporter().send(flowFile, "Successfully added flowfile to kudu");
        }
        catch (IOException | FlowFileAccessException e) {
            this.getLogger().error("Failed to write due to {}", new Object[]{e});
            session.transfer(flowFile, REL_FAILURE);
        }
        catch (Throwable t) {
            this.getLogger().error("Failed to write due to {}", new Object[]{t});
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    protected KuduClient getKuduConnection(String masters) {
        return new KuduClient.KuduClientBuilder(this.kuduMasters).build();
    }

    protected KuduTable getKuduTable(KuduClient client, String tableName) throws KuduException {
        return client.openTable(tableName);
    }

    protected KuduSession getKuduSession(KuduClient client) {
        KuduSession kuduSession = client.newSession();
        kuduSession.setMutationBufferSpace(this.batchSize);
        kuduSession.setFlushMode(this.flushMode);
        if (this.operationType == OperationType.INSERT_IGNORE) {
            kuduSession.setIgnoreAllDuplicateRows(true);
        }
        return kuduSession;
    }

    protected abstract Insert insertRecordToKudu(KuduTable var1, Record var2, List<String> var3) throws Exception;

    protected abstract Upsert upsertRecordToKudu(KuduTable var1, Record var2, List<String> var3) throws Exception;
}

