/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kudu;

import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.security.auth.login.LoginException;
import org.apache.kudu.Schema;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.kudu.AbstractKuduProcessor;
import org.apache.nifi.processors.kudu.AutoFlushSyncPutKuduResult;
import org.apache.nifi.processors.kudu.OperationType;
import org.apache.nifi.processors.kudu.PutKuduResult;
import org.apache.nifi.processors.kudu.StandardPutKuduResult;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSet;

@SystemResourceConsideration(resource=SystemResource.MEMORY)
@EventDriven
@SupportsBatching
@RequiresInstanceClassLoading
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"put", "database", "NoSQL", "kudu", "HDFS", "record"})
@CapabilityDescription(value="Reads records from an incoming FlowFile using the provided Record Reader, and writes those records to the specified Kudu's table. The schema for the Kudu table is inferred from the schema of the Record Reader. If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute="record.count", description="Number of records written to Kudu")
public class PutKudu
extends AbstractKuduProcessor {
    static final AllowableValue FAILURE_STRATEGY_ROUTE = new AllowableValue("route-to-failure", "Route to Failure", "The FlowFile containing the Records that failed to insert will be routed to the 'failure' relationship");
    static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("rollback", "Rollback Session", "If any Record cannot be inserted, all FlowFiles in the session will be rolled back to their input queue. This means that if data cannot be pushed, it will block any subsequent data from be pushed to Kudu as well until the issue is resolved. However, this may be advantageous if a strict ordering is required.");
    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("Table Name").description("The name of the Kudu Table to put data into").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The service for reading records from incoming flow files.").identifiesControllerService(RecordReaderFactory.class).required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor FAILURE_STRATEGY = new PropertyDescriptor.Builder().name("Failure Strategy").displayName("Failure Strategy").description("If one or more Records in a batch cannot be transferred to Kudu, specifies how to handle the failure").required(true).allowableValues(new AllowableValue[]{FAILURE_STRATEGY_ROUTE, FAILURE_STRATEGY_ROLLBACK}).defaultValue(FAILURE_STRATEGY_ROUTE.getValue()).build();
    protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder().name("Skip head line").description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader (e.g. \"Treat First Line as Header\" property of CSVReader)").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    protected static final PropertyDescriptor LOWERCASE_FIELD_NAMES = new PropertyDescriptor.Builder().name("Lowercase Field Names").description("Convert column names to lowercase when finding index of Kudu table columns").defaultValue("false").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    protected static final PropertyDescriptor HANDLE_SCHEMA_DRIFT = new PropertyDescriptor.Builder().name("Handle Schema Drift").description("If set to true, when fields with names that are not in the target Kudu table are encountered, the Kudu table will be altered to include new columns for those fields.").defaultValue("false").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    static final PropertyDescriptor DATA_RECORD_PATH = new PropertyDescriptor.Builder().name("Data RecordPath").displayName("Data RecordPath").description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to Kudu instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to Kudu.").required(false).addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor OPERATION_RECORD_PATH = new PropertyDescriptor.Builder().name("Operation RecordPath").displayName("Operation RecordPath").description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the Kudu Operation Type. When evaluated, the RecordPath must evaluate to one of the valid Kudu Operation Types (Debezium style operation types are also supported: \"r\" and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for DELETE), or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property will be ignored.").required(false).addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    protected static final Validator OperationTypeValidator = new Validator(){

        public ValidationResult validate(String subject, String value, ValidationContext context) {
            boolean valid;
            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
            }
            try {
                OperationType.valueOf(value.toUpperCase());
                valid = true;
            }
            catch (IllegalArgumentException ex) {
                valid = false;
            }
            String explanation = valid ? null : "Value must be one of: " + Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", "));
            return new ValidationResult.Builder().subject(subject).input(value).valid(valid).explanation(explanation).build();
        }
    };
    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder().name("Insert Operation").displayName("Kudu Operation Type").description("Specify operationType for this processor.\nValid values are: " + Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")) + ". This Property will be ignored if the <Operation RecordPath> property is set.").defaultValue(OperationType.INSERT.toString()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(OperationTypeValidator).build();
    protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder().name("Flush Mode").description("Set the new flush mode for a kudu session.\nAUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\nAUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory operations but it may have to wait when the buffer is full and there's another buffer being flushed.\nMANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.").allowableValues((Enum[])SessionConfiguration.FlushMode.values()).defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString()).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).build();
    protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new PropertyDescriptor.Builder().name("FlowFiles per Batch").description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. Depending on your memory size, and data size per row set an appropriate batch size for the number of FlowFiles to process per client connection setup.Gradually increase this number, only if your FlowFiles typically contain a few records.").defaultValue("1").required(true).addValidator(StandardValidators.createLongValidator((long)1L, (long)100000L, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").displayName("Max Records per Batch").description("The maximum number of Records to process in a single Kudu-client batch, between 1 - 100000. Depending on your memory size, and data size per row set an appropriate batch size. Gradually increase this number to find out the best one for best performances.").defaultValue("100").required(true).addValidator(StandardValidators.createLongValidator((long)1L, (long)100000L, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    protected static final PropertyDescriptor IGNORE_NULL = new PropertyDescriptor.Builder().name("Ignore NULL").description("Ignore NULL on Kudu Put Operation, Update only non-Null columns if set true").defaultValue("false").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    protected static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu").build();
    protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile is routed to this relationship if it cannot be sent to Kudu").build();
    public static final String RECORD_COUNT_ATTR = "record.count";
    private volatile int batchSize = 100;
    private volatile int ffbatch = 1;
    private volatile SessionConfiguration.FlushMode flushMode;
    private volatile Function<Record, OperationType> recordPathOperationType;
    private volatile RecordPath dataRecordPath;
    private volatile String failureStrategy;
    private volatile boolean supportsInsertIgnoreOp;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(KUDU_MASTERS);
        properties.add(TABLE_NAME);
        properties.add(FAILURE_STRATEGY);
        properties.add(KERBEROS_USER_SERVICE);
        properties.add(KERBEROS_CREDENTIALS_SERVICE);
        properties.add(KERBEROS_PRINCIPAL);
        properties.add(KERBEROS_PASSWORD);
        properties.add(SKIP_HEAD_LINE);
        properties.add(LOWERCASE_FIELD_NAMES);
        properties.add(HANDLE_SCHEMA_DRIFT);
        properties.add(RECORD_READER);
        properties.add(DATA_RECORD_PATH);
        properties.add(OPERATION_RECORD_PATH);
        properties.add(INSERT_OPERATION);
        properties.add(FLUSH_MODE);
        properties.add(FLOWFILE_BATCH_SIZE);
        properties.add(BATCH_SIZE);
        properties.add(IGNORE_NULL);
        properties.add(KUDU_OPERATION_TIMEOUT_MS);
        properties.add(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS);
        properties.add(WORKER_COUNT);
        properties.add(KUDU_SASL_PROTOCOL_NAME);
        return properties;
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        return rels;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) throws LoginException {
        this.batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
        this.ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
        this.flushMode = SessionConfiguration.FlushMode.valueOf((String)context.getProperty(FLUSH_MODE).getValue().toUpperCase());
        this.createKerberosUserAndOrKuduClient(context);
        this.supportsInsertIgnoreOp = this.supportsIgnoreOperations();
        String operationRecordPathValue = context.getProperty(OPERATION_RECORD_PATH).getValue();
        if (operationRecordPathValue == null) {
            this.recordPathOperationType = null;
        } else {
            RecordPath recordPath = RecordPath.compile((String)operationRecordPathValue);
            this.recordPathOperationType = new RecordPathOperationType(recordPath);
        }
        String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
        this.dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile((String)dataRecordPathValue);
        this.failureStrategy = context.getProperty(FAILURE_STRATEGY).getValue();
    }

    private boolean isRollbackOnFailure() {
        return FAILURE_STRATEGY_ROLLBACK.getValue().equalsIgnoreCase(this.failureStrategy);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        List flowFiles = session.get(this.ffbatch);
        if (flowFiles.isEmpty()) {
            return;
        }
        KerberosUser user = this.getKerberosUser();
        if (user == null) {
            this.executeOnKuduClient(kuduClient -> this.processFlowFiles(context, session, flowFiles, (KuduClient)kuduClient));
            return;
        }
        PrivilegedExceptionAction<Void> privilegedAction = () -> {
            this.executeOnKuduClient(kuduClient -> this.processFlowFiles(context, session, flowFiles, (KuduClient)kuduClient));
            return null;
        };
        KerberosAction action = new KerberosAction(user, privilegedAction, this.getLogger());
        action.execute();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processFlowFiles(ProcessContext context, ProcessSession session, List<FlowFile> flowFiles, KuduClient kuduClient) {
        KuduSession kuduSession = this.createKuduSession(kuduClient);
        PutKuduResult putKuduResult = this.flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC ? new AutoFlushSyncPutKuduResult() : new StandardPutKuduResult();
        try {
            this.processRecords(flowFiles, session, context, kuduClient, kuduSession, putKuduResult);
        }
        finally {
            try {
                List<RowError> rowErrors = this.closeKuduSession(kuduSession);
                if (this.flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
                    putKuduResult.addErrors(this.getPendingRowErrorsFromKuduSession(kuduSession));
                } else {
                    putKuduResult.addErrors(rowErrors);
                }
            }
            catch (RuntimeException | KuduException e) {
                this.getLogger().error("KuduSession.close() Failed", e);
            }
        }
        putKuduResult.resolveFlowFileToRowErrorAssociations();
        if (this.isRollbackOnFailure() && putKuduResult.hasRowErrorsOrFailures()) {
            this.logFailures(putKuduResult);
            session.rollback();
            context.yield();
        } else {
            this.transferFlowFiles(flowFiles, session, putKuduResult);
        }
    }

    private void processRecords(List<FlowFile> flowFiles, ProcessSession session, ProcessContext context, KuduClient kuduClient, KuduSession kuduSession, PutKuduResult putKuduResult) {
        RecordReaderFactory recordReaderFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        int bufferedRecords = 0;
        OperationType prevOperationType = OperationType.INSERT;
        block20: for (FlowFile flowFile : flowFiles) {
            putKuduResult.setFlowFile(flowFile);
            try {
                InputStream in = session.read(flowFile);
                Throwable throwable = null;
                try {
                    RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, this.getLogger());
                    Throwable throwable2 = null;
                    try {
                        boolean driftDetected;
                        Function<Record, OperationType> operationTypeFunction;
                        String tableName = this.getEvaluatedProperty(TABLE_NAME, context, flowFile);
                        boolean ignoreNull = Boolean.parseBoolean(this.getEvaluatedProperty(IGNORE_NULL, context, flowFile));
                        boolean lowercaseFields = Boolean.parseBoolean(this.getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
                        boolean handleSchemaDrift = Boolean.parseBoolean(this.getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
                        if (this.recordPathOperationType == null) {
                            OperationType staticOperationType = OperationType.valueOf(this.getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
                            operationTypeFunction = record -> staticOperationType;
                        } else {
                            operationTypeFunction = this.recordPathOperationType;
                        }
                        RecordSet recordSet = recordReader.createRecordSet();
                        KuduTable kuduTable = kuduClient.openTable(tableName);
                        Record record2 = recordSet.next();
                        if (handleSchemaDrift && (driftDetected = this.handleSchemaDrift(kuduTable, kuduClient, flowFile, record2, lowercaseFields))) {
                            kuduTable = kuduClient.openTable(tableName);
                        }
                        while (record2 != null) {
                            List<Record> dataRecords;
                            OperationType operationType = operationTypeFunction.apply(record2);
                            if (this.dataRecordPath == null) {
                                dataRecords = Collections.singletonList(record2);
                            } else {
                                RecordPathResult result = this.dataRecordPath.evaluate(record2);
                                List fieldValues = result.getSelectedFields().collect(Collectors.toList());
                                if (fieldValues.isEmpty()) {
                                    throw new ProcessException("RecordPath " + this.dataRecordPath.getPath() + " evaluated against Record yielded no results.");
                                }
                                for (FieldValue fieldValue : fieldValues) {
                                    RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
                                    if (fieldType == RecordFieldType.RECORD) continue;
                                    throw new ProcessException("RecordPath " + this.dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type " + fieldType);
                                }
                                dataRecords = new ArrayList<Record>(fieldValues.size());
                                for (FieldValue fieldValue : fieldValues) {
                                    dataRecords.add((Record)fieldValue.getValue());
                                }
                            }
                            for (Record dataRecord : dataRecords) {
                                OperationResponse response;
                                if (!(this.supportsInsertIgnoreOp || prevOperationType == operationType || prevOperationType != OperationType.INSERT_IGNORE && operationType != OperationType.INSERT_IGNORE)) {
                                    List<RowError> rowErrors = this.flushKuduSession(kuduSession);
                                    if (this.flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
                                        putKuduResult.addErrors(this.getPendingRowErrorsFromKuduSession(kuduSession));
                                    } else {
                                        putKuduResult.addErrors(rowErrors);
                                    }
                                    kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
                                }
                                prevOperationType = operationType;
                                List fieldNames = dataRecord.getSchema().getFieldNames();
                                Operation operation = this.createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull, lowercaseFields, kuduTable);
                                putKuduResult.recordOperation(operation);
                                if (bufferedRecords == this.batchSize && this.flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
                                    bufferedRecords = 0;
                                    List<RowError> rowErrors = this.flushKuduSession(kuduSession);
                                    putKuduResult.addErrors(rowErrors);
                                }
                                if ((response = kuduSession.apply(operation)) != null && response.hasRowError()) {
                                    putKuduResult.addFailure(response.getRowError());
                                    continue block20;
                                }
                                ++bufferedRecords;
                                putKuduResult.incrementProcessedRecordsForFlowFile();
                            }
                            record2 = recordSet.next();
                        }
                    }
                    catch (Throwable throwable3) {
                        throwable2 = throwable3;
                        throw throwable3;
                    }
                    finally {
                        if (recordReader == null) continue;
                        if (throwable2 != null) {
                            try {
                                recordReader.close();
                            }
                            catch (Throwable throwable4) {
                                throwable2.addSuppressed(throwable4);
                            }
                            continue;
                        }
                        recordReader.close();
                    }
                }
                catch (Throwable throwable5) {
                    throwable = throwable5;
                    throw throwable5;
                }
                finally {
                    if (in == null) continue;
                    if (throwable != null) {
                        try {
                            in.close();
                        }
                        catch (Throwable throwable6) {
                            throwable.addSuppressed(throwable6);
                        }
                        continue;
                    }
                    in.close();
                }
            }
            catch (Exception ex) {
                this.getLogger().error("Failed to push {} to Kudu", new Object[]{flowFile, ex});
                putKuduResult.addFailure(ex);
            }
        }
    }

    private boolean handleSchemaDrift(KuduTable kuduTable, KuduClient kuduClient, FlowFile flowFile, Record record, boolean lowercaseFields) {
        ArrayList<RecordField> recordFields;
        if (record == null) {
            this.getLogger().debug("No Record to evaluate schema drift against for {}", new Object[]{flowFile});
            return false;
        }
        String tableName = kuduTable.getName();
        Schema schema = kuduTable.getSchema();
        if (this.dataRecordPath == null) {
            recordFields = record.getSchema().getFields();
        } else {
            RecordPathResult recordPathResult = this.dataRecordPath.evaluate(record);
            List fieldValues = recordPathResult.getSelectedFields().collect(Collectors.toList());
            recordFields = new ArrayList<RecordField>();
            for (FieldValue fieldValue : fieldValues) {
                RecordField recordField = fieldValue.getField();
                if (recordField.getDataType().getFieldType() == RecordFieldType.RECORD) {
                    Object value = fieldValue.getValue();
                    if (!(value instanceof Record)) continue;
                    recordFields.addAll(((Record)value).getSchema().getFields());
                    continue;
                }
                recordFields.add(recordField);
            }
        }
        List missing = recordFields.stream().filter(field -> !schema.hasColumn(lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName())).collect(Collectors.toList());
        if (missing.isEmpty()) {
            this.getLogger().debug("No schema drift detected for {}", new Object[]{flowFile});
            return false;
        }
        this.getLogger().info("Adding {} columns to table '{}' to handle schema drift", new Object[]{missing.size(), tableName});
        for (RecordField field2 : missing) {
            try {
                String columnName = lowercaseFields ? field2.getFieldName().toLowerCase() : field2.getFieldName();
                kuduClient.alterTable(tableName, this.getAddNullableColumnStatement(columnName, field2.getDataType()));
            }
            catch (KuduException e) {
                if (e.getStatus().isAlreadyPresent()) {
                    this.getLogger().info("Column already exists in table '{}' while handling schema drift", new Object[]{tableName});
                    continue;
                }
                throw new ProcessException((Throwable)e);
            }
        }
        return true;
    }

    private void transferFlowFiles(List<FlowFile> flowFiles, ProcessSession session, PutKuduResult putKuduResult) {
        long totalCount = 0L;
        for (FlowFile flowFile : flowFiles) {
            int count = putKuduResult.getProcessedRecordsForFlowFile(flowFile);
            totalCount += (long)count;
            List<RowError> rowErrors = putKuduResult.getRowErrorsForFlowFile(flowFile);
            if (rowErrors != null && !rowErrors.isEmpty()) {
                rowErrors.forEach(rowError -> this.getLogger().error("Failed to write due to {}", new Object[]{rowError.toString()}));
                flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, Integer.toString(count - rowErrors.size()));
                totalCount -= (long)rowErrors.size();
                session.transfer(flowFile, REL_FAILURE);
                continue;
            }
            if (putKuduResult.isFlowFileProcessedSuccessfully(flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count)))) {
                session.transfer(flowFile, REL_SUCCESS);
                session.getProvenanceReporter().send(flowFile, "Successfully added FlowFile to Kudu");
                continue;
            }
            this.getLogger().error("Failed to write due to {}", new Object[]{putKuduResult.getFailureForFlowFile(flowFile)});
            session.transfer(flowFile, REL_FAILURE);
        }
        session.adjustCounter("Records Inserted", totalCount, false);
    }

    private void logFailures(PutKuduResult putKuduResult) {
        Set<FlowFile> processedFlowFiles = putKuduResult.getProcessedFlowFiles();
        for (FlowFile flowFile : processedFlowFiles) {
            List<RowError> errors = putKuduResult.getRowErrorsForFlowFile(flowFile);
            if (errors.isEmpty()) continue;
            this.getLogger().error("Could not write {} to Kudu due to: {}", new Object[]{flowFile, errors});
        }
    }

    private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
        PropertyValue evaluatedProperty = context.getProperty(property).evaluateAttributeExpressions(flowFile);
        if (property.isRequired() && evaluatedProperty == null) {
            throw new ProcessException(String.format("Property `%s` is required but evaluated to null", property.getDisplayName()));
        }
        return evaluatedProperty.getValue();
    }

    protected KuduSession createKuduSession(KuduClient client) {
        KuduSession kuduSession = client.newSession();
        kuduSession.setMutationBufferSpace(this.batchSize);
        kuduSession.setFlushMode(this.flushMode);
        return kuduSession;
    }

    protected Operation createKuduOperation(OperationType operationType, Record record, List<String> fieldNames, boolean ignoreNull, boolean lowercaseFields, KuduTable kuduTable) {
        Insert operation;
        switch (operationType) {
            case INSERT: {
                operation = kuduTable.newInsert();
                break;
            }
            case INSERT_IGNORE: {
                if (!this.supportsInsertIgnoreOp) {
                    operation = kuduTable.newInsert();
                    break;
                }
                operation = kuduTable.newInsertIgnore();
                break;
            }
            case UPSERT: {
                operation = kuduTable.newUpsert();
                break;
            }
            case UPDATE: {
                operation = kuduTable.newUpdate();
                break;
            }
            case UPDATE_IGNORE: {
                operation = kuduTable.newUpdateIgnore();
                break;
            }
            case DELETE: {
                operation = kuduTable.newDelete();
                break;
            }
            case DELETE_IGNORE: {
                operation = kuduTable.newDeleteIgnore();
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", new Object[]{operationType}));
            }
        }
        this.buildPartialRow(kuduTable.getSchema(), operation.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
        return operation;
    }

    private static class RecordPathOperationType
    implements Function<Record, OperationType> {
        private final RecordPath recordPath;

        public RecordPathOperationType(RecordPath recordPath) {
            this.recordPath = recordPath;
        }

        @Override
        public OperationType apply(Record record) {
            RecordPathResult recordPathResult = this.recordPath.evaluate(record);
            List resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
            if (resultList.isEmpty()) {
                throw new ProcessException("Evaluated RecordPath " + this.recordPath.getPath() + " against Record but got no results");
            }
            if (resultList.size() > 1) {
                throw new ProcessException("Evaluated RecordPath " + this.recordPath.getPath() + " against Record and received multiple distinct results (" + resultList + ")");
            }
            String resultValue = String.valueOf(((FieldValue)resultList.get(0)).getValue());
            try {
                switch (resultValue) {
                    case "c": 
                    case "r": {
                        return OperationType.INSERT;
                    }
                    case "u": {
                        return OperationType.UPDATE;
                    }
                    case "d": {
                        return OperationType.DELETE;
                    }
                }
                return OperationType.valueOf(resultValue.toUpperCase());
            }
            catch (IllegalArgumentException iae) {
                throw new ProcessException("Evaluated RecordPath " + this.recordPath.getPath() + " against Record to determine Kudu Operation Type but found invalid value: " + resultValue);
            }
        }
    }
}

