package org.apache.nifi.reporting;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.reporting.s2s.SiteToSiteUtils;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;

@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
@Stateful(scopes = {Scope.LOCAL}, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@Restricted(restrictions = {@Restriction(requiredPermission = RequiredPermission.EXPORT_NIFI_DETAILS, explanation = "Provides operator the ability to send sensitive details contained in Provenance events to any external system.")})
@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
/* loaded from: input_file:org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.class */
public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask {
    /** "Start Position" choice: begin with the oldest provenance event. */
    static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue(
            "beginning-of-stream",
            "Beginning of Stream",
            "Start reading provenance Events from the beginning of the stream (the oldest event first)");

    /** "Start Position" choice: skip existing events and start at the end of the stream. */
    static final AllowableValue END_OF_STREAM = new AllowableValue(
            "end-of-stream",
            "End of Stream",
            "Start reading provenance Events from the end of the stream, ignoring old events");

    /** Optional comma-separated list of event types to include. */
    static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
            .name("s2s-prov-task-event-filter")
            .displayName("Event Type to Include")
            .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Optional comma-separated list of event types to exclude. */
    static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
            .name("s2s-prov-task-event-filter-exclude")
            .displayName("Event Type to Exclude")
            .description("Comma-separated list of event types that will be used to exclude the provenance events sent by the reporting task. Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Optional regular expression selecting component types to include. */
    static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
            .name("s2s-prov-task-type-filter")
            .displayName("Component Type to Include")
            .description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
            .build();

    /** Optional regular expression selecting component types to exclude. */
    static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
            .name("s2s-prov-task-type-filter-exclude")
            .displayName("Component Type to Exclude")
            .description("Regular expression to exclude the provenance events based on the component type. The events matching the regular expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
            .build();

    /** Optional comma-separated list of component UUIDs to include. */
    static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
            .name("s2s-prov-task-id-filter")
            .displayName("Component ID to Include")
            .description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Optional comma-separated list of component UUIDs to exclude. */
    static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder()
            .name("s2s-prov-task-id-filter-exclude")
            .displayName("Component ID to Exclude")
            .description("Comma-separated list of component UUID that will be used to exclude the provenance events sent by the reporting task. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Optional regular expression selecting component names to include. */
    static final PropertyDescriptor FILTER_COMPONENT_NAME = new PropertyDescriptor.Builder()
            .name("s2s-prov-task-name-filter")
            .displayName("Component Name to Include")
            .description("Regular expression to filter the provenance events based on the component name. Only the events matching the regular expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
            .build();

    /** Optional regular expression selecting component names to exclude. */
    static final PropertyDescriptor FILTER_COMPONENT_NAME_EXCLUDE = new PropertyDescriptor.Builder()
            .name("s2s-prov-task-name-filter-exclude")
            .displayName("Component Name to Exclude")
            .description("Regular expression to exclude the provenance events based on the component name. The events matching the regular expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component name is included in Component Name to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
            .build();

    /** Required property choosing where to start reading; defaults to {@link #BEGINNING_OF_STREAM}. */
    static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
            .name("start-position")
            .displayName("Start Position")
            .description("If the Reporting Task has never been run, or if its state has been reset by a user, specifies where in the stream of Provenance Events the Reporting Task should start")
            .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
            .defaultValue(BEGINNING_OF_STREAM.getValue())
            .required(true)
            .build();

    /** Provenance event consumer; created when the task is scheduled. */
    private volatile ProvenanceEventConsumer consumer;

    /**
     * Creates the reporting task and initializes {@code recordSchema} from the
     * {@code schema-provenance.avsc} resource found through this class's class loader.
     *
     * @throws IOException if the schema resource cannot be found or read
     */
    public SiteToSiteProvenanceReportingTask() throws IOException {
        // try-with-resources guarantees the resource stream is released even if parsing fails.
        try (InputStream schemaStream = getClass().getClassLoader().getResourceAsStream("schema-provenance.avsc")) {
            if (schemaStream == null) {
                // Fail with a descriptive error instead of an opaque NullPointerException from the parser.
                throw new IOException("Schema resource schema-provenance.avsc was not found on the classpath");
            }
            this.recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schemaStream));
        }
    }

    /**
     * Creates and configures a new {@link ProvenanceEventConsumer} from the task's properties.
     *
     * <p>The consumer is configured through a local reference and only stored in the
     * {@code consumer} field once it is fully set up and marked as scheduled, so other
     * methods never observe a half-configured instance.</p>
     *
     * <p>Event type names that do not match a {@link ProvenanceEventType} constant are
     * logged at warn level and left out of the filter.</p>
     *
     * @param configurationContext source of the configured property values
     * @throws IOException declared for compatibility with the existing signature
     */
    @OnScheduled
    public void onScheduled(ConfigurationContext configurationContext) throws IOException {
        final ProvenanceEventConsumer eventConsumer = new ProvenanceEventConsumer();
        eventConsumer.setStartPositionValue(configurationContext.getProperty(START_POSITION).getValue());
        eventConsumer.setBatchSize(configurationContext.getProperty(SiteToSiteUtils.BATCH_SIZE).asInteger());
        eventConsumer.setLogger(getLogger());

        eventConsumer.setComponentTypeRegex(configurationContext.getProperty(FILTER_COMPONENT_TYPE).evaluateAttributeExpressions().getValue());
        eventConsumer.setComponentTypeRegexExclude(configurationContext.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE).evaluateAttributeExpressions().getValue());
        eventConsumer.setComponentNameRegex(configurationContext.getProperty(FILTER_COMPONENT_NAME).evaluateAttributeExpressions().getValue());
        eventConsumer.setComponentNameRegexExclude(configurationContext.getProperty(FILTER_COMPONENT_NAME_EXCLUDE).evaluateAttributeExpressions().getValue());

        final String[] includedTypes = splitCommaSeparated(configurationContext, FILTER_EVENT_TYPE);
        if (includedTypes != null) {
            for (final String typeName : includedTypes) {
                final ProvenanceEventType eventType = toEventType(typeName, " is not a correct event type, removed from the filtering.");
                if (eventType != null) {
                    eventConsumer.addTargetEventType(eventType);
                }
            }
        }

        final String[] excludedTypes = splitCommaSeparated(configurationContext, FILTER_EVENT_TYPE_EXCLUDE);
        if (excludedTypes != null) {
            for (final String typeName : excludedTypes) {
                final ProvenanceEventType eventType = toEventType(typeName, " is not a correct event type, removed from the exclude filtering.");
                if (eventType != null) {
                    eventConsumer.addTargetEventTypeExclude(eventType);
                }
            }
        }

        final String[] includedIds = splitCommaSeparated(configurationContext, FILTER_COMPONENT_ID);
        if (includedIds != null) {
            eventConsumer.addTargetComponentId(includedIds);
        }

        final String[] excludedIds = splitCommaSeparated(configurationContext, FILTER_COMPONENT_ID_EXCLUDE);
        if (excludedIds != null) {
            eventConsumer.addTargetComponentIdExclude(excludedIds);
        }

        eventConsumer.setScheduled(true);
        // Publish last: the volatile write makes the fully configured consumer visible to other threads.
        this.consumer = eventConsumer;
    }

    /**
     * Evaluates a property and splits its value on commas, stripping whitespace around each entry.
     *
     * @return the entries, or {@code null} when the property has no value
     */
    private static String[] splitCommaSeparated(ConfigurationContext context, PropertyDescriptor descriptor) {
        final String rawValue = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
        return StringUtils.stripAll(StringUtils.split(rawValue, ','));
    }

    /**
     * Resolves an event type name to its {@link ProvenanceEventType} constant.
     *
     * @param typeName      name to resolve
     * @param warningSuffix text appended to the name in the warning logged for an unknown type
     * @return the matching constant, or {@code null} when the name matches none
     */
    private ProvenanceEventType toEventType(String typeName, String warningSuffix) {
        try {
            return ProvenanceEventType.valueOf(typeName);
        } catch (IllegalArgumentException e) {
            getLogger().warn(typeName + warningSuffix);
            return null;
        }
    }

    /**
     * Marks the current consumer, if one exists, as no longer scheduled.
     */
    @OnUnscheduled
    public void onUnscheduled() {
        final ProvenanceEventConsumer current = this.consumer;
        if (current == null) {
            return;
        }
        current.setScheduled(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the property descriptors supported by the parent class followed by the
     * platform, filter and start-position properties of this task.
     *
     * @return a new mutable list of the supported property descriptors
     */
    @Override // org.apache.nifi.reporting.AbstractSiteToSiteReportingTask
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // Typed list instead of a raw ArrayList, so no unchecked conversion happens on return.
        final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
        Collections.addAll(descriptors,
                SiteToSiteUtils.PLATFORM,
                FILTER_EVENT_TYPE,
                FILTER_EVENT_TYPE_EXCLUDE,
                FILTER_COMPONENT_TYPE,
                FILTER_COMPONENT_TYPE_EXCLUDE,
                FILTER_COMPONENT_ID,
                FILTER_COMPONENT_ID_EXCLUDE,
                FILTER_COMPONENT_NAME,
                FILTER_COMPONENT_NAME_EXCLUDE,
                START_POSITION);
        return descriptors;
    }

    /**
     * Consumes pending provenance events, serializes each batch to a JSON array and sends
     * it to the destination in a Site-to-Site transaction.
     *
     * <p>Returns without doing anything when the instance is clustered but its node
     * identifier is not yet available.</p>
     *
     * <p>If sending a batch fails, the transaction (when one was created) is marked as
     * errored and the failure is rethrown as a {@link ProcessException}.</p>
     *
     * @param reportingContext context giving access to properties, cluster state and events
     * @throws AssertionError   if the configured instance URL cannot be parsed
     * @throws ProcessException if no transaction can be created or a batch cannot be sent
     */
    public void onTrigger(ReportingContext reportingContext) {
        final boolean isClustered = reportingContext.isClustered();
        final String nodeId = reportingContext.getClusterNodeIdentifier();
        if (nodeId == null && isClustered) {
            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. Will wait for Node Identifier to be established.");
            return;
        }

        final ProcessGroupStatus controllerStatus = reportingContext.getEventAccess().getControllerStatus();
        final String rootGroupName = controllerStatus == null ? null : controllerStatus.getName();

        // Only URL parsing is guarded here, so an IllegalArgumentException raised while
        // consuming or sending events is no longer masked as an AssertionError.
        final String instanceUrl = reportingContext.getProperty(SiteToSiteUtils.INSTANCE_URL).evaluateAttributeExpressions().getValue();
        final URL url;
        try {
            url = URI.create(instanceUrl).toURL();
        } catch (IllegalArgumentException | MalformedURLException e) {
            throw new AssertionError("Invalid NiFi instance URL: " + instanceUrl, e);
        }

        final String hostname = url.getHost();
        final String platform = reportingContext.getProperty(SiteToSiteUtils.PLATFORM).evaluateAttributeExpressions().getValue();
        final Boolean allowNullValues = reportingContext.getProperty(ALLOW_NULL_VALUES).asBoolean();

        final JsonBuilderFactory factory = Json.createBuilderFactory(Collections.emptyMap());
        final JsonObjectBuilder objectBuilder = factory.createObjectBuilder();

        // The pattern emits a literal 'Z' suffix, so timestamps must be formatted in UTC.
        final DateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

        this.consumer.consumeEvents(reportingContext, (componentMapHolder, events) -> {
            final long startNanos = System.nanoTime();

            final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
            for (final ProvenanceEventRecord event : events) {
                final String componentName = componentMapHolder.getComponentName(event.getComponentId());
                final String processGroupId = componentMapHolder.getProcessGroupId(event.getComponentId(), event.getComponentType());
                final String processGroupName = componentMapHolder.getComponentName(processGroupId);
                arrayBuilder.add(serialize(factory, objectBuilder, event, timestampFormat, componentName, processGroupId, processGroupName,
                        hostname, url, rootGroupName, platform, nodeId, allowNullValues));
            }
            final JsonArray jsonArray = arrayBuilder.build();

            Transaction transaction = null;
            try {
                setup(reportingContext);
                transaction = getClient().createTransaction(TransferDirection.SEND);
                if (transaction == null) {
                    throw new ProcessException("All destination nodes are penalized; will attempt to send data later");
                }

                final Map<String, String> attributes = new HashMap<>();
                final String transactionId = UUID.randomUUID().toString();
                attributes.put("reporting.task.transaction.id", transactionId);
                attributes.put("reporting.task.name", getName());
                attributes.put("reporting.task.uuid", getIdentifier());
                attributes.put("reporting.task.type", getClass().getSimpleName());
                attributes.put("mime.type", "application/json");

                sendData(reportingContext, transaction, attributes, jsonArray);
                transaction.confirm();
                transaction.complete();

                final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}",
                        new Object[]{events.size(), elapsedMillis, transactionId, events.get(0).getEventId()});
            } catch (final Exception e) {
                // Mark the transaction as failed whenever one was actually created.
                if (transaction != null) {
                    transaction.error();
                }
                if (e instanceof ProcessException) {
                    throw (ProcessException) e;
                }
                throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
            }
        });
    }

    /**
     * Writes one provenance event into the given object builder and builds the JSON object.
     *
     * @param jsonBuilderFactory    factory used for the nested attribute objects and id arrays
     * @param jsonObjectBuilder     builder that receives the fields and produces the result
     * @param provenanceEventRecord event to serialize
     * @param dateFormat            formatter for the {@code timestamp} field
     * @param str                   value written as {@code componentName}
     * @param str2                  value written as {@code processGroupId}
     * @param str3                  value written as {@code processGroupName}
     * @param str4                  value written as {@code actorHostname}
     * @param url                   instance URL used to derive the content URIs; when {@code null} they are omitted
     * @param str5                  value written as {@code application}
     * @param str6                  value written as {@code platform}
     * @param str7                  cluster node id appended to the content URIs as {@code clusterNodeId}; may be {@code null}
     * @param bool                  passed to every {@code addField} call as its null-handling flag
     * @return the JSON object built from {@code jsonObjectBuilder}
     */
    private JsonObject serialize(JsonBuilderFactory jsonBuilderFactory, JsonObjectBuilder jsonObjectBuilder, ProvenanceEventRecord provenanceEventRecord, DateFormat dateFormat, String str, String str2, String str3, String str4, URL url, String str5, String str6, String str7, Boolean bool) {
        // Descriptive aliases for the positional arguments.
        final JsonObjectBuilder json = jsonObjectBuilder;
        final ProvenanceEventRecord event = provenanceEventRecord;
        final String componentName = str;
        final String processGroupId = str2;
        final String processGroupName = str3;
        final String hostname = str4;
        final String applicationName = str5;
        final String platform = str6;
        final String nodeId = str7;
        final boolean allowNulls = bool;

        final long eventTime = event.getEventTime();

        addField(json, "eventId", UUID.randomUUID().toString(), allowNulls);
        addField(json, "eventOrdinal", Long.valueOf(event.getEventId()), allowNulls);
        addField(json, "eventType", event.getEventType().name(), allowNulls);
        addField(json, "timestampMillis", Long.valueOf(eventTime), allowNulls);
        addField(json, "timestamp", dateFormat.format(Long.valueOf(eventTime)), allowNulls);
        addField(json, "durationMillis", Long.valueOf(event.getEventDuration()), allowNulls);
        addField(json, "lineageStart", Long.valueOf(event.getLineageStartDate()), allowNulls);
        addField(json, "details", event.getDetails(), allowNulls);
        addField(json, "componentId", event.getComponentId(), allowNulls);
        addField(json, "componentType", event.getComponentType(), allowNulls);
        addField(json, "componentName", componentName, allowNulls);
        addField(json, "processGroupId", processGroupId, allowNulls);
        addField(json, "processGroupName", processGroupName, allowNulls);
        addField(json, "entityId", event.getFlowFileUuid(), allowNulls);
        addField(json, "entityType", "org.apache.nifi.flowfile.FlowFile", allowNulls);
        addField(json, "entitySize", Long.valueOf(event.getFileSize()), allowNulls);
        addField(json, "previousEntitySize", event.getPreviousFileSize(), allowNulls);
        addField(json, jsonBuilderFactory, "updatedAttributes", event.getUpdatedAttributes(), bool);
        addField(json, jsonBuilderFactory, "previousAttributes", event.getPreviousAttributes(), bool);
        addField(json, "actorHostname", hostname, allowNulls);

        if (url != null) {
            // Drop the last "/nifi".length() characters of the instance URL to get the API prefix.
            final String instanceUrl = url.toString();
            final String urlPrefix = instanceUrl.substring(0, instanceUrl.length() - "/nifi".length());
            final String contentUriBase = urlPrefix + "/nifi-api/provenance-events/" + event.getEventId() + "/content/";
            final String nodeIdSuffix;
            if (nodeId == null) {
                nodeIdSuffix = "";
            } else {
                nodeIdSuffix = "?clusterNodeId=" + nodeId;
            }
            addField(json, "contentURI", contentUriBase + "output" + nodeIdSuffix, allowNulls);
            addField(json, "previousContentURI", contentUriBase + "input" + nodeIdSuffix, allowNulls);
        }

        addField(json, jsonBuilderFactory, "parentIds", event.getParentUuids(), bool);
        addField(json, jsonBuilderFactory, "childIds", event.getChildUuids(), bool);
        addField(json, "transitUri", event.getTransitUri(), allowNulls);
        addField(json, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier(), allowNulls);
        addField(json, "alternateIdentifier", event.getAlternateIdentifierUri(), allowNulls);
        addField(json, "platform", platform, allowNulls);
        addField(json, "application", applicationName, allowNulls);

        return json.build();
    }

    /**
     * Adds a string-to-string map to the builder as a nested JSON object.
     *
     * <p>A {@code null} map is written as JSON null when {@code bool} is true and skipped
     * otherwise. Entries with a {@code null} key are always skipped; entries with a
     * {@code null} value are written as JSON null only when {@code bool} is true.</p>
     *
     * @param jsonObjectBuilder  builder receiving the field
     * @param jsonBuilderFactory factory used to create the nested object builder
     * @param str                field name
     * @param map                entries to write; may be {@code null}
     * @param bool               whether null values are written as JSON null
     */
    private static void addField(JsonObjectBuilder jsonObjectBuilder, JsonBuilderFactory jsonBuilderFactory, String str, Map<String, String> map, Boolean bool) {
        if (map == null) {
            if (bool) {
                jsonObjectBuilder.add(str, JsonValue.NULL);
            }
            return;
        }

        final JsonObjectBuilder nested = jsonBuilderFactory.createObjectBuilder();
        for (final Map.Entry<String, String> attribute : map.entrySet()) {
            final String key = attribute.getKey();
            if (key == null) {
                continue;
            }
            final String value = attribute.getValue();
            if (value != null) {
                nested.add(key, value);
                continue;
            }
            if (bool) {
                nested.add(key, JsonValue.NULL);
            }
        }
        jsonObjectBuilder.add(str, nested);
    }

    /**
     * Adds a collection of strings to the builder as a JSON array.
     *
     * <p>A {@code null} collection is written as JSON null when {@code bool} is true and
     * skipped otherwise.</p>
     *
     * @param jsonObjectBuilder  builder receiving the field
     * @param jsonBuilderFactory factory used to create the array builder
     * @param str                field name
     * @param collection         values to write; may be {@code null}
     * @param bool               whether a null collection is written as JSON null
     */
    private void addField(JsonObjectBuilder jsonObjectBuilder, JsonBuilderFactory jsonBuilderFactory, String str, Collection<String> collection, Boolean bool) {
        if (collection == null) {
            if (bool) {
                jsonObjectBuilder.add(str, JsonValue.NULL);
            }
            return;
        }
        jsonObjectBuilder.add(str, createJsonArray(jsonBuilderFactory, collection));
    }

    /**
     * Creates an array builder holding the non-null elements of the collection, in iteration order.
     *
     * @param jsonBuilderFactory factory used to create the array builder
     * @param collection         values to copy; {@code null} elements are skipped
     * @return the populated array builder
     */
    private static JsonArrayBuilder createJsonArray(JsonBuilderFactory jsonBuilderFactory, Collection<String> collection) {
        final JsonArrayBuilder values = jsonBuilderFactory.createArrayBuilder();
        collection.forEach(element -> {
            if (element != null) {
                values.add(element);
            }
        });
        return values;
    }
}
