/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kudu;

import com.google.common.annotations.VisibleForTesting;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.kudu.AbstractKudu;
import org.apache.nifi.serialization.record.Record;

/**
 * Processor that copies the fields of incoming records into rows of a Kudu table.
 *
 * <p>This class contributes the property/relationship lists and the record-to-row
 * conversion; it builds {@link Insert} and {@link Upsert} operations but does not
 * apply them itself. Everything else is inherited from {@link AbstractKudu}.
 */
@EventDriven
@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"put", "database", "NoSQL", "kudu", "HDFS", "record"})
@CapabilityDescription(value="Reads records from an incoming FlowFile using the provided Record Reader, and writes those records to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source. If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute="record.count", description="Number of records written to Kudu")
public class PutKudu
extends AbstractKudu {
    /**
     * Lists the properties this processor exposes.
     *
     * @return a new mutable list of the supported property descriptors, in display order
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(KUDU_MASTERS);
        properties.add(TABLE_NAME);
        properties.add(SKIP_HEAD_LINE);
        properties.add(RECORD_READER);
        properties.add(INSERT_OPERATION);
        properties.add(FLUSH_MODE);
        properties.add(BATCH_SIZE);
        return properties;
    }

    /**
     * Lists the relationships FlowFiles can be routed to.
     *
     * @return a new set containing the success and failure relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        return rels;
    }

    /**
     * Creates an upsert operation for {@code kuduTable} populated from {@code record}.
     *
     * @param kuduTable  table the operation targets
     * @param record     source of the column values
     * @param fieldNames names of the record fields to copy into the row
     * @return the populated upsert operation (not yet applied)
     * @throws IllegalStateException if a matched column has a type this processor cannot write
     */
    @Override
    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
        Upsert upsert = kuduTable.newUpsert();
        this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames);
        return upsert;
    }

    /**
     * Creates an insert operation for {@code kuduTable} populated from {@code record}.
     *
     * @param kuduTable  table the operation targets
     * @param record     source of the column values
     * @param fieldNames names of the record fields to copy into the row
     * @return the populated insert operation (not yet applied)
     * @throws IllegalStateException if a matched column has a type this processor cannot write
     */
    @Override
    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
        Insert insert = kuduTable.newInsert();
        this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames);
        return insert;
    }

    /**
     * Copies the named fields of {@code record} into {@code row}, converting each value
     * according to the type of the matching column in {@code schema}.
     *
     * <p>Fields for which no column index can be resolved are skipped. A field whose
     * record value is {@code null} is written as a null cell.
     *
     * @param schema     schema used to resolve column indexes and types
     * @param row        row to populate
     * @param record     source of the values
     * @param fieldNames names of the fields to copy
     * @throws IllegalStateException if a matched column has a data type not handled below
     */
    @VisibleForTesting
    void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames) {
        for (String colName : fieldNames) {
            int colIdx = this.getColumnIndex(schema, colName);
            if (colIdx == -1) {
                // No column could be resolved for this field; leave it out of the row.
                continue;
            }
            ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
            Type colType = colSchema.getType();
            if (record.getValue(colName) == null) {
                // Address the cell by index, like every other write in this method.
                row.setNull(colIdx);
                continue;
            }
            switch (colType.getDataType(colSchema.getTypeAttributes())) {
                case BOOL:
                    row.addBoolean(colIdx, record.getAsBoolean(colName).booleanValue());
                    break;
                case FLOAT:
                    row.addFloat(colIdx, record.getAsFloat(colName).floatValue());
                    break;
                case DOUBLE:
                    row.addDouble(colIdx, record.getAsDouble(colName).doubleValue());
                    break;
                case BINARY:
                    // Explicit charset: the stored bytes must not depend on the JVM's platform default.
                    row.addBinary(colIdx, record.getAsString(colName).getBytes(StandardCharsets.UTF_8));
                    break;
                case INT8:
                    // Narrowing conversion: values outside the byte range are truncated.
                    row.addByte(colIdx, record.getAsInt(colName).byteValue());
                    break;
                case INT16:
                    // Narrowing conversion: values outside the short range are truncated.
                    row.addShort(colIdx, record.getAsInt(colName).shortValue());
                    break;
                case INT32:
                    row.addInt(colIdx, record.getAsInt(colName).intValue());
                    break;
                case INT64:
                case UNIXTIME_MICROS:
                    // The long is passed through unchanged; no unit conversion happens here.
                    row.addLong(colIdx, record.getAsLong(colName).longValue());
                    break;
                case STRING:
                    row.addString(colIdx, record.getAsString(colName));
                    break;
                case DECIMAL32:
                case DECIMAL64:
                case DECIMAL128:
                    // Parse from the string form of the value.
                    row.addDecimal(colIdx, new BigDecimal(record.getAsString(colName)));
                    break;
                default:
                    throw new IllegalStateException(String.format("unknown column type %s", colType));
            }
        }
    }

    /**
     * Resolves the index of {@code colName} in {@code columns}.
     *
     * @param columns schema to search
     * @param colName column name to look up
     * @return the column index, or {@code -1} if the lookup throws
     */
    private int getColumnIndex(Schema columns, String colName) {
        try {
            return columns.getColumnIndex(colName);
        }
        catch (Exception ignored) {
            // Deliberate best-effort: any lookup failure is treated as "no such column"
            // so that the caller can skip the field instead of failing the whole record.
            return -1;
        }
    }
}

