package org.apache.nifi.processors.hive;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.streaming.ConnectionError;
import org.apache.hive.streaming.HiveRecordWriter;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.InvalidTable;
import org.apache.hive.streaming.RecordsEOFException;
import org.apache.hive.streaming.SerializationError;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.StreamingIOFailure;
import org.apache.hive.streaming.TransactionError;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.ValidationResources;

@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. If 'Static Partition Values' is not set, then the partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in each record should be field A. If 'Static Partition Values' is set, those values will be used as the partition values, and any record fields corresponding to partition columns will be ignored.")
@RequiresInstanceClassLoading
@Tags({"hive", "streaming", "put", "database", "store"})
@WritesAttributes({@WritesAttribute(attribute = PutHive3Streaming.HIVE_STREAMING_RECORD_COUNT_ATTR, description = "This attribute is written on the flow files routed to the 'success' and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."), @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")})
/* loaded from: input_file:org/apache/nifi/processors/hive/PutHive3Streaming.class */
public class PutHive3Streaming extends AbstractProcessor {
    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The service for reading records from incoming flow files.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder().name("hive3-stream-metastore-uri").displayName("Hive Metastore URI").description("The URI location(s) for the Hive metastore. This is a comma-separated list of Hive metastore URIs; note that this is not the location of the Hive Server. The default port for the Hive metastore is 9043. If this field is not set, then the 'hive.metastore.uris' property from any provided configuration resources will be used, and if none are provided, then the default value from a default hive-site.xml will be used (usually thrift://localhost:9083).").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.URI_LIST_VALIDATOR).build();
    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("hive3-config-resources").displayName("Hive Configuration Resources").description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. Please see the Hive documentation for more details.").required(false).identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, new ResourceType[0]).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder().name("hive3-stream-database-name").displayName("Database Name").description("The name of the database in which to put the data.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("hive3-stream-table-name").displayName("Table Name").description("The name of the database table in which to put the data.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor STATIC_PARTITION_VALUES = new PropertyDescriptor.Builder().name("hive3-stream-part-vals").displayName("Static Partition Values").description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to ${name},${age}. If this property is set, the values will be used as the partition values, and any record fields corresponding to partition columns will be ignored. If this property is not set, then the partition values are expected to be the last fields of each record.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor RECORDS_PER_TXN = new PropertyDescriptor.Builder().name("hive3-stream-records-per-transaction").displayName("Records per Transaction").description("Number of records to process before committing the transaction. If set to zero (0), all records will be written in a single transaction.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).defaultValue("0").build();
    static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder().name("hive3-stream-transactions-per-batch").displayName("Transactions per Batch").description("A hint to Hive Streaming indicating how many transactions the processor task will need. The product of Records per Transaction (if not zero) and Transactions per Batch should be larger than the largest expected number of records in the flow file(s), this will ensure any failed transaction batches cause a full rollback.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("1").build();
    static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder().name("hive3-stream-call-timeout").displayName("Call Timeout").description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.").defaultValue("0").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    static final PropertyDescriptor DISABLE_STREAMING_OPTIMIZATIONS = new PropertyDescriptor.Builder().name("hive3-stream-disable-optimizations").displayName("Disable Streaming Optimizations").description("Whether to disable streaming optimizations. Disabling streaming optimizations will have significant impact to performance and memory consumption.").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty("NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed, (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later) then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue. Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder().name("kerberos-credentials-service").displayName("Kerberos Credentials Service").description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos").identifiesControllerService(KerberosCredentialsService.class).required(false).build();
    static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("kerberos-principal").displayName("Kerberos Principal").description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder().name("kerberos-password").displayName("Kerberos Password").description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This can be used to provide a retry capability since full rollback is not possible.").build();
    private List<PropertyDescriptor> propertyDescriptors;
    private Set<Relationship> relationships;
    protected volatile UserGroupInformation ugi;
    protected volatile HiveConf hiveConfig;
    protected volatile int callTimeout;
    protected ExecutorService callTimeoutPool;
    protected volatile boolean rollbackOnFailure;
    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    protected final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();

    /* loaded from: input_file:org/apache/nifi/processors/hive/PutHive3Streaming$ShouldRetryException.class */
    private static class ShouldRetryException extends RuntimeException {
        private ShouldRetryException(String str, Throwable th) {
            super(str, th);
        }
    }

    protected void init(ProcessorInitializationContext processorInitializationContext) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(RECORD_READER);
        arrayList.add(METASTORE_URI);
        arrayList.add(HIVE_CONFIGURATION_RESOURCES);
        arrayList.add(DB_NAME);
        arrayList.add(TABLE_NAME);
        arrayList.add(STATIC_PARTITION_VALUES);
        arrayList.add(RECORDS_PER_TXN);
        arrayList.add(TXNS_PER_BATCH);
        arrayList.add(CALL_TIMEOUT);
        arrayList.add(DISABLE_STREAMING_OPTIMIZATIONS);
        arrayList.add(ROLLBACK_ON_FAILURE);
        arrayList.add(KERBEROS_CREDENTIALS_SERVICE);
        arrayList.add(KERBEROS_PRINCIPAL);
        arrayList.add(KERBEROS_PASSWORD);
        this.propertyDescriptors = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        hashSet.add(REL_RETRY);
        this.relationships = Collections.unmodifiableSet(hashSet);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propertyDescriptors;
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        boolean isSet = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
        ArrayList arrayList = new ArrayList();
        KerberosCredentialsService asControllerService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
        String value = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
        String value2 = validationContext.getProperty(KERBEROS_PASSWORD).getValue();
        String principal = asControllerService != null ? asControllerService.getPrincipal() : value;
        String keytab = asControllerService != null ? asControllerService.getKeytab() : null;
        if (isSet) {
            arrayList.addAll(this.hiveConfigurator.validate(validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(), principal, keytab, value2, this.validationResourceHolder, getLogger()));
        }
        if (asControllerService != null && (value != null || value2 != null)) {
            arrayList.add(new ValidationResult.Builder().subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName()).valid(false).explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time").build());
        }
        return arrayList;
    }

    @OnScheduled
    public void setup(ProcessContext processContext) throws IOException {
        ComponentLog logger = getLogger();
        this.rollbackOnFailure = processContext.getProperty(ROLLBACK_ON_FAILURE).asBoolean().booleanValue();
        this.hiveConfig = this.hiveConfigurator.getConfigurationFromFiles(processContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue());
        if (processContext.getMaxConcurrentTasks() > 1) {
            this.hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
        }
        for (Map.Entry entry : processContext.getProperties().entrySet()) {
            PropertyDescriptor propertyDescriptor = (PropertyDescriptor) entry.getKey();
            if (propertyDescriptor.isDynamic()) {
                this.hiveConfig.set(propertyDescriptor.getName(), (String) entry.getValue());
            }
        }
        this.hiveConfigurator.preload(this.hiveConfig);
        if (SecurityUtil.isSecurityEnabled(this.hiveConfig)) {
            KerberosCredentialsService asControllerService = processContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
            String value = processContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
            String value2 = processContext.getProperty(KERBEROS_PASSWORD).getValue();
            String principal = asControllerService != null ? asControllerService.getPrincipal() : value;
            String keytab = asControllerService != null ? asControllerService.getKeytab() : null;
            if (keytab != null) {
                this.kerberosUserReference.set(new KerberosKeytabUser(principal, keytab));
                logger.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keytab});
            } else {
                if (value2 == null) {
                    throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided");
                }
                this.kerberosUserReference.set(new KerberosPasswordUser(principal, value2));
                logger.info("Hive Security Enabled, logging in as principal {} with password", new Object[]{principal});
            }
            try {
                this.ugi = this.hiveConfigurator.authenticate(this.hiveConfig, this.kerberosUserReference.get());
                logger.info("Successfully logged in as principal " + principal);
            } catch (AuthenticationFailedException e) {
                logger.error(e.getMessage(), e);
                throw new ProcessException(e);
            }
        } else {
            this.ugi = SecurityUtil.loginSimple(this.hiveConfig);
            this.kerberosUserReference.set(null);
        }
        this.callTimeout = processContext.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger().intValue() * 1000;
        this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("put-hive3-streaming-%d").build());
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        getUgi().doAs(() -> {
            String str;
            FlowFile flowFile = processSession.get();
            if (flowFile == null) {
                return null;
            }
            RecordReaderFactory asControllerService = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
            String value = processContext.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
            String value2 = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
            ComponentLog logger = getLogger();
            String str2 = null;
            if (processContext.getProperty(METASTORE_URI).isSet()) {
                str2 = processContext.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
                if (StringUtils.isEmpty(str2)) {
                    logger.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure");
                    processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
                }
            }
            String value3 = processContext.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
            boolean booleanValue = processContext.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean().booleanValue();
            if (str2 != null) {
                this.hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), str2);
            }
            int intValue = processContext.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger().intValue();
            HiveOptions withTransactionBatchSize = new HiveOptions(str2, value, value2).withHiveConf(this.hiveConfig).withCallTimeout(Integer.valueOf(this.callTimeout)).withStreamingOptimizations(!booleanValue).withTransactionBatchSize(processContext.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions(flowFile).asInteger().intValue());
            if (!StringUtils.isEmpty(value3)) {
                withTransactionBatchSize = withTransactionBatchSize.withStaticPartitionValues((List) Arrays.stream(value3.split(",")).filter((v0) -> {
                    return Objects.nonNull(v0);
                }).map((v0) -> {
                    return v0.trim();
                }).collect(Collectors.toList()));
            }
            if (SecurityUtil.isSecurityEnabled(this.hiveConfig)) {
                KerberosCredentialsService asControllerService2 = processContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
                String value4 = processContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
                if (asControllerService2 != null) {
                    str = asControllerService2.getPrincipal();
                    withTransactionBatchSize = withTransactionBatchSize.withKerberosKeytab(asControllerService2.getKeytab());
                } else {
                    str = value4;
                }
                withTransactionBatchSize = withTransactionBatchSize.withKerberosPrincipal(str);
            }
            HiveOptions hiveOptions = withTransactionBatchSize;
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            StreamingConnection streamingConnection = null;
            try {
                try {
                    try {
                        try {
                            try {
                                try {
                                    try {
                                        InputStream read = processSession.read(flowFile);
                                        try {
                                            try {
                                                StreamingConnection makeStreamingConnection = makeStreamingConnection(hiveOptions, asControllerService.createRecordReader(flowFile, read, getLogger()), intValue);
                                                boolean z = false;
                                                while (!z) {
                                                    makeStreamingConnection.beginTransaction();
                                                    try {
                                                        makeStreamingConnection.write(read);
                                                    } catch (RecordsEOFException e) {
                                                        z = true;
                                                    }
                                                    makeStreamingConnection.commitTransaction();
                                                }
                                                read.close();
                                                HashMap hashMap = new HashMap();
                                                hashMap.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(makeStreamingConnection.getConnectionStats().getRecordsWritten()));
                                                hashMap.put(AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES, hiveOptions.getQualifiedTableName());
                                                FlowFile putAllAttributes = processSession.putAllAttributes(flowFile, hashMap);
                                                processSession.getProvenanceReporter().send(putAllAttributes, makeStreamingConnection.getMetastoreUri());
                                                if (read != null) {
                                                    read.close();
                                                }
                                                processSession.transfer(putAllAttributes, REL_SUCCESS);
                                                closeConnection(makeStreamingConnection);
                                                Thread.currentThread().setContextClassLoader(contextClassLoader);
                                                return null;
                                            } catch (Throwable th) {
                                                if (read != null) {
                                                    try {
                                                        read.close();
                                                    } catch (Throwable th2) {
                                                        th.addSuppressed(th2);
                                                    }
                                                }
                                                throw th;
                                            }
                                        } catch (Exception e2) {
                                            throw new RecordReaderFactoryException("Unable to create RecordReader", e2);
                                        }
                                    } catch (StreamingException e3) {
                                        Throwable cause = e3.getCause();
                                        if (cause == null) {
                                            cause = e3;
                                        }
                                        if (this.rollbackOnFailure) {
                                            if (0 != 0) {
                                                abortConnection(null);
                                            }
                                            throw new ProcessException(cause.getLocalizedMessage(), cause);
                                        }
                                        FlowFile penalize = processSession.penalize(flowFile);
                                        HashMap hashMap2 = new HashMap();
                                        hashMap2.put(HIVE_STREAMING_RECORD_COUNT_ATTR, 0 != 0 ? Long.toString(streamingConnection.getConnectionStats().getRecordsWritten()) : "0");
                                        hashMap2.put(AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES, hiveOptions.getQualifiedTableName());
                                        FlowFile putAllAttributes2 = processSession.putAllAttributes(penalize, hashMap2);
                                        logger.error("Exception while trying to stream {} to hive - routing to failure", new Object[]{putAllAttributes2, e3});
                                        processSession.transfer(putAllAttributes2, REL_FAILURE);
                                        closeConnection(null);
                                        Thread.currentThread().setContextClassLoader(contextClassLoader);
                                        return null;
                                    }
                                } catch (Throwable th3) {
                                    closeConnection(null);
                                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                                    throw th3;
                                }
                            } catch (ShouldRetryException e4) {
                                getLogger().error(e4.getLocalizedMessage(), e4);
                                if (0 != 0) {
                                    abortConnection(null);
                                }
                                processSession.transfer(processSession.penalize(flowFile), REL_RETRY);
                                closeConnection(null);
                                Thread.currentThread().setContextClassLoader(contextClassLoader);
                                return null;
                            }
                        } catch (DiscontinuedException e5) {
                            getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e5, e5});
                            processSession.transfer(flowFile, Relationship.SELF);
                            closeConnection(null);
                            Thread.currentThread().setContextClassLoader(contextClassLoader);
                            return null;
                        }
                    } catch (ConnectionError e6) {
                        processContext.yield();
                        throw new ProcessException("A connection to metastore cannot be established", e6);
                    } catch (Throwable th4) {
                        if (0 != 0) {
                            abortConnection(null);
                        }
                        if (th4 instanceof ProcessException) {
                            throw th4;
                        }
                        throw new ProcessException(th4);
                    }
                } catch (TransactionError e7) {
                    if (this.rollbackOnFailure) {
                        throw new ProcessException(e7.getLocalizedMessage(), e7);
                    }
                    throw new ShouldRetryException(e7.getLocalizedMessage(), e7);
                } catch (RecordReaderFactoryException e8) {
                    if (this.rollbackOnFailure) {
                        throw new ProcessException(e8);
                    }
                    logger.error("Failed to create {} for {} - routing to failure", new Object[]{RecordReader.class.getSimpleName(), flowFile, e8});
                    processSession.transfer(flowFile, REL_FAILURE);
                    closeConnection(null);
                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                    return null;
                }
            } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e9) {
                if (this.rollbackOnFailure) {
                    if (0 != 0) {
                        abortConnection(null);
                    }
                    throw new ProcessException(e9.getLocalizedMessage(), e9);
                }
                HashMap hashMap3 = new HashMap();
                hashMap3.put(HIVE_STREAMING_RECORD_COUNT_ATTR, 0 != 0 ? Long.toString(streamingConnection.getConnectionStats().getRecordsWritten()) : "0");
                hashMap3.put(AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES, hiveOptions.getQualifiedTableName());
                FlowFile putAllAttributes3 = processSession.putAllAttributes(flowFile, hashMap3);
                logger.error("Exception while processing {} - routing to failure", new Object[]{putAllAttributes3, e9});
                processSession.transfer(putAllAttributes3, REL_FAILURE);
                closeConnection(null);
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                return null;
            }
        });
    }

    StreamingConnection makeStreamingConnection(HiveOptions hiveOptions, RecordReader recordReader, int i) throws StreamingException {
        HiveStreamingConnection.Builder withTransactionBatchSize = HiveStreamingConnection.newBuilder().withDatabase(hiveOptions.getDatabaseName()).withTable(hiveOptions.getTableName()).withStaticPartitionValues(hiveOptions.getStaticPartitionValues()).withHiveConf(hiveOptions.getHiveConf()).withRecordWriter(new HiveRecordWriter(recordReader, getLogger(), i)).withTransactionBatchSize(hiveOptions.getTransactionBatchSize());
        String simpleName = getClass().getSimpleName();
        String identifier = getIdentifier();
        long threadId = Thread.currentThread().threadId();
        Thread.currentThread().getName();
        return withTransactionBatchSize.withAgentInfo("NiFi " + simpleName + " [" + identifier + "] thread " + threadId + "[" + withTransactionBatchSize + "]").connect();
    }

    @OnStopped
    public void cleanup() {
        this.validationResourceHolder.set(null);
        ComponentLog logger = getLogger();
        if (this.callTimeoutPool != null) {
            this.callTimeoutPool.shutdown();
            while (!this.callTimeoutPool.isTerminated()) {
                try {
                    this.callTimeoutPool.awaitTermination(this.callTimeout, TimeUnit.MILLISECONDS);
                } catch (Throwable th) {
                    logger.warn("shutdown interrupted on " + String.valueOf(this.callTimeoutPool), th);
                }
            }
            this.callTimeoutPool = null;
        }
        this.ugi = null;
        this.kerberosUserReference.set(null);
    }

    private void abortAndCloseConnection(StreamingConnection streamingConnection) {
        try {
            abortConnection(streamingConnection);
            closeConnection(streamingConnection);
        } catch (Exception e) {
            getLogger().warn("unable to close hive connections. ", e);
        }
    }

    private void abortConnection(StreamingConnection streamingConnection) {
        if (streamingConnection != null) {
            try {
                streamingConnection.abortTransaction();
            } catch (Exception e) {
                getLogger().error("Failed to abort Hive Streaming transaction " + String.valueOf(streamingConnection) + " due to exception ", e);
            }
        }
    }

    private void closeConnection(StreamingConnection streamingConnection) {
        if (streamingConnection != null) {
            try {
                streamingConnection.close();
            } catch (Exception e) {
                getLogger().error("Failed to close Hive Streaming connection " + String.valueOf(streamingConnection) + " due to exception ", e);
            }
        }
    }

    UserGroupInformation getUgi() {
        getLogger().trace("getting UGI instance");
        if (this.kerberosUserReference.get() != null) {
            KerberosUser kerberosUser = this.kerberosUserReference.get();
            getLogger().debug("kerberosUser is " + String.valueOf(kerberosUser));
            try {
                getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser});
                kerberosUser.checkTGTAndRelogin();
            } catch (KerberosLoginException e) {
                throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
            }
        } else {
            getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
        }
        return this.ugi;
    }
}
