/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hive;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hive.AbstractHiveQLProcessor;
import org.apache.nifi.processors.hive.SelectHiveQL;
import org.apache.nifi.stream.io.StreamUtils;

@SeeAlso(value={SelectHiveQL.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"sql", "hive", "put", "database", "update", "insert"})
@CapabilityDescription(value="Executes a HiveQL DDL/DML command (UPDATE, INSERT, e.g.). The content of an incoming FlowFile is expected to be the HiveQL command to execute. The HiveQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes with the naming convention hiveql.args.N.type and hiveql.args.N.value, where N is a positive integer. The hiveql.args.N.type is expected to be a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
@ReadsAttributes(value={@ReadsAttribute(attribute="hiveql.args.N.type", description="Incoming FlowFiles are expected to be parameterized HiveQL statements. The type of each Parameter is specified as an integer that represents the JDBC Type of the parameter."), @ReadsAttribute(attribute="hiveql.args.N.value", description="Incoming FlowFiles are expected to be parameterized HiveQL statements. The value of the Parameters are specified as hiveql.args.1.value, hiveql.args.2.value, hiveql.args.3.value, and so on. The type of the hiveql.args.1.value Parameter is specified by the hiveql.args.1.type attribute.")})
public class PutHiveQL
extends AbstractHiveQLProcessor {
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("hive-batch-size").displayName("Batch Size").description("The preferred number of FlowFiles to put to the database in a single transaction").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("100").build();
    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder().name("hive-charset").displayName("Character Set").description("Specifies the character set of the record data.").required(true).defaultValue("UTF-8").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile is routed to this relationship after the database is successfully updated").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, such as an invalid query or an integrity constraint violation").build();
    private static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type");
    private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        List flowFiles = session.get(batchSize);
        if (flowFiles.isEmpty()) {
            return;
        }
        long startNanos = System.nanoTime();
        Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
        HiveDBCPService dbcpService = (HiveDBCPService)context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
        try (Connection conn = dbcpService.getConnection();){
            for (FlowFile flowFile : flowFiles) {
                try {
                    String hiveQL = this.getHiveQL(session, flowFile, charset);
                    PreparedStatement stmt = conn.prepareStatement(hiveQL);
                    this.setParameters(stmt, flowFile.getAttributes());
                    stmt.execute();
                    long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                    session.getProvenanceReporter().send(flowFile, dbcpService.getConnectionURL(), transmissionMillis, true);
                    session.transfer(flowFile, REL_SUCCESS);
                }
                catch (SQLException e) {
                    if (e instanceof SQLNonTransientException) {
                        this.getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[]{flowFile, e});
                        session.transfer(flowFile, REL_FAILURE);
                        continue;
                    }
                    this.getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{flowFile, e});
                    flowFile = session.penalize(flowFile);
                    session.transfer(flowFile, REL_RETRY);
                }
            }
        }
        catch (SQLException sqle) {
            this.getLogger().error("Failed to get Hive connection due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{sqle});
            session.transfer((Collection)flowFiles, REL_RETRY);
            context.yield();
        }
    }

    private String getHiveQL(ProcessSession session, FlowFile flowFile, Charset charset) {
        final byte[] buffer = new byte[(int)flowFile.getSize()];
        session.read(flowFile, new InputStreamCallback(){

            public void process(InputStream in) throws IOException {
                StreamUtils.fillBuffer((InputStream)in, (byte[])buffer);
            }
        });
        return new String(buffer, charset);
    }

    private void setParameters(PreparedStatement stmt, Map<String, String> attributes) throws SQLException {
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            String key = entry.getKey();
            Matcher matcher = HIVEQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
            if (!matcher.matches()) continue;
            int parameterIndex = Integer.parseInt(matcher.group(1));
            boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
            if (!isNumeric) {
                throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type");
            }
            int jdbcType = Integer.parseInt(entry.getValue());
            String valueAttrName = "hiveql.args." + parameterIndex + ".value";
            String parameterValue = attributes.get(valueAttrName);
            try {
                this.setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
            }
            catch (NumberFormatException nfe) {
                throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", (Throwable)nfe);
            }
        }
    }

    private void setParameter(PreparedStatement stmt, String attrName, int parameterIndex, String parameterValue, int jdbcType) throws SQLException {
        if (parameterValue == null) {
            stmt.setNull(parameterIndex, jdbcType);
        } else {
            try {
                switch (jdbcType) {
                    case -7: 
                    case 16: {
                        stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
                        break;
                    }
                    case -6: {
                        stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
                        break;
                    }
                    case 5: {
                        stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
                        break;
                    }
                    case 4: {
                        stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
                        break;
                    }
                    case -5: {
                        stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
                        break;
                    }
                    case 7: {
                        stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
                        break;
                    }
                    case 6: 
                    case 8: {
                        stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
                        break;
                    }
                    case 2: 
                    case 3: {
                        stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue));
                        break;
                    }
                    case 91: {
                        stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
                        break;
                    }
                    case 92: {
                        stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
                        break;
                    }
                    case 93: {
                        stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
                        break;
                    }
                    case -16: 
                    case -1: 
                    case 1: 
                    case 12: {
                        stmt.setString(parameterIndex, parameterValue);
                        break;
                    }
                    default: {
                        stmt.setObject(parameterIndex, (Object)parameterValue, jdbcType);
                        break;
                    }
                }
            }
            catch (SQLException e) {
                this.getLogger().error("Error setting parameter {} to value from {} ({})", new Object[]{parameterIndex, attrName, parameterValue}, (Throwable)e);
                throw e;
            }
        }
    }

    static {
        ArrayList<PropertyDescriptor> _propertyDescriptors = new ArrayList<PropertyDescriptor>();
        _propertyDescriptors.add(HIVE_DBCP_SERVICE);
        _propertyDescriptors.add(BATCH_SIZE);
        _propertyDescriptors.add(CHARSET);
        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        _relationships.add(REL_RETRY);
        relationships = Collections.unmodifiableSet(_relationships);
    }
}

