/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hive;

import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.ErrorTypes;
import org.apache.nifi.processor.util.pattern.ExceptionHandler;
import org.apache.nifi.processor.util.pattern.PartialFunctions;
import org.apache.nifi.processor.util.pattern.Put;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.processors.hive.AbstractHiveQLProcessor;
import org.apache.nifi.processors.hive.SelectHiveQL;

@SeeAlso(value={SelectHiveQL.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"sql", "hive", "put", "database", "update", "insert"})
@CapabilityDescription(value="Executes a HiveQL DDL/DML command (UPDATE, INSERT, e.g.). The content of an incoming FlowFile is expected to be the HiveQL command to execute. The HiveQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes with the naming convention hiveql.args.N.type and hiveql.args.N.value, where N is a positive integer. The hiveql.args.N.type is expected to be a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
@ReadsAttributes(value={@ReadsAttribute(attribute="hiveql.args.N.type", description="Incoming FlowFiles are expected to be parametrized HiveQL statements. The type of each Parameter is specified as an integer that represents the JDBC Type of the parameter."), @ReadsAttribute(attribute="hiveql.args.N.value", description="Incoming FlowFiles are expected to be parametrized HiveQL statements. The value of the Parameters are specified as hiveql.args.1.value, hiveql.args.2.value, hiveql.args.3.value, and so on. The type of the hiveql.args.1.value Parameter is specified by the hiveql.args.1.type attribute.")})
@WritesAttributes(value={@WritesAttribute(attribute="query.input.tables", description="This attribute is written on the flow files routed to the 'success' relationships, and contains input table names (if any) in comma delimited 'databaseName.tableName' format."), @WritesAttribute(attribute="query.output.tables", description="This attribute is written on the flow files routed to the 'success' relationships, and contains the target table names in 'databaseName.tableName' format.")})
public class PutHiveQL
extends AbstractHiveQLProcessor {
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("hive-batch-size").displayName("Batch Size").description("The preferred number of FlowFiles to put to the database in a single transaction").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("100").build();
    public static final PropertyDescriptor STATEMENT_DELIMITER = new PropertyDescriptor.Builder().name("statement-delimiter").displayName("Statement Delimiter").description("Statement Delimiter used to separate SQL statements in a multiple statement script").required(true).defaultValue(";").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile is routed to this relationship after the database is successfully updated").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, such as an invalid query or an integrity constraint violation").build();
    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;
    private Put<FunctionContext, Connection> process;
    private ExceptionHandler<FunctionContext> exceptionHandler;
    private PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (context, session, fc, ff) -> {
        HiveDBCPService dbcpService = (HiveDBCPService)context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
        Connection connection = dbcpService.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
        fc.connectionUrl = dbcpService.getConnectionURL();
        return connection;
    };
    private PartialFunctions.FetchFlowFiles<FunctionContext> fetchFlowFiles = (context, session, functionContext, result) -> {
        int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        return session.get(batchSize);
    };
    private Put.PutFlowFile<FunctionContext, Connection> putFlowFile = (context, session, fc, conn, flowFile, result) -> {
        String script = this.getHiveQL(session, flowFile, fc.charset);
        String regex = "(?<!\\\\)" + Pattern.quote(fc.statementDelimiter);
        String[] hiveQLs = script.split(regex);
        HashSet tableNames = new HashSet();
        this.exceptionHandler.execute((Object)fc, (Object)flowFile, input -> {
            int loc = 1;
            for (String hiveQLStr : hiveQLs) {
                this.getLogger().debug("HiveQL: {}", new Object[]{hiveQLStr});
                String hiveQL = hiveQLStr.trim();
                if (StringUtils.isEmpty((CharSequence)hiveQL)) continue;
                PreparedStatement stmt = conn.prepareStatement(hiveQL);
                int paramCount = StringUtils.countMatches((CharSequence)hiveQL, (CharSequence)"?");
                if (paramCount > 0) {
                    loc = this.setParameters(loc, stmt, paramCount, flowFile.getAttributes());
                }
                try {
                    tableNames.addAll(this.findTableNames(hiveQL));
                }
                catch (Exception e) {
                    this.getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, (Throwable)e);
                }
                stmt.execute();
                fc.proceed();
            }
            long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
            FlowFile updatedFlowFile = session.putAllAttributes(flowFile, this.toQueryTableAttributes(tableNames));
            session.getProvenanceReporter().send(updatedFlowFile, fc.connectionUrl, transmissionMillis, true);
            result.routeTo(flowFile, REL_SUCCESS);
        }, this.onFlowFileError(context, session, result));
    };

    @OnScheduled
    public void constructProcess() {
        this.exceptionHandler = new ExceptionHandler();
        this.exceptionHandler.mapException(e -> {
            if (e instanceof SQLNonTransientException) {
                return ErrorTypes.InvalidInput;
            }
            if (e instanceof SQLException) {
                int errorCode = ((SQLException)e).getErrorCode();
                if (errorCode >= 10000 && errorCode < 20000) {
                    return ErrorTypes.InvalidInput;
                }
                if (errorCode >= 20000 && errorCode < 30000) {
                    return ErrorTypes.TemporalFailure;
                }
                if (errorCode >= 30000 && errorCode < 40000) {
                    return ErrorTypes.TemporalInputFailure;
                }
                if (errorCode >= 40000 && errorCode < 50000) {
                    return ErrorTypes.InvalidInput;
                }
                return ErrorTypes.UnknownFailure;
            }
            return ErrorTypes.UnknownFailure;
        });
        this.exceptionHandler.adjustError(RollbackOnFailure.createAdjustError((ComponentLog)this.getLogger()));
        this.process = new Put();
        this.process.setLogger(this.getLogger());
        this.process.initConnection(this.initConnection);
        this.process.fetchFlowFiles(this.fetchFlowFiles);
        this.process.putFlowFile(this.putFlowFile);
        this.process.adjustRoute(RollbackOnFailure.createAdjustRoute((Relationship[])new Relationship[]{REL_FAILURE, REL_RETRY}));
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    private ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError(ProcessContext context, ProcessSession session, RoutingResult result) {
        ExceptionHandler.OnError onFlowFileError = ExceptionHandler.createOnError((ProcessContext)context, (ProcessSession)session, (RoutingResult)result, (Relationship)REL_FAILURE, (Relationship)REL_RETRY);
        onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
            switch (r.destination()) {
                case Failure: {
                    this.getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[]{i, e}, (Throwable)e);
                    break;
                }
                case Retry: {
                    this.getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{i, e}, (Throwable)e);
                }
            }
        });
        return RollbackOnFailure.createOnError((ExceptionHandler.OnError)onFlowFileError);
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
        Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
        String statementDelimiter = context.getProperty(STATEMENT_DELIMITER).getValue();
        FunctionContext functionContext = new FunctionContext(rollbackOnFailure, charset, statementDelimiter);
        RollbackOnFailure.onTrigger((ProcessContext)context, (ProcessSessionFactory)sessionFactory, (RollbackOnFailure)functionContext, (ComponentLog)this.getLogger(), session -> this.process.onTrigger(context, session, (Object)functionContext));
    }

    static {
        ArrayList<PropertyDescriptor> _propertyDescriptors = new ArrayList<PropertyDescriptor>();
        _propertyDescriptors.add(HIVE_DBCP_SERVICE);
        _propertyDescriptors.add(BATCH_SIZE);
        _propertyDescriptors.add(CHARSET);
        _propertyDescriptors.add(STATEMENT_DELIMITER);
        _propertyDescriptors.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        _relationships.add(REL_RETRY);
        relationships = Collections.unmodifiableSet(_relationships);
    }

    private class FunctionContext
    extends RollbackOnFailure {
        final Charset charset;
        final String statementDelimiter;
        final long startNanos;
        String connectionUrl;

        private FunctionContext(boolean rollbackOnFailure, Charset charset, String statementDelimiter) {
            super(rollbackOnFailure, false);
            this.startNanos = System.nanoTime();
            this.charset = charset;
            this.statementDelimiter = statementDelimiter;
        }
    }
}

