/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kudu;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.kudu.AbstractKudu;
import org.apache.nifi.serialization.record.Record;

@EventDriven
@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"put", "database", "NoSQL", "kudu", "HDFS", "record"})
@CapabilityDescription(value="Reads records from an incoming FlowFile using the provided Record Reader, and writes those records to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source. If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute="record.count", description="Number of records written to Kudu")
public class PutKudu
extends AbstractKudu {
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(KUDU_MASTERS);
        properties.add(TABLE_NAME);
        properties.add(SKIP_HEAD_LINE);
        properties.add(RECORD_READER);
        properties.add(INSERT_OPERATION);
        properties.add(FLUSH_MODE);
        properties.add(BATCH_SIZE);
        return properties;
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        return rels;
    }

    @Override
    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
        Upsert upsert = kuduTable.newUpsert();
        this.insert(kuduTable, (Operation)upsert, record, fieldNames);
        return upsert;
    }

    @Override
    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
        Insert insert = kuduTable.newInsert();
        this.insert(kuduTable, (Operation)insert, record, fieldNames);
        return insert;
    }

    private void insert(KuduTable kuduTable, Operation operation, Record record, List<String> fieldNames) {
        PartialRow row = operation.getRow();
        Schema colSchema = kuduTable.getSchema();
        block10: for (String colName : fieldNames) {
            int colIdx = this.getColumnIndex(colSchema, colName);
            if (colIdx == -1) continue;
            Type colType = colSchema.getColumnByIndex(colIdx).getType();
            switch (colType.getDataType()) {
                case BOOL: {
                    row.addBoolean(colIdx, record.getAsBoolean(colName).booleanValue());
                    continue block10;
                }
                case FLOAT: {
                    row.addFloat(colIdx, record.getAsFloat(colName).floatValue());
                    continue block10;
                }
                case DOUBLE: {
                    row.addDouble(colIdx, record.getAsDouble(colName).doubleValue());
                    continue block10;
                }
                case BINARY: {
                    row.addBinary(colIdx, record.getAsString(colName).getBytes());
                    continue block10;
                }
                case INT8: 
                case INT16: {
                    short temp = (short)record.getAsInt(colName).intValue();
                    row.addShort(colIdx, temp);
                }
                case INT32: {
                    row.addInt(colIdx, record.getAsInt(colName).intValue());
                    continue block10;
                }
                case INT64: {
                    row.addLong(colIdx, record.getAsLong(colName).longValue());
                    continue block10;
                }
                case STRING: {
                    row.addString(colIdx, record.getAsString(colName));
                    continue block10;
                }
            }
            throw new IllegalStateException(String.format("unknown column type %s", colType));
        }
    }

    private int getColumnIndex(Schema columns, String colName) {
        try {
            return columns.getColumnIndex(colName);
        }
        catch (Exception ex) {
            return -1;
        }
    }
}

