/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hive;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.ConnectionError;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveUtils;
import org.apache.nifi.util.hive.HiveWriter;

@TriggerSerially
@Tags(value={"hive", "streaming", "put", "database", "store"})
@CapabilityDescription(value="This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor.")
@WritesAttributes(value={@WritesAttribute(attribute="hivestreaming.record.count", description="This attribute is written on the flow files routed to the 'success' and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively.")})
public class PutHiveStreaming
extends AbstractProcessor {
    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
        }
        String reason = null;
        try {
            int intVal = Integer.parseInt(value);
            if (intVal < 2) {
                reason = "value is less than 2";
            }
        }
        catch (NumberFormatException e) {
            reason = "value is not a valid integer";
        }
        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
    };
    private static final Set<String> RESERVED_METADATA;
    public static final PropertyDescriptor METASTORE_URI;
    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES;
    public static final PropertyDescriptor DB_NAME;
    public static final PropertyDescriptor TABLE_NAME;
    public static final PropertyDescriptor PARTITION_COLUMNS;
    public static final PropertyDescriptor AUTOCREATE_PARTITIONS;
    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS;
    public static final PropertyDescriptor HEARTBEAT_INTERVAL;
    public static final PropertyDescriptor TXNS_PER_BATCH;
    public static final Relationship REL_SUCCESS;
    public static final Relationship REL_FAILURE;
    public static final Relationship REL_RETRY;
    private List<PropertyDescriptor> propertyDescriptors;
    private Set<Relationship> relationships;
    private static final long TICKET_RENEWAL_PERIOD = 60000L;
    protected KerberosProperties kerberosProperties;
    private volatile File kerberosConfigFile = null;
    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    protected volatile UserGroupInformation ugi;
    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
    protected HiveOptions options;
    protected ExecutorService callTimeoutPool;
    protected transient Timer heartBeatTimer;
    protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    protected Map<HiveEndPoint, HiveWriter> allWriters;

    protected void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
        props.add(METASTORE_URI);
        props.add(HIVE_CONFIGURATION_RESOURCES);
        props.add(DB_NAME);
        props.add(TABLE_NAME);
        props.add(PARTITION_COLUMNS);
        props.add(AUTOCREATE_PARTITIONS);
        props.add(MAX_OPEN_CONNECTIONS);
        props.add(HEARTBEAT_INTERVAL);
        props.add(TXNS_PER_BATCH);
        this.kerberosConfigFile = context.getKerberosConfigurationFile();
        this.kerberosProperties = new KerberosProperties(this.kerberosConfigFile);
        props.add(this.kerberosProperties.getKerberosPrincipal());
        props.add(this.kerberosProperties.getKerberosKeytab());
        this.propertyDescriptors = Collections.unmodifiableList(props);
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        _relationships.add(REL_RETRY);
        this.relationships = Collections.unmodifiableSet(_relationships);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propertyDescriptors;
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @OnScheduled
    public void setup(ProcessContext context) {
        ComponentLog log = this.getLogger();
        String metastoreUri = context.getProperty(METASTORE_URI).getValue();
        String dbName = context.getProperty(DB_NAME).getValue();
        String tableName = context.getProperty(TABLE_NAME).getValue();
        boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
        Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
        Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
        Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
        String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
        Configuration hiveConfig = this.hiveConfigurator.getConfigurationFromFiles(configFiles);
        for (Map.Entry entry : context.getProperties().entrySet()) {
            PropertyDescriptor descriptor = (PropertyDescriptor)entry.getKey();
            if (!descriptor.isDynamic()) continue;
            hiveConfig.set(descriptor.getName(), (String)entry.getValue());
        }
        this.options = new HiveOptions(metastoreUri, dbName, tableName).withTxnsPerBatch(txnsPerBatch).withAutoCreatePartitions(autoCreatePartitions).withMaxOpenConnections(maxConnections).withHeartBeatInterval(heartbeatInterval);
        this.hiveConfigurator.preload(hiveConfig);
        if (SecurityUtil.isSecurityEnabled((Configuration)hiveConfig)) {
            String principal = context.getProperty(this.kerberosProperties.getKerberosPrincipal()).getValue();
            String keyTab = context.getProperty(this.kerberosProperties.getKerberosKeytab()).getValue();
            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
            try {
                this.ugi = this.hiveConfigurator.authenticate(hiveConfig, principal, keyTab, 60000L, log);
            }
            catch (AuthenticationFailedException ae) {
                throw new ProcessException("Kerberos authentication failed for Hive Streaming", (Throwable)ae);
            }
            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
            this.options = this.options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
        }
        this.allWriters = new ConcurrentHashMap<HiveEndPoint, HiveWriter>();
        String timeoutName = "put-hive-streaming-%d";
        this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
        this.sendHeartBeat.set(true);
        this.heartBeatTimer = new Timer();
        this.setupHeartBeatTimer();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        block21: {
            List partitionColumnList;
            FlowFile flowFile = session.get();
            if (flowFile == null) {
                return;
            }
            ComponentLog log = this.getLogger();
            Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(((Object)((Object)this)).getClass().getClassLoader());
            String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
            if (partitionColumns == null || partitionColumns.isEmpty()) {
                partitionColumnList = Collections.emptyList();
            } else {
                String[] partitionCols = partitionColumns.split(",");
                partitionColumnList = new ArrayList(partitionCols.length);
                for (String col : partitionCols) {
                    partitionColumnList.add(col.trim());
                }
            }
            AtomicInteger recordCount = new AtomicInteger(0);
            AtomicInteger successfulRecordCount = new AtomicInteger(0);
            LinkedList successfulRecords = new LinkedList();
            FlowFile inputFlowFile = flowFile;
            AtomicBoolean processingFailure = new AtomicBoolean(false);
            AtomicReference<FlowFile> successFlowFile = new AtomicReference<FlowFile>(session.create(inputFlowFile));
            DataFileWriter successAvroWriter = new DataFileWriter((DatumWriter)new GenericDatumWriter());
            AtomicReference<FlowFile> failureFlowFile = new AtomicReference<FlowFile>(session.create(inputFlowFile));
            DataFileWriter failureAvroWriter = new DataFileWriter((DatumWriter)new GenericDatumWriter());
            try {
                session.read(inputFlowFile, in -> {
                    /*
                     * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
                     * 
                     * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
                     *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
                     *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
                     *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
                     *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
                     *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
                     *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
                     *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
                     *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
                     *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1050)
                     *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
                     *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
                     *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
                     *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
                     *     at org.benf.cfr.reader.Main.main(Main.java:54)
                     */
                    throw new IllegalStateException("Decompilation failed");
                });
                if (recordCount.get() > 0) {
                    if (successfulRecordCount.get() > 0) {
                        successFlowFile.set(session.putAttribute(successFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount.get())));
                        session.getProvenanceReporter().send(successFlowFile.get(), this.options.getMetaStoreURI());
                        session.transfer(successFlowFile.get(), REL_SUCCESS);
                    } else {
                        session.remove(successFlowFile.get());
                    }
                    if (recordCount.get() != successfulRecordCount.get()) {
                        failureFlowFile.set(session.putAttribute(failureFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount.get() - successfulRecordCount.get())));
                        session.transfer(failureFlowFile.get(), REL_FAILURE);
                    } else {
                        session.remove(failureFlowFile.get());
                    }
                } else {
                    session.remove(successFlowFile.get());
                    session.remove(failureFlowFile.get());
                }
                successFlowFile.set(null);
                failureFlowFile.set(null);
                if (processingFailure.get()) {
                    session.transfer(inputFlowFile, REL_FAILURE);
                } else {
                    session.remove(flowFile);
                }
            }
            catch (ProcessException pe) {
                this.abortAndCloseWriters();
                Throwable t = pe.getCause();
                if (t != null) {
                    if (t instanceof ConnectionError || t instanceof HiveWriter.ConnectFailure || t instanceof HiveWriter.CommitFailure || t instanceof HiveWriter.TxnBatchFailure || t instanceof HiveWriter.TxnFailure || t instanceof InterruptedException) {
                        log.error("Hive Streaming connect/write error, flow file will be penalized and routed to retry", t);
                        flowFile = session.penalize(flowFile);
                        session.transfer(flowFile, REL_RETRY);
                        if (successFlowFile.get() != null) {
                            session.remove(successFlowFile.get());
                        }
                        if (failureFlowFile.get() != null) {
                            session.remove(failureFlowFile.get());
                        }
                        break block21;
                    }
                    throw pe;
                }
                throw pe;
            }
            finally {
                Thread.currentThread().setContextClassLoader(originalClassloader);
            }
        }
    }

    private void appendRecordsToFlowFile(ProcessSession session, List<HiveStreamingRecord> records, AtomicReference<FlowFile> appendFlowFile, DataFileWriter<GenericRecord> avroWriter, DataFileStream<GenericRecord> reader) throws IOException {
        appendFlowFile.set(session.append(appendFlowFile.get(), out -> {
            try (DataFileWriter writer = avroWriter.create(reader.getSchema(), out);){
                for (HiveStreamingRecord sRecord : records) {
                    writer.append((Object)sRecord.getRecord());
                }
                writer.flush();
            }
        }));
    }

    @OnStopped
    public void cleanup() {
        ComponentLog log = this.getLogger();
        this.sendHeartBeat.set(false);
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                HiveWriter w = entry.getValue();
                w.flushAndClose();
            }
            catch (Exception ex) {
                log.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", (Throwable)ex);
                if (!(ex instanceof InterruptedException)) continue;
                Thread.currentThread().interrupt();
            }
        }
        this.callTimeoutPool.shutdown();
        try {
            while (!this.callTimeoutPool.isTerminated()) {
                this.callTimeoutPool.awaitTermination(this.options.getCallTimeOut().intValue(), TimeUnit.MILLISECONDS);
            }
        }
        catch (Throwable t) {
            log.warn("shutdown interrupted on " + this.callTimeoutPool, t);
        }
        this.callTimeoutPool = null;
    }

    private void setupHeartBeatTimer() {
        if (this.options.getHeartBeatInterval() > 0) {
            final ComponentLog log = this.getLogger();
            this.heartBeatTimer.schedule(new TimerTask(){

                @Override
                public void run() {
                    try {
                        if (PutHiveStreaming.this.sendHeartBeat.get()) {
                            log.debug("Start sending heartbeat on all writers");
                            PutHiveStreaming.this.sendHeartBeatOnAllWriters();
                            PutHiveStreaming.this.setupHeartBeatTimer();
                        }
                    }
                    catch (Exception e) {
                        log.warn("Failed to heartbeat on HiveWriter ", (Throwable)e);
                    }
                }
            }, this.options.getHeartBeatInterval() * 1000);
        }
    }

    private void sendHeartBeatOnAllWriters() throws InterruptedException {
        for (HiveWriter writer : this.allWriters.values()) {
            writer.heartBeat();
        }
    }

    private void flushAllWriters(boolean rollToNext) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
        for (HiveWriter writer : this.allWriters.values()) {
            writer.flush(rollToNext);
        }
    }

    private void abortAndCloseWriters() {
        try {
            this.abortAllWriters();
            this.closeAllWriters();
        }
        catch (Exception ie) {
            this.getLogger().warn("unable to close hive connections. ", (Throwable)ie);
        }
    }

    private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                entry.getValue().abort();
            }
            catch (Exception e) {
                this.getLogger().error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() + " due to exception ", (Throwable)e);
            }
        }
    }

    private void closeAllWriters() {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                entry.getValue().close();
            }
            catch (Exception e) {
                this.getLogger().warn("unable to close writers. ", (Throwable)e);
            }
        }
        this.allWriters.clear();
    }

    private HiveWriter getOrCreateWriter(HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException {
        ComponentLog log = this.getLogger();
        try {
            HiveWriter writer = this.allWriters.get(endPoint);
            if (writer == null) {
                log.debug("Creating Writer to Hive end point : " + endPoint);
                writer = this.makeHiveWriter(endPoint, this.callTimeoutPool, this.ugi, this.options);
                if (this.allWriters.size() > this.options.getMaxOpenConnections() - 1) {
                    log.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{this.allWriters.size(), this.options.getMaxOpenConnections()});
                    int retired = this.retireIdleWriters();
                    if (retired == 0) {
                        this.retireEldestWriter();
                    }
                }
                this.allWriters.put(endPoint, writer);
                HiveUtils.logAllHiveEndPoints(this.allWriters);
            }
            return writer;
        }
        catch (HiveWriter.ConnectFailure e) {
            log.error("Failed to create HiveWriter for endpoint: " + endPoint, (Throwable)e);
            throw e;
        }
    }

    private void retireEldestWriter() {
        ComponentLog log = this.getLogger();
        log.info("Attempting close eldest writers");
        long oldestTimeStamp = System.currentTimeMillis();
        HiveEndPoint eldest = null;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (entry.getValue().getLastUsed() >= oldestTimeStamp) continue;
            eldest = entry.getKey();
            oldestTimeStamp = entry.getValue().getLastUsed();
        }
        try {
            log.info("Closing least used Writer to Hive end point : " + eldest);
            this.allWriters.remove(eldest).flushAndClose();
        }
        catch (IOException e) {
            log.warn("Failed to close writer for end point: " + eldest, (Throwable)e);
        }
        catch (InterruptedException e) {
            log.warn("Interrupted when attempting to close writer for end point: " + eldest, (Throwable)e);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            log.warn("Interrupted when attempting to close writer for end point: " + eldest, (Throwable)e);
        }
    }

    private int retireIdleWriters() {
        ComponentLog log = this.getLogger();
        log.info("Attempting to close idle HiveWriters");
        int count = 0;
        long now = System.currentTimeMillis();
        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (now - entry.getValue().getLastUsed() <= (long)this.options.getIdleTimeout().intValue()) continue;
            ++count;
            retirees.add(entry.getKey());
        }
        for (HiveEndPoint ep : retirees) {
            try {
                log.info("Closing idle Writer to Hive end point : {}", new Object[]{ep});
                this.allWriters.remove(ep).flushAndClose();
            }
            catch (IOException e) {
                log.warn("Failed to close HiveWriter for end point: {}. Error: " + ep, (Throwable)e);
            }
            catch (InterruptedException e) {
                log.warn("Interrupted when attempting to close HiveWriter for end point: " + ep, (Throwable)e);
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                log.warn("Interrupted when attempting to close HiveWriter for end point: " + ep, (Throwable)e);
            }
        }
        return count;
    }

    protected HiveEndPoint makeHiveEndPoint(List<String> partitionValues, HiveOptions options) throws ConnectionError {
        return HiveUtils.makeEndPoint(partitionValues, options);
    }

    protected HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options) throws HiveWriter.ConnectFailure, InterruptedException {
        return HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
    }

    protected KerberosProperties getKerberosProperties() {
        return this.kerberosProperties;
    }

    private static /* synthetic */ int lambda$null$3(int current, int incr) {
        return current + incr;
    }

    private static /* synthetic */ int lambda$null$2(int current, int incr) {
        return current + incr;
    }

    private static /* synthetic */ void lambda$null$1(String codec, DataFileStream reader, DataFileWriter writer) {
        writer.setCodec(CodecFactory.fromString((String)codec));
        for (String metaKey : reader.getMetaKeys()) {
            if (RESERVED_METADATA.contains(metaKey)) continue;
            writer.setMeta(metaKey, reader.getMeta(metaKey));
        }
    }

    static {
        HashSet<String> reservedMetadata = new HashSet<String>();
        reservedMetadata.add("avro.schema");
        reservedMetadata.add("avro.codec");
        RESERVED_METADATA = Collections.unmodifiableSet(reservedMetadata);
        METASTORE_URI = new PropertyDescriptor.Builder().name("hive-stream-metastore-uri").displayName("Hive Metastore URI").description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the Hive metastore is 9043.").required(true).addValidator(StandardValidators.URI_VALIDATOR).addValidator(StandardValidators.createRegexMatchingValidator((Pattern)Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))).build();
        HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("hive-config-resources").displayName("Hive Configuration Resources").description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.").required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build();
        DB_NAME = new PropertyDescriptor.Builder().name("hive-stream-database-name").displayName("Database Name").description("The name of the database in which to put the data.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        TABLE_NAME = new PropertyDescriptor.Builder().name("hive-stream-table-name").displayName("Table Name").description("The name of the database table in which to put the data.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        PARTITION_COLUMNS = new PropertyDescriptor.Builder().name("hive-stream-partition-cols").displayName("Partition Columns").description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must correspond exactly to the order of partition columns specified during the table creation.").required(false).expressionLanguageSupported(false).addValidator(StandardValidators.createRegexMatchingValidator((Pattern)Pattern.compile("[^,]+(,[^,]+)*"))).build();
        AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder().name("hive-stream-autocreate-partition").displayName("Auto-Create Partitions").description("Flag indicating whether partitions should be automatically created").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
        MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder().name("hive-stream-max-open-connections").displayName("Max Open Connections").description("The maximum number of open connections that can be allocated from this pool at the same time, or negative for no limit.").defaultValue("8").required(true).addValidator(StandardValidators.INTEGER_VALIDATOR).sensitive(false).build();
        HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder().name("hive-stream-heartbeat-interval").displayName("Heartbeat Interval").description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. A value of 0 indicates that no heartbeat should be sent.").defaultValue("60").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).sensitive(false).build();
        TXNS_PER_BATCH = new PropertyDescriptor.Builder().name("hive-stream-transactions-per-batch").displayName("Transactions per Batch").description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.").required(true).expressionLanguageSupported(true).addValidator(GREATER_THAN_ONE_VALIDATOR).defaultValue("100").build();
        REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile containing the JSON contents of a record is routed to this relationship after the record has been successfully transmitted to Hive.").build();
        REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile containing the JSON contents of a record is routed to this relationship if the record could not be transmitted to Hive.").build();
        REL_RETRY = new Relationship.Builder().name("retry").description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that some records may have been processed successfully, they will be routed (as JSON flow files) to the success relationship. The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This can be used to provide a retry capability since full rollback is not possible.").build();
    }

    protected class HiveStreamingRecord {
        private List<String> partitionValues;
        private GenericRecord record;

        public HiveStreamingRecord(List<String> partitionValues, GenericRecord record) {
            this.partitionValues = partitionValues;
            this.record = record;
        }

        public List<String> getPartitionValues() {
            return this.partitionValues;
        }

        public GenericRecord getRecord() {
            return this.record;
        }
    }
}

