package org.apache.nifi.accumulo.processors;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
import org.apache.nifi.accumulo.data.KeySchema;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;

/**
 * Processor that scans a range of an Accumulo table with a {@link BatchScanner} and writes the
 * returned entries as records through the configured {@link RecordSetWriterFactory}.
 *
 * <p>Results are split into FlowFiles holding at most {@value #MAX_RECORDS_PER_FLOWFILE} records
 * each. Every outgoing FlowFile carries a {@code record.count} attribute. When the scan returns
 * nothing, a single FlowFile with {@code record.count = 0} is routed to {@link #REL_SUCCESS}.
 *
 * <p>An incoming FlowFile is optional; when present, its attributes are used to evaluate the
 * expression-language enabled properties and the outgoing FlowFiles are derived from it.
 */
@CapabilityDescription("Scan the given table and writes result in a flowfile. Value will be represented as UTF-8 Encoded String.")
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@SupportsBatching
@Tags({"hadoop", "accumulo", "scan", "record"})
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class ScanAccumulo extends BaseAccumuloProcessor {
    /** Upper bound on the number of records written into one outgoing FlowFile. */
    private static final int MAX_RECORDS_PER_FLOWFILE = 1000;

    /** Attribute holding the number of records contained in an outgoing FlowFile. */
    private static final String RECORD_COUNT_ATTRIBUTE = "record.count";

    /** Placeholder used in error logs when the processor ran without an incoming FlowFile. */
    private static final String NO_INCOMING_FLOWFILE = "No incoming flow file";

    static final PropertyDescriptor START_KEY = new PropertyDescriptor.Builder()
            .displayName("Start key")
            .name("start-key")
            .description("Start row key")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();

    static final PropertyDescriptor START_KEY_INCLUSIVE = new PropertyDescriptor.Builder()
            .displayName("Start key Inclusive")
            .name("start-key-inclusive")
            .description("Determines if the start key is inclusive ")
            .required(false)
            .defaultValue("True")
            .allowableValues("True", "False")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    static final PropertyDescriptor END_KEY = new PropertyDescriptor.Builder()
            .displayName("End key")
            .name("end-key")
            .description("End row key for this. If not specified or empty this will be infinite")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();

    static final PropertyDescriptor END_KEY_INCLUSIVE = new PropertyDescriptor.Builder()
            .displayName("End key Inclusive")
            .name("end-key-inclusive")
            .description("Determines if the end key is inclusive")
            .required(false)
            .defaultValue("False")
            .allowableValues("True", "False")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    static final PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
            .name("accumulo-authorizations")
            .displayName("Authorizations")
            .description("The comma separated list of authorizations to pass to the scanner.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();

    static final PropertyDescriptor COLUMNFAMILY = new PropertyDescriptor.Builder()
            .name("column-family")
            .displayName("Start Column Family")
            .description("The column family that is part of the start key. If no column key is defined only this column family will be selected")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();

    static final PropertyDescriptor COLUMNFAMILY_END = new PropertyDescriptor.Builder()
            .name("column-family-end")
            .displayName("End Column Family")
            .description("The column family to select is part of end key")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();

    static final PropertyDescriptor VALUE_INCLUDED_IN_RESULT = new PropertyDescriptor.Builder()
            .displayName("Value Included in Result")
            .name("accumulo-value-inclusive")
            .description("Beside keys and their values, accumulo value field will also be included in the result as UTF-8 Encoded String.")
            .required(false)
            .defaultValue("True")
            .allowableValues("True", "False")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("A FlowFile is routed to this relationship after it has been successfully retrieved from Accumulo")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("A FlowFile is routed to this relationship if it cannot be retrieved fromAccumulo")
            .build();

    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("record-writer")
            .displayName("Record Writer")
            .description("Specifies the Controller Service to use for writing out the records")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();

    /** Connector service resolved in {@link #onScheduled(ProcessContext)}. */
    protected BaseAccumuloService accumuloConnectorService;

    /** Accumulo client obtained from {@link #accumuloConnectorService} in {@link #onScheduled(ProcessContext)}. */
    protected AccumuloClient client;

    /**
     * Returns the relationships of this processor.
     *
     * @return a new mutable set containing {@link #REL_SUCCESS} and {@link #REL_FAILURE}
     */
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        return relationships;
    }

    /**
     * Rejects configurations in which exactly one of {@link #COLUMNFAMILY} and
     * {@link #COLUMNFAMILY_END} is set.
     *
     * @param validationContext context giving access to the configured properties
     * @return the validation problems found; empty when both or neither property is set
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> problems = new ArrayList<>();
        final boolean startFamilySet = validationContext.getProperty(COLUMNFAMILY).isSet();
        final boolean endFamilySet = validationContext.getProperty(COLUMNFAMILY_END).isSet();
        // Exactly one of the two being set is the invalid combination.
        if (startFamilySet != endFamilySet) {
            problems.add(new ValidationResult.Builder().explanation("Column Family and Column family end  must be defined").build());
        }
        return problems;
    }

    /**
     * Resolves the Accumulo connector service and caches its client for later scans.
     *
     * @param processContext context giving access to the configured properties
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        this.accumuloConnectorService = processContext.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class);
        this.client = this.accumuloConnectorService.getClient();
    }

    /**
     * Converts a comma separated authorization list into {@link Authorizations}.
     *
     * @param str comma separated authorizations; may be {@code null} or blank
     * @return the parsed authorizations, or empty authorizations when {@code str} is blank
     */
    private Authorizations stringToAuth(String str) {
        if (StringUtils.isBlank(str)) {
            return new Authorizations();
        }
        return new Authorizations(str.split(","));
    }

    /**
     * Runs the configured scan and routes the results.
     *
     * <p>Each batch of up to {@value #MAX_RECORDS_PER_FLOWFILE} entries is written to its own
     * FlowFile (a clone of the incoming FlowFile when there is one) and transferred to
     * {@link #REL_SUCCESS}. After a successful scan the incoming FlowFile is removed. If anything
     * fails, the error is logged and the incoming FlowFile, if any, is transferred to
     * {@link #REL_FAILURE}.
     *
     * @param recordSetWriterFactory factory used to create the record writer for each batch
     * @param processContext context giving access to the configured properties
     * @param processSession session used to create, write and route FlowFiles
     * @param optional the incoming FlowFile, or empty when the processor ran without input
     * @return the number of records written, or {@code 0} when nothing was found or the scan failed
     */
    protected long scanAccumulo(final RecordSetWriterFactory recordSetWriterFactory, ProcessContext processContext, ProcessSession processSession, final Optional<FlowFile> optional) {
        final boolean hasInput = optional.isPresent();
        final Map<String, String> attributes = hasInput ? optional.get().getAttributes() : new HashMap<String, String>();

        final String table = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(attributes).getValue();
        final String startRow = processContext.getProperty(START_KEY).evaluateAttributeExpressions(attributes).getValue();
        final boolean startInclusive = processContext.getProperty(START_KEY_INCLUSIVE).asBoolean().booleanValue();
        final boolean endInclusive = processContext.getProperty(END_KEY_INCLUSIVE).asBoolean().booleanValue();
        final String endRow = processContext.getProperty(END_KEY).evaluateAttributeExpressions(attributes).getValue();
        final String authorizationList = processContext.getProperty(AUTHORIZATIONS).isSet()
                ? processContext.getProperty(AUTHORIZATIONS).evaluateAttributeExpressions(attributes).getValue()
                : "";
        final int threads = processContext.getProperty(THREADS).asInteger().intValue();
        final String startFamily = processContext.getProperty(COLUMNFAMILY).evaluateAttributeExpressions(attributes).getValue();
        final String endFamily = processContext.getProperty(COLUMNFAMILY_END).evaluateAttributeExpressions(attributes).getValue();
        final boolean includeValue = processContext.getProperty(VALUE_INCLUDED_IN_RESULT).asBoolean().booleanValue();

        final Authorizations authorizations = stringToAuth(authorizationList);
        // LongAdder because the running total is updated from inside the anonymous StreamCallback.
        final LongAdder recordsWritten = new LongAdder();
        final Range range = buildRange(startRow, startFamily, startInclusive, endRow, endFamily, endInclusive);

        this.accumuloConnectorService.renewTgtIfNecessary();
        try {
            final boolean foundEntries;
            // try-with-resources guarantees the scanner is released on every exit path, including failures.
            try (BatchScanner scanner = this.client.createBatchScanner(table, authorizations, threads)) {
                // NOTE(review): the family filter is only applied when the evaluated end family is blank,
                // while customValidate requires both family properties to be set together - confirm intent.
                if (!StringUtils.isBlank(startFamily) && StringUtils.isBlank(endFamily)) {
                    scanner.fetchColumnFamily(new Text(startFamily));
                }
                scanner.setRanges(Collections.singleton(range));
                scanner.setTimeout(processContext.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(), TimeUnit.SECONDS);

                final Iterator<Map.Entry<Key, Value>> entries = scanner.iterator();
                foundEntries = entries.hasNext();
                while (entries.hasNext()) {
                    transferNextBatch(recordSetWriterFactory, processSession, optional, attributes, includeValue, entries, recordsWritten);
                }
            }

            if (!foundEntries) {
                transferEmptyResult(processSession, optional);
                return 0L;
            }
            if (hasInput) {
                processSession.remove(optional.get());
            }
            getLogger().info("Successfully converted {} records for {}", new Object[]{recordsWritten.longValue(), optional.toString()});
            return recordsWritten.longValue();
        } catch (Exception e) {
            getLogger().error("Failed to process {}; will route to failure", new Object[]{hasInput ? optional.get() : NO_INCOMING_FLOWFILE, e});
            if (hasInput) {
                processSession.transfer(optional.get(), REL_FAILURE);
            }
            return 0L;
        }
    }

    /**
     * Emits the zero-record FlowFile for a scan that returned nothing and disposes of the incoming
     * FlowFile so that the session holds no unrouted FlowFile.
     */
    private void transferEmptyResult(ProcessSession processSession, Optional<FlowFile> incoming) {
        final Map<String, String> resultAttributes = new HashMap<>();
        resultAttributes.put(RECORD_COUNT_ATTRIBUTE, String.valueOf(0));

        // Derive the result from the incoming FlowFile when there is one, mirroring the clone used for non-empty results.
        FlowFile emptyResult = incoming.isPresent() ? processSession.create(incoming.get()) : processSession.create();
        emptyResult = processSession.putAllAttributes(emptyResult, resultAttributes);
        processSession.transfer(emptyResult, REL_SUCCESS);

        if (incoming.isPresent()) {
            processSession.remove(incoming.get());
        }
    }

    /**
     * Writes the next batch of scan entries into a fresh FlowFile and routes it to success; the
     * partially written FlowFile is removed from the session if writing fails.
     */
    private void transferNextBatch(final RecordSetWriterFactory recordSetWriterFactory, ProcessSession processSession, final Optional<FlowFile> incoming,
            final Map<String, String> attributes, final boolean includeValue, final Iterator<Map.Entry<Key, Value>> entries, final LongAdder recordsWritten) {
        final Map<String, String> batchAttributes = new HashMap<>();
        FlowFile batch = incoming.isPresent() ? processSession.clone(incoming.get()) : processSession.create();
        try {
            batch = processSession.write(batch, new StreamCallback() {
                @Override
                public void process(InputStream inputStream, OutputStream outputStream) throws IOException {
                    try {
                        recordsWritten.add(writeBatch(recordSetWriterFactory, attributes, includeValue, entries, outputStream, batchAttributes));
                    } catch (SchemaNotFoundException e) {
                        getLogger().error("Failed to process {}; will route to failure", new Object[]{incoming.isPresent() ? incoming.get() : NO_INCOMING_FLOWFILE, e});
                        throw new IOException(e);
                    }
                }
            });
        } catch (RuntimeException e) {
            // Do not leave the half-written FlowFile in the session without a destination.
            processSession.remove(batch);
            throw e;
        }
        batch = processSession.putAllAttributes(batch, batchAttributes);
        processSession.transfer(batch, REL_SUCCESS);
    }

    /**
     * Writes up to {@value #MAX_RECORDS_PER_FLOWFILE} entries to {@code outputStream} as one record
     * set and stores the resulting FlowFile attributes in {@code resultAttributes}.
     *
     * @return the number of records written
     */
    private int writeBatch(RecordSetWriterFactory recordSetWriterFactory, Map<String, String> attributes, boolean includeValue,
            Iterator<Map.Entry<Key, Value>> entries, OutputStream outputStream, Map<String, String> resultAttributes) throws SchemaNotFoundException, IOException {
        final RecordSchema schema = determineRecordSchema(recordSetWriterFactory, attributes, includeValue);
        // try-with-resources closes the writer even when writing a record fails.
        try (RecordSetWriter writer = recordSetWriterFactory.createWriter(getLogger(), schema, outputStream, Collections.emptyMap())) {
            writer.beginRecordSet();
            int written = 0;
            while (written < MAX_RECORDS_PER_FLOWFILE && entries.hasNext()) {
                writer.write(new MapRecord(schema, toRecordValues(entries.next(), includeValue)));
                written++;
            }
            final WriteResult writeResult = writer.finishRecordSet();
            resultAttributes.put(RECORD_COUNT_ATTRIBUTE, String.valueOf(written));
            resultAttributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
            resultAttributes.putAll(writeResult.getAttributes());
            return written;
        }
    }

    /** Maps one scan entry to the field values of a record; the value field is added only on request. */
    private static Map<String, Object> toRecordValues(Map.Entry<Key, Value> entry, boolean includeValue) {
        final Key key = entry.getKey();
        final Map<String, Object> values = new HashMap<>();
        values.put("row", key.getRow().toString());
        values.put("columnFamily", key.getColumnFamily().toString());
        values.put("columnQualifier", key.getColumnQualifier().toString());
        values.put("columnVisibility", key.getColumnVisibility().toString());
        values.put("timestamp", key.getTimestamp());
        if (includeValue) {
            values.put("value", Objects.isNull(entry.getValue()) ? null : entry.getValue().toString());
        }
        return values;
    }

    /**
     * Obtains the record schema from the writer factory and, when requested, extends it with a
     * string {@code value} field.
     *
     * @param recordSetWriterFactory factory asked for the schema
     * @param map attributes passed to the factory's schema lookup
     * @param z whether the {@code value} field must be part of the schema
     * @return the schema to use for writing records
     * @throws SchemaNotFoundException if the factory cannot provide a schema
     * @throws IOException if the factory fails while reading the schema
     */
    private RecordSchema determineRecordSchema(RecordSetWriterFactory recordSetWriterFactory, Map<String, String> map, boolean z) throws SchemaNotFoundException, IOException {
        final RecordSchema keySchema = recordSetWriterFactory.getSchema(map, new KeySchema());
        if (!z) {
            return keySchema;
        }
        final List<RecordField> fields = new ArrayList<>(keySchema.getFields());
        fields.add(new RecordField("value", RecordFieldType.STRING.getDataType()));
        return new SimpleRecordSchema(fields);
    }

    /**
     * Builds the scan range from the start and end row/column-family pairs.
     *
     * @param str start row; blank means the range has no start key
     * @param str2 start column family; blank means the start key consists of the row only
     * @param z whether the start key is inclusive
     * @param str3 end row; blank means the range has no end key
     * @param str4 end column family; blank means the end key consists of the row only
     * @param z2 whether the end key is inclusive
     * @return the range to scan
     */
    Range buildRange(String str, String str2, boolean z, String str3, String str4, boolean z2) {
        return new Range(toKey(str, str2), z, toKey(str3, str4), z2);
    }

    /** Returns {@code null} for a blank row, otherwise a key of the row and, when not blank, the family. */
    private static Key toKey(String row, String columnFamily) {
        if (StringUtils.isBlank(row)) {
            return null;
        }
        if (StringUtils.isBlank(columnFamily)) {
            return new Key(row);
        }
        return new Key(row, columnFamily);
    }

    /**
     * Pulls at most one FlowFile from the session, runs the scan and adds the number of records
     * written to the "Records Processed" counter.
     *
     * @param processContext context giving access to the configured properties
     * @param processSession session used to obtain and route FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final Optional<FlowFile> incoming = Optional.ofNullable(processSession.get());
        final long recordCount = scanAccumulo(writerFactory, processContext, processSession, incoming);
        processSession.adjustCounter("Records Processed", recordCount, false);
    }

    /**
     * Returns the base properties followed by the properties specific to this processor.
     *
     * @return a new list of all supported property descriptors
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>(baseProperties);
        descriptors.add(START_KEY);
        descriptors.add(START_KEY_INCLUSIVE);
        descriptors.add(END_KEY);
        descriptors.add(COLUMNFAMILY);
        descriptors.add(COLUMNFAMILY_END);
        descriptors.add(END_KEY_INCLUSIVE);
        descriptors.add(VALUE_INCLUDED_IN_RESULT);
        descriptors.add(RECORD_WRITER);
        descriptors.add(AUTHORIZATIONS);
        return descriptors;
    }
}
