package org.apache.plc4x.nifi;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.StopWatch;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.nifi.record.RecordPlc4xWriter;

@CapabilityDescription("Processor able to read data from industrial PLCs using Apache PLC4X")
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({"plc4x-source"})
@WritesAttributes({@WritesAttribute(attribute = "value", description = "some value")})
/* loaded from: input_file:org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.class */
public class Plc4xSourceRecordProcessor extends BasePlc4xProcessor {
    public static final String RESULT_ROW_COUNT = "plc4x.read.row.count";
    public static final String RESULT_QUERY_DURATION = "plc4x.read.query.duration";
    public static final String RESULT_QUERY_EXECUTION_TIME = "plc4x.read.query.executiontime";
    public static final String RESULT_QUERY_FETCH_TIME = "plc4x.read.query.fetchtime";
    public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
    public static final String RESULT_ERROR_MESSAGE = "plc4x.read.error.message";
    public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder().name("plc4x-record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    public static final PropertyDescriptor PLC_READ_FUTURE_TIMEOUT_MILISECONDS = new PropertyDescriptor.Builder().name("plc4x-record-read-timeout").displayName("Read timeout (miliseconds)").description("Read timeout in miliseconds").defaultValue("10000").required(true).addValidator(StandardValidators.INTEGER_VALIDATOR).build();
    Integer readTimeout;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.plc4x.nifi.BasePlc4xProcessor
    public void init(ProcessorInitializationContext processorInitializationContext) {
        super.init(processorInitializationContext);
        HashSet hashSet = new HashSet();
        hashSet.addAll(super.getRelationships());
        this.relationships = Collections.unmodifiableSet(hashSet);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(super.getSupportedPropertyDescriptors());
        arrayList.add(PLC_RECORD_WRITER_FACTORY);
        arrayList.add(PLC_READ_FUTURE_TIMEOUT_MILISECONDS);
        this.properties = Collections.unmodifiableList(arrayList);
    }

    @Override // org.apache.plc4x.nifi.BasePlc4xProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        this.connectionString = processContext.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
        this.readTimeout = processContext.getProperty(PLC_READ_FUTURE_TIMEOUT_MILISECONDS.getName()).asInteger();
        this.addressMap = new HashMap();
        processContext.getProperties().keySet().stream().filter((v0) -> {
            return v0.isDynamic();
        }).forEach(propertyDescriptor -> {
            this.addressMap.put(propertyDescriptor.getName(), processContext.getProperty(propertyDescriptor.getName()).getValue());
        });
        if (this.addressMap.isEmpty()) {
            throw new PlcRuntimeException("No address specified");
        }
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        String attribute;
        FlowFile flowFile = null;
        if (processContext.hasIncomingConnection()) {
            flowFile = processSession.get();
            if (flowFile == null && processContext.hasNonLoopConnection()) {
                return;
            }
        }
        RecordPlc4xWriter recordPlc4xWriter = new RecordPlc4xWriter(processContext.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class), flowFile == null ? Collections.emptyMap() : flowFile.getAttributes());
        ComponentLog logger = getLogger();
        AtomicLong atomicLong = new AtomicLong(0L);
        StopWatch stopWatch = new StopWatch(true);
        try {
            PlcConnection connection = getDriverManager().getConnection(getConnectionString());
            if (flowFile == null) {
                attribute = null;
            } else {
                try {
                    attribute = flowFile.getAttribute(CoreAttributes.UUID.key());
                } finally {
                }
            }
            String str = attribute;
            Map attributes = flowFile == null ? null : flowFile.getAttributes();
            FlowFile create = flowFile == null ? processSession.create() : processSession.create(flowFile);
            if (attributes != null) {
                create = processSession.putAllAttributes(create, attributes);
            }
            PlcReadRequest.Builder readRequestBuilder = connection.readRequestBuilder();
            getFields().forEach(str2 -> {
                String address = getAddress(str2);
                if (address != null) {
                    readRequestBuilder.addItem(str2, address);
                }
            });
            PlcReadRequest build = readRequestBuilder.build();
            FlowFile flowFile2 = flowFile;
            FlowFile write = processSession.write(create, outputStream -> {
                try {
                    PlcReadResponse plcReadResponse = (PlcReadResponse) build.execute().get(this.readTimeout.intValue(), TimeUnit.MILLISECONDS);
                    if (flowFile2 == null) {
                        atomicLong.set(recordPlc4xWriter.writePlcReadResponse(plcReadResponse, outputStream, logger, null));
                    } else {
                        atomicLong.set(recordPlc4xWriter.writePlcReadResponse(plcReadResponse, outputStream, logger, null, flowFile2));
                    }
                } catch (InterruptedException e) {
                    logger.error("InterruptedException reading the data from PLC", e);
                    Thread.currentThread().interrupt();
                    throw new ProcessException(e);
                } catch (TimeoutException e2) {
                    logger.error("Timeout reading the data from PLC", e2);
                    throw new ProcessException(e2);
                } catch (Exception e3) {
                    logger.error("Exception reading the data from PLC", e3);
                    if (!(e3 instanceof ProcessException)) {
                        throw new ProcessException(e3);
                    }
                }
            });
            long elapsed = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
            HashMap hashMap = new HashMap();
            hashMap.put(RESULT_ROW_COUNT, String.valueOf(atomicLong.get()));
            hashMap.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(elapsed));
            if (str != null) {
                hashMap.put(INPUT_FLOWFILE_UUID, str);
            }
            hashMap.putAll(recordPlc4xWriter.getAttributesToAdd());
            FlowFile putAllAttributes = processSession.putAllAttributes(write, hashMap);
            recordPlc4xWriter.updateCounters(processSession);
            logger.info("{} contains {} records; transferring to 'success'", new Object[]{putAllAttributes, Long.valueOf(atomicLong.get())});
            if (processContext.hasIncomingConnection()) {
                processSession.getProvenanceReporter().fetch(putAllAttributes, "Retrieved " + atomicLong.get() + " rows", elapsed);
            } else {
                processSession.getProvenanceReporter().receive(putAllAttributes, "Retrieved " + atomicLong.get() + " rows", elapsed);
            }
            processSession.transfer(putAllAttributes, BasePlc4xProcessor.REL_SUCCESS);
            if (flowFile != null) {
                processSession.remove(flowFile);
            }
            processSession.commitAsync();
            if (connection != null) {
                connection.close();
            }
        } catch (PlcConnectionException e) {
            logger.error("Error getting the PLC connection", e);
            throw new ProcessException("Got an a PlcConnectionException while trying to get a connection", e);
        } catch (Exception e2) {
            logger.error("Got an error while trying to get a connection", e2);
            throw new ProcessException("Got an error while trying to get a connection", e2);
        }
    }
}
