/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kudu;

import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.kudu.OperationType;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

public abstract class AbstractKudu
extends AbstractProcessor {
    protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder().name("Kudu Masters").description("List all kudu masters's ip with port (e.g. 7051), comma separated").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("Table Name").description("The name of the Kudu Table to put data into").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The service for reading records from incoming flow files.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder().name("Skip head line").description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader (e.g. \"Treat First Line as Header\" property of CSVReader)").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder().name("Insert Operation").description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows").allowableValues((Enum[])OperationType.values()).defaultValue(OperationType.INSERT.toString()).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder().name("Flush Mode").description("Set the new flush mode for a kudu session.\nAUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\nAUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory operations but it may have to wait when the buffer is full and there's another buffer being flushed.\nMANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.").allowableValues((Enum[])SessionConfiguration.FlushMode.values()).defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString()).required(true).build();
    protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. Depending on your memory size, and data size per row set an appropriate batch size. Gradually increase this number to find out the best one for best performances.").defaultValue("100").required(true).addValidator(StandardValidators.createLongValidator((long)1L, (long)100000L, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    protected static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu").build();
    protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile is routed to this relationship if it cannot be sent to Kudu").build();
    public static final String RECORD_COUNT_ATTR = "record.count";
    protected String kuduMasters;
    protected String tableName;
    protected boolean skipHeadLine;
    protected OperationType operationType;
    protected SessionConfiguration.FlushMode flushMode;
    protected int batchSize = 100;
    protected KuduClient kuduClient;
    protected KuduTable kuduTable;

    @OnScheduled
    public void OnScheduled(ProcessContext context) {
        try {
            this.tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
            this.kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
            this.operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
            this.batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
            this.flushMode = SessionConfiguration.FlushMode.valueOf((String)context.getProperty(FLUSH_MODE).getValue());
            this.skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
            if (this.kuduClient == null) {
                this.getLogger().debug("Setting up Kudu connection...");
                this.kuduClient = this.getKuduConnection(this.kuduMasters);
                this.kuduTable = this.getKuduTable(this.kuduClient, this.tableName);
                this.getLogger().debug("Kudu connection successfully initialized");
            }
        }
        catch (KuduException ex) {
            this.getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), (Throwable)ex);
        }
    }

    @OnStopped
    public final void closeClient() throws KuduException {
        if (this.kuduClient != null) {
            this.getLogger().info("Closing KuduClient");
            this.kuduClient.close();
            this.kuduClient = null;
        }
    }

    private Stream<RowError> flushKuduSession(KuduSession kuduSession, boolean close) throws Exception {
        List responses = close ? kuduSession.close() : kuduSession.flush();
        Stream<RowError> rowErrors = kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND ? Stream.of(kuduSession.getPendingErrors().getRowErrors()) : responses.stream().filter(OperationResponse::hasRowError).map(OperationResponse::getRowError);
        return rowErrors;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        List flowFiles = session.get(this.batchSize);
        if (flowFiles.isEmpty()) {
            return;
        }
        KuduSession kuduSession = this.getKuduSession(this.kuduClient);
        RecordReaderFactory recordReaderFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        HashMap<FlowFile, Integer> numRecords = new HashMap<FlowFile, Integer>();
        HashMap<FlowFile, Object> flowFileFailures = new HashMap<FlowFile, Object>();
        HashMap<Upsert, FlowFile> operationFlowFileMap = new HashMap<Upsert, FlowFile>();
        int numBuffered = 0;
        Stream<Object> pendingRowErrors = Stream.empty();
        block22: for (FlowFile flowFile : flowFiles) {
            try {
                InputStream in = session.read(flowFile);
                Throwable throwable = null;
                try {
                    RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, this.getLogger());
                    Throwable throwable2 = null;
                    try {
                        List fieldNames = recordReader.getSchema().getFieldNames();
                        RecordSet recordSet = recordReader.createRecordSet();
                        if (this.skipHeadLine) {
                            recordSet.next();
                        }
                        Record record = recordSet.next();
                        while (record != null) {
                            OperationResponse response;
                            Upsert operation = this.operationType == OperationType.UPSERT ? this.upsertRecordToKudu(this.kuduTable, record, fieldNames) : this.insertRecordToKudu(this.kuduTable, record, fieldNames);
                            operationFlowFileMap.put(operation, flowFile);
                            if (numBuffered == this.batchSize && this.flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
                                numBuffered = 0;
                                pendingRowErrors = Stream.concat(pendingRowErrors, this.flushKuduSession(kuduSession, false));
                            }
                            if ((response = kuduSession.apply((Operation)operation)) != null && response.hasRowError()) {
                                flowFileFailures.put(flowFile, response.getRowError());
                                continue block22;
                            }
                            ++numBuffered;
                            numRecords.merge(flowFile, 1, Integer::sum);
                            record = recordSet.next();
                        }
                    }
                    catch (Throwable throwable3) {
                        throwable2 = throwable3;
                        throw throwable3;
                    }
                    finally {
                        if (recordReader == null) continue;
                        if (throwable2 != null) {
                            try {
                                recordReader.close();
                            }
                            catch (Throwable throwable4) {
                                throwable2.addSuppressed(throwable4);
                            }
                            continue;
                        }
                        recordReader.close();
                    }
                }
                catch (Throwable throwable5) {
                    throwable = throwable5;
                    throw throwable5;
                }
                finally {
                    if (in == null) continue;
                    if (throwable != null) {
                        try {
                            in.close();
                        }
                        catch (Throwable throwable6) {
                            throwable.addSuppressed(throwable6);
                        }
                        continue;
                    }
                    in.close();
                }
            }
            catch (Exception ex) {
                flowFileFailures.put(flowFile, ex);
            }
        }
        try {
            Map<FlowFile, List<RowError>> flowFileRowErrors = Stream.concat(pendingRowErrors, numBuffered > 0 ? this.flushKuduSession(kuduSession, true) : Stream.empty()).collect(Collectors.groupingBy(e -> (FlowFile)operationFlowFileMap.get(e.getOperation())));
            flowFiles.forEach(ff -> {
                int count = numRecords.getOrDefault(ff, 0);
                List rowErrors = (List)flowFileRowErrors.get(ff);
                if (rowErrors != null) {
                    rowErrors.forEach(rowError -> this.getLogger().error("Failed to write due to {}", new Object[]{rowError}));
                    session.putAttribute(ff, RECORD_COUNT_ATTR, String.valueOf(count - rowErrors.size()));
                    session.transfer(ff, REL_FAILURE);
                } else {
                    session.putAttribute(ff, RECORD_COUNT_ATTR, String.valueOf(count));
                    if (flowFileFailures.containsKey(ff)) {
                        this.getLogger().error("Failed to write due to {}", new Object[]{flowFileFailures.get(ff)});
                        session.transfer(ff, REL_FAILURE);
                    } else {
                        session.transfer(ff, REL_SUCCESS);
                        session.getProvenanceReporter().send(ff, "Successfully added FlowFile to Kudu");
                    }
                }
            });
        }
        catch (Exception ex) {
            throw new ProcessException((Throwable)ex);
        }
    }

    protected KuduClient getKuduConnection(String masters) {
        return new KuduClient.KuduClientBuilder(this.kuduMasters).build();
    }

    protected KuduTable getKuduTable(KuduClient client, String tableName) throws KuduException {
        return client.openTable(tableName);
    }

    protected KuduSession getKuduSession(KuduClient client) {
        KuduSession kuduSession = client.newSession();
        kuduSession.setMutationBufferSpace(this.batchSize);
        kuduSession.setFlushMode(this.flushMode);
        if (this.operationType == OperationType.INSERT_IGNORE) {
            kuduSession.setIgnoreAllDuplicateRows(true);
        }
        return kuduSession;
    }

    protected abstract Insert insertRecordToKudu(KuduTable var1, Record var2, List<String> var3) throws Exception;

    protected abstract Upsert upsertRecordToKudu(KuduTable var1, Record var2, List<String> var3) throws Exception;
}

