package org.apache.nifi.reporting;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;

/**
 * Reporting task that serializes the status of the flow's components to JSON and
 * sends the resulting records, in batches, through the Site-to-Site client
 * provided by {@link AbstractSiteToSiteReportingTask}.
 *
 * <p>Starting at the controller (root) process group, every nested process group
 * is walked recursively. A component produces a record only when its type matches
 * {@link #COMPONENT_TYPE_FILTER_REGEX} <em>and</em> its name matches
 * {@link #COMPONENT_NAME_FILTER_REGEX}. A process group that does not match is
 * still descended into.</p>
 */
@CapabilityDescription("Publishes Status events using the Site To Site protocol.  The component type and name filter regexes form a union: only components matching both regexes will be reported.  However, all process groups are recursively searched for matching components, regardless of whether the process group matches the component filters.")
@Tags({"status", "metrics", "history", "site", "site to site"})
public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTask {
    /** Pattern used for the {@code timestamp} field; the trailing {@code Z} is a literal. */
    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder().name("Platform").description("The value to use for the platform field in each provenance event.").required(true).expressionLanguageSupported(true).defaultValue("nifi").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder().name("Component Type Filter Regex").description("A regex specifying which component types to report.  Any component type matching this regex will be included.  Component types are: Processor, RootProcessGroup, ProcessGroup, RemoteProcessGroup, Connection, InputPort, OutputPort").required(true).expressionLanguageSupported(true).defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)").addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)).build();
    static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder().name("Component Name Filter Regex").description("A regex specifying which component names to report.  Any component name matching this regex will be included.").required(true).expressionLanguageSupported(true).defaultValue(".*").addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)).build();

    // Both filters are (re)compiled at the start of every onTrigger call, so
    // componentMatchesFilters must not be used before the first trigger.
    private volatile Pattern componentTypeFilter;
    private volatile Pattern componentNameFilter;

    /**
     * Returns the properties of the parent task followed by the platform and the
     * two component filter properties defined by this class.
     *
     * @return a new, mutable list of supported property descriptors
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
        descriptors.add(PLATFORM);
        descriptors.add(COMPONENT_TYPE_FILTER_REGEX);
        descriptors.add(COMPONENT_NAME_FILTER_REGEX);
        return descriptors;
    }

    /**
     * Collects the status of all matching components and sends the records to the
     * destination in batches of at most {@code BATCH_SIZE} records, one
     * Site-to-Site transaction per batch.
     *
     * <p>Returns without sending anything when the node is clustered but has no
     * node identifier yet, when no controller status is available, or as soon as
     * no transaction can be created.</p>
     *
     * @param reportingContext context supplying property values and flow status
     * @throws ProcessException if sending a batch fails with an {@link IOException}
     */
    public void onTrigger(ReportingContext reportingContext) {
        final boolean isClustered = reportingContext.isClustered();
        if (reportingContext.getClusterNodeIdentifier() == null && isClustered) {
            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. Will wait for Node Identifier to be established.");
            return;
        }

        this.componentTypeFilter = Pattern.compile(reportingContext.getProperty(COMPONENT_TYPE_FILTER_REGEX).evaluateAttributeExpressions().getValue());
        this.componentNameFilter = Pattern.compile(reportingContext.getProperty(COMPONENT_NAME_FILTER_REGEX).evaluateAttributeExpressions().getValue());

        final ProcessGroupStatus controllerStatus = reportingContext.getEventAccess().getControllerStatus();
        if (controllerStatus == null) {
            // Without a root status there is nothing to serialize; dereferencing it
            // further down would only fail with a NullPointerException.
            getLogger().debug("Controller status is not available; no Status Records to send");
            return;
        }
        final String rootGroupName = controllerStatus.getName();

        final String instanceUrl = reportingContext.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
        final String hostname;
        try {
            hostname = new URL(instanceUrl).getHost();
        } catch (MalformedURLException e) {
            // Treated as a programming error, but keep the cause for diagnosis.
            throw new AssertionError("Instance URL could not be parsed: " + instanceUrl, e);
        }
        final String platform = reportingContext.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();

        final JsonBuilderFactory factory = Json.createBuilderFactory(Collections.emptyMap());
        final SimpleDateFormat timestampFormat = new SimpleDateFormat(TIMESTAMP_FORMAT);
        // The pattern ends in a literal 'Z', so timestamps must be rendered in UTC.
        timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

        final JsonArrayBuilder recordsBuilder = factory.createArrayBuilder();
        serializeProcessGroupStatus(recordsBuilder, factory, controllerStatus, timestampFormat, hostname, rootGroupName, platform, null, new Date());
        final JsonArray records = recordsBuilder.build();

        final int batchSize = reportingContext.getProperty(BATCH_SIZE).asInteger().intValue();
        int from = 0;
        int to = Math.min(batchSize, records.size());
        List<JsonValue> batch = records.subList(from, to);
        while (!batch.isEmpty()) {
            if (!sendStatusBatch(batch, factory)) {
                return;
            }
            from = to;
            to = Math.min(from + batchSize, records.size());
            batch = records.subList(from, to);
        }
    }

    /**
     * Sends one batch of records as a JSON array in a single transaction.
     *
     * @param batch   the records to send
     * @param factory factory used to build the JSON array payload
     * @return {@code true} if the batch was sent and the transaction completed;
     *         {@code false} if no transaction could be created
     * @throws ProcessException if the transfer fails with an {@link IOException}
     */
    private boolean sendStatusBatch(List<JsonValue> batch, JsonBuilderFactory factory) {
        final long startNanos = System.nanoTime();
        Transaction transaction = null;
        try {
            transaction = getClient().createTransaction(TransferDirection.SEND);
            if (transaction == null) {
                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
                return false;
            }

            final String transactionId = UUID.randomUUID().toString();
            final HashMap<String, String> attributes = new HashMap<>();
            attributes.put("reporting.task.transaction.id", transactionId);
            attributes.put("mime.type", "application/json");

            final JsonArrayBuilder payloadBuilder = factory.createArrayBuilder();
            for (final JsonValue record : batch) {
                payloadBuilder.add(record);
            }
            final byte[] payload = payloadBuilder.build().toString().getBytes(StandardCharsets.UTF_8);

            transaction.send(payload, attributes);
            transaction.confirm();
            transaction.complete();

            final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            // Report the size of this batch, not of the complete record set.
            getLogger().info("Successfully sent {} Status Records to destination in {} ms; Transaction ID = {}", new Object[]{batch.size(), elapsedMillis, transactionId});
            return true;
        } catch (IOException e) {
            if (transaction != null) {
                // Mark the failed transaction as errored instead of leaving it open.
                transaction.error();
            }
            throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e);
        }
    }

    /**
     * Tells whether a component should be reported.
     *
     * @param str  the component type
     * @param str2 the component name
     * @return {@code true} only if the type matches the type filter and the name
     *         matches the name filter (both must match in full)
     */
    boolean componentMatchesFilters(String str, String str2) {
        return this.componentTypeFilter.matcher(str).matches() && this.componentNameFilter.matcher(str2).matches();
    }

    /**
     * Adds a record for the given process group (if it passes the filters) and then
     * records for everything it contains: child process groups (recursively),
     * processors, connections, input ports, output ports and remote process groups.
     *
     * @param jsonArrayBuilder   builder collecting all records
     * @param jsonBuilderFactory factory for the per-record object builders
     * @param processGroupStatus status of the group to serialize
     * @param dateFormat         format used for the {@code timestamp} field
     * @param hostname           value of the {@code actorHostname} field
     * @param applicationName    value of the {@code application} field
     * @param platform           value of the {@code platform} field
     * @param parentId           id of the enclosing group; {@code null} marks the root group
     * @param currentDate        time stamp written to every record
     */
    void serializeProcessGroupStatus(JsonArrayBuilder jsonArrayBuilder, JsonBuilderFactory jsonBuilderFactory, ProcessGroupStatus processGroupStatus, DateFormat dateFormat, String hostname, String applicationName, String platform, String parentId, Date currentDate) {
        final String componentType = parentId == null ? "RootProcessGroup" : "ProcessGroup";
        final String componentName = processGroupStatus.getName();
        if (componentMatchesFilters(componentType, componentName)) {
            final JsonObjectBuilder record = jsonBuilderFactory.createObjectBuilder();
            addCommonFields(record, dateFormat, hostname, applicationName, platform, parentId, currentDate, componentType, componentName);
            addField(record, "componentId", processGroupStatus.getId());
            addField(record, "bytesRead", processGroupStatus.getBytesRead());
            addField(record, "bytesWritten", processGroupStatus.getBytesWritten());
            addField(record, "bytesReceived", processGroupStatus.getBytesReceived());
            addField(record, "bytesSent", processGroupStatus.getBytesSent());
            addField(record, "bytesTransferred", processGroupStatus.getBytesTransferred());
            addField(record, "flowFilesReceived", processGroupStatus.getFlowFilesReceived());
            addField(record, "flowFilesSent", processGroupStatus.getFlowFilesSent());
            addField(record, "flowFilesTransferred", processGroupStatus.getFlowFilesTransferred());
            addField(record, "inputContentSize", processGroupStatus.getInputContentSize());
            addField(record, "inputCount", processGroupStatus.getInputCount());
            addField(record, "outputContentSize", processGroupStatus.getOutputContentSize());
            addField(record, "outputCount", processGroupStatus.getOutputCount());
            addField(record, "queuedContentSize", processGroupStatus.getQueuedContentSize());
            addField(record, "activeThreadCount", processGroupStatus.getActiveThreadCount());
            addField(record, "queuedCount", processGroupStatus.getQueuedCount());
            jsonArrayBuilder.add(record.build());
        }

        // Children are always visited, even when this group itself was filtered out.
        final String groupId = processGroupStatus.getId();
        for (final ProcessGroupStatus childGroup : processGroupStatus.getProcessGroupStatus()) {
            serializeProcessGroupStatus(jsonArrayBuilder, jsonBuilderFactory, childGroup, dateFormat, hostname, applicationName, platform, groupId, currentDate);
        }
        for (final ProcessorStatus processor : processGroupStatus.getProcessorStatus()) {
            serializeProcessorStatus(jsonArrayBuilder, jsonBuilderFactory, processor, dateFormat, hostname, applicationName, platform, groupId, currentDate);
        }
        for (final ConnectionStatus connection : processGroupStatus.getConnectionStatus()) {
            serializeConnectionStatus(jsonArrayBuilder, jsonBuilderFactory, connection, dateFormat, hostname, applicationName, platform, groupId, currentDate);
        }
        for (final PortStatus inputPort : processGroupStatus.getInputPortStatus()) {
            serializePortStatus("InputPort", jsonArrayBuilder, jsonBuilderFactory, inputPort, dateFormat, hostname, applicationName, platform, groupId, currentDate);
        }
        for (final PortStatus outputPort : processGroupStatus.getOutputPortStatus()) {
            serializePortStatus("OutputPort", jsonArrayBuilder, jsonBuilderFactory, outputPort, dateFormat, hostname, applicationName, platform, groupId, currentDate);
        }
        for (final RemoteProcessGroupStatus remoteGroup : processGroupStatus.getRemoteProcessGroupStatus()) {
            serializeRemoteProcessGroupStatus(jsonArrayBuilder, jsonBuilderFactory, remoteGroup, dateFormat, hostname, applicationName, platform, groupId, currentDate);
        }
    }

    /**
     * Adds a {@code RemoteProcessGroup} record for the given status if it passes
     * the filters.
     *
     * @param jsonArrayBuilder         builder collecting all records
     * @param jsonBuilderFactory       factory for the record's object builder
     * @param remoteProcessGroupStatus status to serialize
     * @param dateFormat               format used for the {@code timestamp} field
     * @param hostname                 value of the {@code actorHostname} field
     * @param applicationName          value of the {@code application} field
     * @param platform                 value of the {@code platform} field
     * @param parentId                 id of the enclosing process group
     * @param currentDate              time stamp written to the record
     */
    void serializeRemoteProcessGroupStatus(JsonArrayBuilder jsonArrayBuilder, JsonBuilderFactory jsonBuilderFactory, RemoteProcessGroupStatus remoteProcessGroupStatus, DateFormat dateFormat, String hostname, String applicationName, String platform, String parentId, Date currentDate) {
        final String componentName = remoteProcessGroupStatus.getName();
        if (!componentMatchesFilters("RemoteProcessGroup", componentName)) {
            return;
        }
        final JsonObjectBuilder record = jsonBuilderFactory.createObjectBuilder();
        addCommonFields(record, dateFormat, hostname, applicationName, platform, parentId, currentDate, "RemoteProcessGroup", componentName);
        addField(record, "componentId", remoteProcessGroupStatus.getId());
        addField(record, "activeRemotePortCount", remoteProcessGroupStatus.getActiveRemotePortCount());
        addField(record, "activeThreadCount", remoteProcessGroupStatus.getActiveThreadCount());
        addField(record, "inactiveRemotePortCount", remoteProcessGroupStatus.getInactiveRemotePortCount());
        addField(record, "receivedContentSize", remoteProcessGroupStatus.getReceivedContentSize());
        addField(record, "receivedCount", remoteProcessGroupStatus.getReceivedCount());
        addField(record, "sentContentSize", remoteProcessGroupStatus.getSentContentSize());
        addField(record, "sentCount", remoteProcessGroupStatus.getSentCount());
        addField(record, "averageLineageDuration", remoteProcessGroupStatus.getAverageLineageDuration());
        jsonArrayBuilder.add(record.build());
    }

    /**
     * Adds a port record for the given status if it passes the filters.
     *
     * @param componentType      component type to report, {@code "InputPort"} or
     *                           {@code "OutputPort"} for the calls in this class
     * @param jsonArrayBuilder   builder collecting all records
     * @param jsonBuilderFactory factory for the record's object builder
     * @param portStatus         status to serialize
     * @param dateFormat         format used for the {@code timestamp} field
     * @param hostname           value of the {@code actorHostname} field
     * @param applicationName    value of the {@code application} field
     * @param platform           value of the {@code platform} field
     * @param parentId           id of the enclosing process group
     * @param currentDate        time stamp written to the record
     */
    void serializePortStatus(String componentType, JsonArrayBuilder jsonArrayBuilder, JsonBuilderFactory jsonBuilderFactory, PortStatus portStatus, DateFormat dateFormat, String hostname, String applicationName, String platform, String parentId, Date currentDate) {
        final String componentName = portStatus.getName();
        if (!componentMatchesFilters(componentType, componentName)) {
            return;
        }
        final JsonObjectBuilder record = jsonBuilderFactory.createObjectBuilder();
        addCommonFields(record, dateFormat, hostname, applicationName, platform, parentId, currentDate, componentType, componentName);
        addField(record, "componentId", portStatus.getId());
        addField(record, "activeThreadCount", portStatus.getActiveThreadCount());
        addField(record, "bytesReceived", portStatus.getBytesReceived());
        addField(record, "bytesSent", portStatus.getBytesSent());
        addField(record, "flowFilesReceived", portStatus.getFlowFilesReceived());
        addField(record, "flowFilesSent", portStatus.getFlowFilesSent());
        addField(record, "inputBytes", portStatus.getInputBytes());
        addField(record, "inputCount", portStatus.getInputCount());
        addField(record, "outputBytes", portStatus.getOutputBytes());
        addField(record, "outputCount", portStatus.getOutputCount());
        jsonArrayBuilder.add(record.build());
    }

    /**
     * Adds a {@code Connection} record for the given status if it passes the filters.
     *
     * @param jsonArrayBuilder   builder collecting all records
     * @param jsonBuilderFactory factory for the record's object builder
     * @param connectionStatus   status to serialize
     * @param dateFormat         format used for the {@code timestamp} field
     * @param hostname           value of the {@code actorHostname} field
     * @param applicationName    value of the {@code application} field
     * @param platform           value of the {@code platform} field
     * @param parentId           id of the enclosing process group
     * @param currentDate        time stamp written to the record
     */
    void serializeConnectionStatus(JsonArrayBuilder jsonArrayBuilder, JsonBuilderFactory jsonBuilderFactory, ConnectionStatus connectionStatus, DateFormat dateFormat, String hostname, String applicationName, String platform, String parentId, Date currentDate) {
        final String componentName = connectionStatus.getName();
        if (!componentMatchesFilters("Connection", componentName)) {
            return;
        }
        final JsonObjectBuilder record = jsonBuilderFactory.createObjectBuilder();
        addCommonFields(record, dateFormat, hostname, applicationName, platform, parentId, currentDate, "Connection", componentName);
        addField(record, "componentId", connectionStatus.getId());
        addField(record, "maxQueuedBytes", connectionStatus.getMaxQueuedBytes());
        addField(record, "maxQueuedCount", connectionStatus.getMaxQueuedCount());
        addField(record, "queuedBytes", connectionStatus.getQueuedBytes());
        addField(record, "queuedCount", connectionStatus.getQueuedCount());
        addField(record, "inputBytes", connectionStatus.getInputBytes());
        addField(record, "inputCount", connectionStatus.getInputCount());
        addField(record, "outputBytes", connectionStatus.getOutputBytes());
        addField(record, "outputCount", connectionStatus.getOutputCount());
        jsonArrayBuilder.add(record.build());
    }

    /**
     * Adds a {@code Processor} record for the given status if it passes the filters.
     *
     * @param jsonArrayBuilder   builder collecting all records
     * @param jsonBuilderFactory factory for the record's object builder
     * @param processorStatus    status to serialize
     * @param dateFormat         format used for the {@code timestamp} field
     * @param hostname           value of the {@code actorHostname} field
     * @param applicationName    value of the {@code application} field
     * @param platform           value of the {@code platform} field
     * @param parentId           id of the enclosing process group
     * @param currentDate        time stamp written to the record
     */
    void serializeProcessorStatus(JsonArrayBuilder jsonArrayBuilder, JsonBuilderFactory jsonBuilderFactory, ProcessorStatus processorStatus, DateFormat dateFormat, String hostname, String applicationName, String platform, String parentId, Date currentDate) {
        final String componentName = processorStatus.getName();
        if (!componentMatchesFilters("Processor", componentName)) {
            return;
        }
        final JsonObjectBuilder record = jsonBuilderFactory.createObjectBuilder();
        addCommonFields(record, dateFormat, hostname, applicationName, platform, parentId, currentDate, "Processor", componentName);
        addField(record, "componentId", processorStatus.getId());
        addField(record, "processorType", processorStatus.getType());
        addField(record, "averageLineageDurationMS", processorStatus.getAverageLineageDuration());
        addField(record, "bytesRead", processorStatus.getBytesRead());
        addField(record, "bytesWritten", processorStatus.getBytesWritten());
        addField(record, "bytesReceived", processorStatus.getBytesReceived());
        addField(record, "bytesSent", processorStatus.getBytesSent());
        addField(record, "flowFilesRemoved", processorStatus.getFlowFilesRemoved());
        addField(record, "flowFilesReceived", processorStatus.getFlowFilesReceived());
        addField(record, "flowFilesSent", processorStatus.getFlowFilesSent());
        addField(record, "inputCount", processorStatus.getInputCount());
        addField(record, "inputBytes", processorStatus.getInputBytes());
        addField(record, "outputCount", processorStatus.getOutputCount());
        addField(record, "outputBytes", processorStatus.getOutputBytes());
        addField(record, "activeThreadCount", processorStatus.getActiveThreadCount());
        addField(record, "invocations", processorStatus.getInvocations());
        addField(record, "processingNanos", processorStatus.getProcessingNanos());
        jsonArrayBuilder.add(record.build());
    }

    /**
     * Writes the fields shared by every record. A fresh random {@code statusId} is
     * generated per record; {@code null} string values are omitted.
     */
    private static void addCommonFields(JsonObjectBuilder jsonObjectBuilder, DateFormat dateFormat, String hostname, String applicationName, String platform, String parentId, Date currentDate, String componentType, String componentName) {
        addField(jsonObjectBuilder, "statusId", UUID.randomUUID().toString());
        addField(jsonObjectBuilder, "timestampMillis", currentDate.getTime());
        addField(jsonObjectBuilder, "timestamp", dateFormat.format(currentDate));
        addField(jsonObjectBuilder, "actorHostname", hostname);
        addField(jsonObjectBuilder, "componentType", componentType);
        addField(jsonObjectBuilder, "componentName", componentName);
        addField(jsonObjectBuilder, "parentId", parentId);
        addField(jsonObjectBuilder, "platform", platform);
        addField(jsonObjectBuilder, "application", applicationName);
    }

    /** Adds a numeric field unless the value is {@code null}. */
    private static void addField(JsonObjectBuilder jsonObjectBuilder, String key, Long value) {
        if (value != null) {
            jsonObjectBuilder.add(key, value.longValue());
        }
    }

    /** Adds a numeric field unless the value is {@code null}. */
    private static void addField(JsonObjectBuilder jsonObjectBuilder, String key, Integer value) {
        if (value != null) {
            jsonObjectBuilder.add(key, value.intValue());
        }
    }

    /** Adds a string field unless the value is {@code null}. */
    private static void addField(JsonObjectBuilder jsonObjectBuilder, String key, String value) {
        if (value != null) {
            jsonObjectBuilder.add(key, value);
        }
    }
}
