package org.apache.nifi.reporting;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;

@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
@Stateful(scopes = {Scope.LOCAL}, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
/* loaded from: input_file:org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.class */
public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask {
    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    static final String LAST_EVENT_ID_KEY = "last_event_id";
    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder().name("Platform").description("The value to use for the platform field in each provenance event.").required(true).expressionLanguageSupported(true).defaultValue("nifi").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    private volatile long firstEventId = -1;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.nifi.reporting.AbstractSiteToSiteReportingTask
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList arrayList = new ArrayList(super.getSupportedPropertyDescriptors());
        arrayList.add(PLATFORM);
        return arrayList;
    }

    private Map<String, String> createComponentMap(ProcessGroupStatus processGroupStatus) {
        HashMap hashMap = new HashMap();
        if (processGroupStatus != null) {
            hashMap.put(processGroupStatus.getId(), processGroupStatus.getName());
            for (ProcessorStatus processorStatus : processGroupStatus.getProcessorStatus()) {
                hashMap.put(processorStatus.getId(), processorStatus.getName());
            }
            for (PortStatus portStatus : processGroupStatus.getInputPortStatus()) {
                hashMap.put(portStatus.getId(), portStatus.getName());
            }
            for (PortStatus portStatus2 : processGroupStatus.getOutputPortStatus()) {
                hashMap.put(portStatus2.getId(), portStatus2.getName());
            }
            for (RemoteProcessGroupStatus remoteProcessGroupStatus : processGroupStatus.getRemoteProcessGroupStatus()) {
                hashMap.put(remoteProcessGroupStatus.getId(), remoteProcessGroupStatus.getName());
            }
            for (ProcessGroupStatus processGroupStatus2 : processGroupStatus.getProcessGroupStatus()) {
                hashMap.put(processGroupStatus2.getId(), processGroupStatus2.getName());
            }
        }
        return hashMap;
    }

    public void onTrigger(ReportingContext reportingContext) {
        boolean isClustered = reportingContext.isClustered();
        String clusterNodeIdentifier = reportingContext.getClusterNodeIdentifier();
        if (clusterNodeIdentifier == null && isClustered) {
            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. Will wait for Node Identifier to be established.");
            return;
        }
        ProcessGroupStatus controllerStatus = reportingContext.getEventAccess().getControllerStatus();
        String name = controllerStatus == null ? null : controllerStatus.getName();
        Map<String, String> createComponentMap = createComponentMap(controllerStatus);
        Long maxEventId = reportingContext.getEventAccess().getProvenanceRepository().getMaxEventId();
        if (maxEventId == null) {
            getLogger().debug("No events to send because no events have been created yet.");
            return;
        }
        if (this.firstEventId < 0) {
            try {
                Map map = reportingContext.getStateManager().getState(Scope.LOCAL).toMap();
                if (map.containsKey(LAST_EVENT_ID_KEY)) {
                    this.firstEventId = Long.parseLong((String) map.get(LAST_EVENT_ID_KEY)) + 1;
                }
                if (maxEventId.longValue() < this.firstEventId - 1) {
                    getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its ids. Restarting querying from the beginning.", new Object[]{maxEventId, Long.valueOf(this.firstEventId)});
                    this.firstEventId = -1L;
                }
            } catch (IOException e) {
                getLogger().error("Failed to get state at start up due to {}:" + e.getMessage(), e);
                return;
            }
        }
        if (maxEventId.longValue() == this.firstEventId - 1) {
            getLogger().debug("No events to send due to the current max id being equal to the last id that was queried.");
            return;
        }
        try {
            List<ProvenanceEventRecord> provenanceEvents = reportingContext.getEventAccess().getProvenanceEvents(this.firstEventId, reportingContext.getProperty(BATCH_SIZE).asInteger().intValue());
            if (provenanceEvents == null || provenanceEvents.isEmpty()) {
                getLogger().debug("No events to send due to 'events' being null or empty.");
                return;
            }
            try {
                URL url = new URL(reportingContext.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue());
                String host = url.getHost();
                String value = reportingContext.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
                JsonBuilderFactory createBuilderFactory = Json.createBuilderFactory(Collections.emptyMap());
                JsonObjectBuilder createObjectBuilder = createBuilderFactory.createObjectBuilder();
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat(TIMESTAMP_FORMAT);
                simpleDateFormat.setTimeZone(TimeZone.getTimeZone("Z"));
                while (provenanceEvents != null && !provenanceEvents.isEmpty()) {
                    long nanoTime = System.nanoTime();
                    JsonArrayBuilder createArrayBuilder = createBuilderFactory.createArrayBuilder();
                    for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
                        createArrayBuilder.add(serialize(createBuilderFactory, createObjectBuilder, provenanceEventRecord, simpleDateFormat, createComponentMap.get(provenanceEventRecord.getComponentId()), host, url, name, value, clusterNodeIdentifier));
                    }
                    JsonArray build = createArrayBuilder.build();
                    try {
                        Transaction createTransaction = getClient().createTransaction(TransferDirection.SEND);
                        if (createTransaction == null) {
                            getLogger().debug("All destination nodes are penalized; will attempt to send data later");
                            return;
                        }
                        HashMap hashMap = new HashMap();
                        String uuid = UUID.randomUUID().toString();
                        hashMap.put("reporting.task.transaction.id", uuid);
                        createTransaction.send(build.toString().getBytes(StandardCharsets.UTF_8), hashMap);
                        createTransaction.confirm();
                        createTransaction.complete();
                        getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}", new Object[]{Integer.valueOf(provenanceEvents.size()), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime)), uuid, Long.valueOf(((ProvenanceEventRecord) provenanceEvents.get(0)).getEventId())});
                        ProvenanceEventRecord provenanceEventRecord2 = (ProvenanceEventRecord) provenanceEvents.get(provenanceEvents.size() - 1);
                        String valueOf = String.valueOf(provenanceEventRecord2.getEventId());
                        try {
                            StateManager stateManager = reportingContext.getStateManager();
                            HashMap hashMap2 = new HashMap();
                            hashMap2.put(LAST_EVENT_ID_KEY, valueOf);
                            stateManager.setState(hashMap2, Scope.LOCAL);
                        } catch (IOException e2) {
                            getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}", new Object[]{valueOf, e2, e2, e2.getMessage()}, e2);
                        }
                        this.firstEventId = provenanceEventRecord2.getEventId() + 1;
                        try {
                            provenanceEvents = reportingContext.getEventAccess().getProvenanceEvents(this.firstEventId, reportingContext.getProperty(BATCH_SIZE).asInteger().intValue());
                        } catch (IOException e3) {
                            getLogger().error("Failed to retrieve Provenance Events from repository due to: " + e3.getMessage(), e3);
                            return;
                        }
                    } catch (IOException e4) {
                        throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e4.getMessage(), e4);
                    }
                }
            } catch (MalformedURLException e5) {
                throw new AssertionError();
            }
        } catch (IOException e6) {
            getLogger().error("Failed to retrieve Provenance Events from repository due to: " + e6.getMessage(), e6);
        }
    }

    static JsonObject serialize(JsonBuilderFactory jsonBuilderFactory, JsonObjectBuilder jsonObjectBuilder, ProvenanceEventRecord provenanceEventRecord, DateFormat dateFormat, String str, String str2, URL url, String str3, String str4, String str5) {
        addField(jsonObjectBuilder, "eventId", UUID.randomUUID().toString());
        addField(jsonObjectBuilder, "eventOrdinal", Long.valueOf(provenanceEventRecord.getEventId()));
        addField(jsonObjectBuilder, "eventType", provenanceEventRecord.getEventType().name());
        addField(jsonObjectBuilder, "timestampMillis", Long.valueOf(provenanceEventRecord.getEventTime()));
        addField(jsonObjectBuilder, "timestamp", dateFormat.format(Long.valueOf(provenanceEventRecord.getEventTime())));
        addField(jsonObjectBuilder, "durationMillis", Long.valueOf(provenanceEventRecord.getEventDuration()));
        addField(jsonObjectBuilder, "lineageStart", Long.valueOf(provenanceEventRecord.getLineageStartDate()));
        addField(jsonObjectBuilder, "details", provenanceEventRecord.getDetails());
        addField(jsonObjectBuilder, "componentId", provenanceEventRecord.getComponentId());
        addField(jsonObjectBuilder, "componentType", provenanceEventRecord.getComponentType());
        addField(jsonObjectBuilder, "componentName", str);
        addField(jsonObjectBuilder, "entityId", provenanceEventRecord.getFlowFileUuid());
        addField(jsonObjectBuilder, "entityType", "org.apache.nifi.flowfile.FlowFile");
        addField(jsonObjectBuilder, "entitySize", Long.valueOf(provenanceEventRecord.getFileSize()));
        addField(jsonObjectBuilder, "previousEntitySize", provenanceEventRecord.getPreviousFileSize());
        addField(jsonObjectBuilder, jsonBuilderFactory, "updatedAttributes", (Map<String, String>) provenanceEventRecord.getUpdatedAttributes());
        addField(jsonObjectBuilder, jsonBuilderFactory, "previousAttributes", (Map<String, String>) provenanceEventRecord.getPreviousAttributes());
        addField(jsonObjectBuilder, "actorHostname", str2);
        if (url != null) {
            String url2 = url.toString();
            String str6 = url2.substring(0, url2.length() - "/nifi".length()) + "/nifi-api/provenance-events/" + provenanceEventRecord.getEventId() + "/content/";
            String str7 = str5 == null ? "" : "?clusterNodeId=" + str5;
            addField(jsonObjectBuilder, "contentURI", str6 + "output" + str7);
            addField(jsonObjectBuilder, "previousContentURI", str6 + "input" + str7);
        }
        addField(jsonObjectBuilder, jsonBuilderFactory, "parentIds", provenanceEventRecord.getParentUuids());
        addField(jsonObjectBuilder, jsonBuilderFactory, "childIds", provenanceEventRecord.getChildUuids());
        addField(jsonObjectBuilder, "transitUri", provenanceEventRecord.getTransitUri());
        addField(jsonObjectBuilder, "remoteIdentifier", provenanceEventRecord.getSourceSystemFlowFileIdentifier());
        addField(jsonObjectBuilder, "alternateIdentifier", provenanceEventRecord.getAlternateIdentifierUri());
        addField(jsonObjectBuilder, "platform", str4);
        addField(jsonObjectBuilder, "application", str3);
        return jsonObjectBuilder.build();
    }

    private static void addField(JsonObjectBuilder jsonObjectBuilder, JsonBuilderFactory jsonBuilderFactory, String str, Map<String, String> map) {
        if (map == null) {
            return;
        }
        JsonObjectBuilder createObjectBuilder = jsonBuilderFactory.createObjectBuilder();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (entry.getKey() != null && entry.getValue() != null) {
                createObjectBuilder.add(entry.getKey(), entry.getValue());
            }
        }
        jsonObjectBuilder.add(str, createObjectBuilder);
    }

    private static void addField(JsonObjectBuilder jsonObjectBuilder, String str, Long l) {
        if (l != null) {
            jsonObjectBuilder.add(str, l.longValue());
        }
    }

    private static void addField(JsonObjectBuilder jsonObjectBuilder, JsonBuilderFactory jsonBuilderFactory, String str, Collection<String> collection) {
        if (collection == null) {
            return;
        }
        jsonObjectBuilder.add(str, createJsonArray(jsonBuilderFactory, collection));
    }

    private static void addField(JsonObjectBuilder jsonObjectBuilder, String str, String str2) {
        if (str2 == null) {
            return;
        }
        jsonObjectBuilder.add(str, str2);
    }

    private static JsonArrayBuilder createJsonArray(JsonBuilderFactory jsonBuilderFactory, Collection<String> collection) {
        JsonArrayBuilder createArrayBuilder = jsonBuilderFactory.createArrayBuilder();
        for (String str : collection) {
            if (str != null) {
                createArrayBuilder.add(str);
            }
        }
        return createArrayBuilder;
    }
}
