/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.atlas.reporting;

import com.sun.jersey.api.client.ClientResponse;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasServiceException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowAnalyzer;
import org.apache.nifi.atlas.hook.NiFiAtlasHook;
import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
import org.apache.nifi.atlas.resolver.NamespaceResolver;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
import org.apache.nifi.atlas.security.AtlasAuthN;
import org.apache.nifi.atlas.security.Basic;
import org.apache.nifi.atlas.security.Kerberos;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringSelector;

@Tags(value={"atlas", "lineage"})
@CapabilityDescription(value="Report NiFi flow data set level lineage to Apache Atlas. End-to-end lineages across NiFi environments and other systems can be reported if those are connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc. Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets, in addition to NiFi provenance events providing detailed event level lineage. See 'Additional Details' for further description and limitations.")
@Stateful(scopes={Scope.LOCAL}, description="Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name="hostnamePattern.<namespace>", value="hostname Regex patterns", description="White space delimited (including new line) Regular Expressions to resolve a namespace from a hostname or IP address of a transit URI of NiFi provenance record.", expressionLanguageScope=ExpressionLanguageScope.VARIABLE_REGISTRY)
@RequiresInstanceClassLoading
public class ReportLineageToAtlas
extends AbstractReportingTask {
    /** Separator used when splitting and re-joining the list of Atlas server URLs. */
    private static final String ATLAS_URL_DELIMITER = ",";
    /** Optional, non-blank list of Atlas server URLs; the description names 'atlas.rest.address' as the fallback. */
    static final PropertyDescriptor ATLAS_URLS = new PropertyDescriptor.Builder().name("atlas-urls").displayName("Atlas URLs").description("Comma separated URL of Atlas Servers (e.g. http://atlas-server-hostname:21000 or https://atlas-server-hostname:21443). For accessing Atlas behind Knox gateway, specify Knox gateway URL (e.g. https://knox-hostname:8443/gateway/{topology-name}/atlas). If not specified, 'atlas.rest.address' in Atlas Configuration File is used.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    /** Required time period (default "60 sec") to wait for a connection to Atlas. */
    public static final PropertyDescriptor ATLAS_CONNECT_TIMEOUT = new PropertyDescriptor.Builder().name("atlas-connect-timeout").displayName("Atlas Connect Timeout").description("Max wait time for connection to Atlas.").required(true).defaultValue("60 sec").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    /** Required time period (default "60 sec") to wait for a response from Atlas. */
    public static final PropertyDescriptor ATLAS_READ_TIMEOUT = new PropertyDescriptor.Builder().name("atlas-read-timeout").displayName("Atlas Read Timeout").description("Max wait time for response from Atlas.").required(true).defaultValue("60 sec").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    // Allowable values of the authentication method property below.
    static final AllowableValue ATLAS_AUTHN_BASIC = new AllowableValue("basic", "Basic", "Use username and password.");
    static final AllowableValue ATLAS_AUTHN_KERBEROS = new AllowableValue("kerberos", "Kerberos", "Use Kerberos keytab file.");
    /** Required choice between the two authentication methods above; defaults to basic. */
    static final PropertyDescriptor ATLAS_AUTHN_METHOD = new PropertyDescriptor.Builder().name("atlas-authentication-method").displayName("Atlas Authentication Method").description("Specify how to authenticate this reporting task to Atlas server.").required(true).allowableValues(new AllowableValue[]{ATLAS_AUTHN_BASIC, ATLAS_AUTHN_KERBEROS}).defaultValue(ATLAS_AUTHN_BASIC.getValue()).build();
    /** Optional, non-blank user name for Atlas. */
    public static final PropertyDescriptor ATLAS_USER = new PropertyDescriptor.Builder().name("atlas-username").displayName("Atlas Username").description("User name to communicate with Atlas.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    /** Optional, non-blank password for Atlas; flagged as sensitive. */
    public static final PropertyDescriptor ATLAS_PASSWORD = new PropertyDescriptor.Builder().name("atlas-password").displayName("Atlas Password").description("Password to communicate with Atlas.").required(false).sensitive(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    /** Optional directory holding 'atlas-application.properties'; declared as a classpath-modifying external resource. */
    static final PropertyDescriptor ATLAS_CONF_DIR = new PropertyDescriptor.Builder().name("atlas-conf-dir").displayName("Atlas Configuration Directory").description("Directory path that contains 'atlas-application.properties' file. If not specified and 'Create Atlas Configuration File' is disabled, then, 'atlas-application.properties' file under root classpath is used.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.DIRECTORY, new ResourceType[0]).dynamicallyModifiesClasspath(true).build();
    /** Required URL that represents this NiFi instance or cluster in Atlas. */
    public static final PropertyDescriptor ATLAS_NIFI_URL = new PropertyDescriptor.Builder().name("atlas-nifi-url").displayName("NiFi URL for Atlas").description("NiFi URL is used in Atlas to represent this NiFi cluster (or standalone instance). It is recommended to use one that can be accessible remotely instead of using 'localhost'.").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.URL_VALIDATOR).build();
    /** Optional default metadata namespace; the property name still uses the older "cluster name" wording. */
    public static final PropertyDescriptor ATLAS_DEFAULT_CLUSTER_NAME = new PropertyDescriptor.Builder().name("atlas-default-cluster-name").displayName("Atlas Default Metadata Namespace").description("Namespace for Atlas entities reported by this ReportingTask. If not specified, 'atlas.metadata.namespace' or 'atlas.cluster.name' (the former having priority) in Atlas Configuration File is used. Multiple mappings can be configured by user defined properties. See 'Additional Details...' for more.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    /** Required true/false switch (default "false") controlling whether the Atlas configuration file is generated on start. */
    static final PropertyDescriptor ATLAS_CONF_CREATE = new PropertyDescriptor.Builder().name("atlas-conf-create").displayName("Create Atlas Configuration File").description("If enabled, 'atlas-application.properties' file will be created in 'Atlas Configuration Directory' automatically when this Reporting Task starts. Note that the existing configuration file will be overwritten.").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    /** Optional SSL Context Service used for both Atlas and Kafka communication. */
    static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("ssl-context-service").displayName("SSL Context Service").description("Specifies the SSL Context Service to use for communicating with Atlas and Kafka.").required(false).identifiesControllerService(SSLContextService.class).build();
    /** Optional, non-blank Kafka bootstrap servers used for Atlas hook notification messages. */
    static final PropertyDescriptor KAFKA_BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder().name("kafka-bootstrap-servers").displayName("Kafka Bootstrap Servers").description("Kafka Bootstrap Servers to send Atlas hook notification messages based on NiFi provenance events. E.g. 'localhost:9092' NOTE: Once this reporting task has started, restarting NiFi is required to changed this property as Atlas library holds a unmodifiable static reference to Kafka client.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    // Allowable values of the Kafka security protocol property below.
    static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
    static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
    /** Required Kafka security protocol; one of the four values above, defaulting to PLAINTEXT. */
    static final PropertyDescriptor KAFKA_SECURITY_PROTOCOL = new PropertyDescriptor.Builder().name("kafka-security-protocol").displayName("Kafka Security Protocol").description("Protocol used to communicate with Kafka brokers to send Atlas hook notification messages. Corresponds to Kafka's 'security.protocol' property.").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new AllowableValue[]{SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL}).defaultValue(SEC_PLAINTEXT.getValue()).build();
    /** Optional, non-blank Kerberos principal used when no Kerberos Credentials Service is configured. */
    public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("nifi-kerberos-principal").displayName("Kerberos Principal").description("The Kerberos principal for this NiFi instance to access Atlas API and Kafka brokers. If not set, it is expected to set a JAAS configuration file in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.").required(false).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    /** Optional Kerberos keytab file; the description now refers to the keytab rather than repeating the principal wording. */
    public static final PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder().name("nifi-kerberos-keytab").displayName("Kerberos Keytab").description("The Kerberos keytab for this NiFi instance to access Atlas API and Kafka brokers. If not set, it is expected to set a JAAS configuration file in the JVM properties defined in the bootstrap.conf file. This keytab will be set into 'sasl.jaas.config' Kafka's property.").required(false).identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, new ResourceType[0]).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    /** Optional controller service that supplies the Kerberos principal and keytab. */
    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder().name("kerberos-credentials-service").displayName("Kerberos Credentials Service").description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos").identifiesControllerService(KerberosCredentialsService.class).required(false).build();
    /** Optional Kafka Kerberos service name (default "kafka"); the description now names the matching Kafka setting, 'sasl.kerberos.service.name'. */
    static final PropertyDescriptor KAFKA_KERBEROS_SERVICE_NAME = new PropertyDescriptor.Builder().name("kafka-kerberos-service-name").displayName("Kafka Kerberos Service Name").description("The service name that matches the primary name of the Kafka server configured in the broker JAAS file. This can be defined either in Kafka's JAAS config or in Kafka's config. Corresponds to Kafka's 'sasl.kerberos.service.name' property. It is ignored unless one of the SASL options of the <Security Protocol> are selected.").required(false).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("kafka").build();
    // Allowable values of the lineage strategy property below.
    static final AllowableValue LINEAGE_STRATEGY_SIMPLE_PATH = new AllowableValue("SimplePath", "Simple Path", "Map NiFi provenance events and target Atlas DataSets to statically created 'nifi_flow_path' Atlas Processes. See also 'Additional Details'.");
    static final AllowableValue LINEAGE_STRATEGY_COMPLETE_PATH = new AllowableValue("CompletePath", "Complete Path", "Create separate 'nifi_flow_path' Atlas Processes for each distinct input and output DataSet combinations by looking at the complete route for a given FlowFile. See also 'Additional Details.");
    /** Required lineage granularity; defaults to Simple Path. */
    static final PropertyDescriptor LINEAGE_STRATEGY = new PropertyDescriptor.Builder().name("nifi-lineage-strategy").displayName("Lineage Strategy").description("Specifies granularity on how NiFi data flow should be reported to Atlas. NOTE: It is strongly recommended to keep using the same strategy once this reporting task started to keep Atlas data clean. Switching strategies will not delete Atlas entities created by the old strategy. Having mixed entities created by different strategies makes Atlas lineage graph noisy. For more detailed description on each strategy and differences, refer 'NiFi Lineage Strategy' section in Additional Details.").required(true).allowableValues(new AllowableValue[]{LINEAGE_STRATEGY_SIMPLE_PATH, LINEAGE_STRATEGY_COMPLETE_PATH}).defaultValue(LINEAGE_STRATEGY_SIMPLE_PATH.getValue()).build();
    // Allowable values of the AWS S3 model version property below.
    static final AllowableValue AWS_S3_MODEL_VERSION_V1 = new AllowableValue("v1", "v1", "Creates AWS S3 directory entities version 1 (aws_s3_pseudo_dir).");
    static final AllowableValue AWS_S3_MODEL_VERSION_V2 = new AllowableValue("v2", "v2", "Creates AWS S3 directory entities version 2 (aws_s3_v2_directory).");
    /** Required AWS S3 entity model version; defaults to v2. */
    static final PropertyDescriptor AWS_S3_MODEL_VERSION = new PropertyDescriptor.Builder().name("aws-s3-model-version").displayName("AWS S3 Model Version").description("Specifies what type of AWS S3 directory entities will be created in Atlas for s3a:// transit URIs (eg. PutHDFS with S3 integration). NOTE: It is strongly recommended to keep using the same AWS S3 entity model version once this reporting task started to keep Atlas data clean. Switching versions will not delete existing Atlas entities created by the old version, nor migrate them to the new version.").required(true).allowableValues(new AllowableValue[]{AWS_S3_MODEL_VERSION_V1, AWS_S3_MODEL_VERSION_V2}).defaultValue(AWS_S3_MODEL_VERSION_V2.getValue()).build();
    // Allowable values of the filesystem paths level property below; their values are the FilesystemPathsLevel enum names.
    static final AllowableValue FILESYSTEM_PATHS_LEVEL_FILE = new AllowableValue(FilesystemPathsLevel.FILE.name(), FilesystemPathsLevel.FILE.getDisplayName(), "Creates File level paths.");
    static final AllowableValue FILESYSTEM_PATHS_LEVEL_DIRECTORY = new AllowableValue(FilesystemPathsLevel.DIRECTORY.name(), FilesystemPathsLevel.DIRECTORY.getDisplayName(), "Creates Directory level paths.");
    /** Required filesystem path entity level; defaults to File level. */
    static final PropertyDescriptor FILESYSTEM_PATHS_LEVEL = new PropertyDescriptor.Builder().name("filesystem-paths-level").displayName("Filesystem Path Entities Level").description("Specifies how the filesystem path entities (fs_path and hdfs_path) will be logged in Atlas: File or Directory level. In case of File level, each individual file entity will be sent to Atlas as a separate entity with the full path including the filename. Directory level only logs the path of the parent directory without the filename. This setting affects processors working with files, like GetFile or PutHDFS. NOTE: Although the default value is File level for backward compatibility reasons, it is highly recommended to set it to Directory level because File level logging can generate a huge number of entities in Atlas.").required(true).allowableValues(new AllowableValue[]{FILESYSTEM_PATHS_LEVEL_FILE, FILESYSTEM_PATHS_LEVEL_DIRECTORY}).defaultValue(FILESYSTEM_PATHS_LEVEL_FILE.getValue()).build();
    /** File name of the Atlas client configuration, resolved against the configured configuration directory. */
    private static final String ATLAS_PROPERTIES_FILENAME = "atlas-application.properties";
    // Keys read from / written to the Atlas properties when initialising this task.
    private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS = "atlas.client.connectTimeoutMSecs";
    private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS = "atlas.client.readTimeoutMSecs";
    // The default namespace lookup tries the metadata namespace key before the cluster name key.
    private static final String ATLAS_PROPERTY_METADATA_NAMESPACE = "atlas.metadata.namespace";
    private static final String ATLAS_PROPERTY_CLUSTER_NAME = "atlas.cluster.name";
    // Fallback source of Atlas URLs when the 'Atlas URLs' property yields no value.
    private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
    // Set to "true" when any configured Atlas URL starts with "https".
    private static final String ATLAS_PROPERTY_ENABLE_TLS = "atlas.enableTLS";
    // Kafka-related keys; NOTE(review): presumably consumed by the Kafka configuration code further down the file.
    private static final String ATLAS_KAFKA_PREFIX = "atlas.kafka.";
    private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = "atlas.kafka.bootstrap.servers";
    private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = "atlas.kafka.client.id";
    // File name and keys of the SSL client configuration; NOTE(review): presumably used by the ssl-client.xml helpers further down the file.
    private static final String SSL_CLIENT_XML_FILENAME = "ssl-client.xml";
    private static final String SSL_CLIENT_XML_TRUSTSTORE_LOCATION = "ssl.client.truststore.location";
    private static final String SSL_CLIENT_XML_TRUSTSTORE_PASSWORD = "ssl.client.truststore.password";
    private static final String SSL_CLIENT_XML_TRUSTSTORE_TYPE = "ssl.client.truststore.type";
    /** Discovers NamespaceResolver implementations; iterated for dynamic properties, validation and setup. */
    private final ServiceLoader<NamespaceResolver> namespaceResolverLoader = ServiceLoader.load(NamespaceResolver.class);
    // State (re)assigned when the task is scheduled.
    private volatile AtlasAuthN atlasAuthN;
    private volatile Properties atlasProperties;
    // NOTE(review): not touched in the visible part of the file; presumably tracks Atlas type definition creation.
    private volatile boolean isTypeDefCreated = false;
    private volatile String defaultMetadataNamespace;
    // NOTE(review): presumably assigned by initProvenanceConsumer, which is defined further down the file.
    private volatile ProvenanceEventConsumer consumer;
    private volatile NamespaceResolvers namespaceResolvers;
    private volatile NiFiAtlasHook nifiAtlasHook;
    private volatile LineageStrategy lineageStrategy;

    /**
     * Lists every statically declared property of this reporting task.
     *
     * @return a new, mutable list of the supported property descriptors, in a fixed order
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // Copy into an ArrayList so callers still receive a growable list.
        return new ArrayList<PropertyDescriptor>(Arrays.asList(
                ATLAS_URLS,
                ATLAS_CONF_DIR,
                ATLAS_CONF_CREATE,
                ATLAS_DEFAULT_CLUSTER_NAME,
                LINEAGE_STRATEGY,
                ProvenanceEventConsumer.PROVENANCE_START_POSITION,
                ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE,
                ATLAS_NIFI_URL,
                ATLAS_AUTHN_METHOD,
                ATLAS_USER,
                ATLAS_PASSWORD,
                KERBEROS_CREDENTIALS_SERVICE,
                KERBEROS_PRINCIPAL,
                KERBEROS_KEYTAB,
                SSL_CONTEXT_SERVICE,
                KAFKA_BOOTSTRAP_SERVERS,
                KAFKA_SECURITY_PROTOCOL,
                KAFKA_KERBEROS_SERVICE_NAME,
                ATLAS_CONNECT_TIMEOUT,
                ATLAS_READ_TIMEOUT,
                AWS_S3_MODEL_VERSION,
                FILESYSTEM_PATHS_LEVEL));
    }

    /**
     * Asks each discovered namespace resolver, in iteration order, whether it supports the given dynamic property.
     *
     * @param propertyDescriptorName name of the dynamic property being looked up
     * @return the descriptor from the first resolver that returns a non-null one, or {@code null} if none does
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        PropertyDescriptor match = null;
        for (NamespaceResolver candidate : this.namespaceResolverLoader) {
            match = candidate.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
            if (match != null) {
                // First resolver to recognise the name wins.
                break;
            }
        }
        return match;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Performs the cross-property validation of this reporting task.
     * <ul>
     *   <li>every entry of 'Atlas URLs' must parse as a URL/URI, and all entries must share one scheme;</li>
     *   <li>the selected authentication method contributes its own validation results;</li>
     *   <li>every discovered namespace resolver contributes its own validation results;</li>
     *   <li>when 'Create Atlas Configuration File' is enabled, the properties needed to generate the file
     *       must be set and the Kafka related properties are validated as well.</li>
     * </ul>
     *
     * @param context validation context giving access to the configured property values
     * @return all validation results collected; failures are reported as invalid results
     */
    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        List<ValidationResult> results = new ArrayList<>();
        // Typed (was a raw HashSet) so that adding a scheme is checked by the compiler.
        HashSet<String> schemes = new HashSet<>();
        String atlasUrls = context.getProperty(ATLAS_URLS).evaluateAttributeExpressions().getValue();
        if (!StringUtils.isEmpty(atlasUrls)) {
            Arrays.stream(atlasUrls.split(ATLAS_URL_DELIMITER)).map(String::trim).forEach(input -> {
                try {
                    URL url = new URL(input);
                    schemes.add(url.toURI().getScheme());
                } catch (Exception e) {
                    results.add(new ValidationResult.Builder().subject(ATLAS_URLS.getDisplayName()).input(input).explanation("contains invalid URI: " + e).valid(false).build());
                }
            });
        }
        if (schemes.size() > 1) {
            results.add(new ValidationResult.Builder().subject(ATLAS_URLS.getDisplayName()).explanation("URLs with multiple schemes have been specified").valid(false).build());
        }

        String atlasAuthNMethod = context.getProperty(ATLAS_AUTHN_METHOD).getValue();
        // Local name chosen so that it does not shadow the atlasAuthN field.
        AtlasAuthN authN = this.getAtlasAuthN(atlasAuthNMethod);
        results.addAll(authN.validate(context));

        // java.util.ServiceLoader is documented as not safe for concurrent use, hence the lock while iterating.
        synchronized (this.namespaceResolverLoader) {
            this.namespaceResolverLoader.forEach(resolver -> results.addAll(resolver.validate(context)));
        }

        if (context.getProperty(ATLAS_CONF_CREATE).asBoolean()) {
            // These values are written into the generated atlas-application.properties.
            Stream.of(ATLAS_URLS, ATLAS_CONF_DIR, ATLAS_DEFAULT_CLUSTER_NAME, KAFKA_BOOTSTRAP_SERVERS)
                    .filter(p -> !context.getProperty(p).isSet())
                    .forEach(p -> results.add(new ValidationResult.Builder().subject(p.getDisplayName()).explanation("required to create Atlas configuration file.").valid(false).build()));
            this.validateKafkaProperties(context, results);
        }
        return results;
    }

    /**
     * Validates the Kafka related properties against the selected Kafka security protocol.
     * SSL based protocols need an SSL Context Service. SASL based protocols need a Kerberos
     * principal and keytab (from the credentials service or the explicit properties) and a
     * Kafka Kerberos service name.
     *
     * @param context validation context giving access to the configured property values
     * @param results collection that receives an invalid result for each violated rule
     */
    private void validateKafkaProperties(ValidationContext context, Collection<ValidationResult> results) {
        String protocol = context.getProperty(KAFKA_SECURITY_PROTOCOL).getValue();

        boolean sslBased = SEC_SSL.equals(protocol) || SEC_SASL_SSL.equals(protocol);
        if (sslBased && !context.getProperty(SSL_CONTEXT_SERVICE).isSet()) {
            results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).explanation("required by SSL Kafka connection").valid(false).build());
        }

        boolean saslBased = SEC_SASL_PLAINTEXT.equals(protocol) || SEC_SASL_SSL.equals(protocol);
        if (!saslBased) {
            return;
        }

        KerberosCredentialsService credentialsService = (KerberosCredentialsService)context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
        // A configured but disabled credentials service is not inspected.
        boolean credentialsCheckable = credentialsService == null
                || context.getControllerServiceLookup().isControllerServiceEnabled((ControllerService)credentialsService);
        if (credentialsCheckable) {
            String principal = credentialsService == null
                    ? context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue()
                    : credentialsService.getPrincipal();
            String keytab = credentialsService == null
                    ? context.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue()
                    : credentialsService.getKeytab();
            if (keytab == null || principal == null) {
                results.add(new ValidationResult.Builder().subject("Kerberos Authentication").explanation("Keytab and Principal are required for Kerberos authentication with Apache Kafka.").valid(false).build());
            }
        }

        if (!context.getProperty(KAFKA_KERBEROS_SERVICE_NAME).isSet()) {
            results.add(new ValidationResult.Builder().subject(KAFKA_KERBEROS_SERVICE_NAME.getDisplayName()).explanation("Required by Kafka SASL authentication.").valid(false).build());
        }
    }

    /**
     * Prepares this reporting task when it is scheduled.
     *
     * @param context configuration of this reporting task
     * @throws Exception if any of the initialisation steps fails
     */
    @OnScheduled
    public void setup(ConfigurationContext context) throws Exception {
        // Atlas properties come first: this step resolves the default metadata
        // namespace that the namespace resolvers are created with below.
        initAtlasProperties(context);
        initLineageStrategy(context);
        initNamespaceResolvers(context);
    }

    /**
     * Creates a fresh Atlas hook, picks the lineage strategy matching the 'Lineage Strategy'
     * property, hands the hook to that strategy and then initialises the provenance consumer.
     *
     * @param context configuration of this reporting task
     * @throws IOException declared for the provenance consumer initialisation
     */
    private void initLineageStrategy(ConfigurationContext context) throws IOException {
        this.nifiAtlasHook = new NiFiAtlasHook();

        String configuredStrategy = context.getProperty(LINEAGE_STRATEGY).getValue();
        boolean simplePath = LINEAGE_STRATEGY_SIMPLE_PATH.equals(configuredStrategy);
        if (simplePath) {
            this.lineageStrategy = new SimpleFlowPathLineage();
        } else if (LINEAGE_STRATEGY_COMPLETE_PATH.equals(configuredStrategy)) {
            this.lineageStrategy = new CompleteFlowPathLineage();
        }
        // Any other value leaves the field untouched; the property only allows the two values above.

        this.lineageStrategy.setLineageContext(this.nifiAtlasHook);
        this.initProvenanceConsumer(context);
    }

    /**
     * Configures every discovered namespace resolver with the given context and bundles them,
     * in discovery order, together with the default metadata namespace.
     *
     * @param context configuration of this reporting task, passed to each resolver
     */
    private void initNamespaceResolvers(ConfigurationContext context) {
        // Typed (was a raw LinkedHashSet) so the unmodifiable view keeps its element type.
        LinkedHashSet<NamespaceResolver> loadedNamespaceResolvers = new LinkedHashSet<>();
        for (NamespaceResolver resolver : this.namespaceResolverLoader) {
            resolver.configure((PropertyContext)context);
            loadedNamespaceResolvers.add(resolver);
        }
        this.namespaceResolvers = new NamespaceResolvers(Collections.unmodifiableSet(loadedNamespaceResolvers), this.defaultMetadataNamespace);
    }

    /**
     * Builds the Atlas client properties for this reporting task.
     * <p>
     * When 'Create Atlas Configuration File' is disabled, existing properties are loaded from
     * 'atlas-application.properties' in the configuration directory if that file exists, and
     * otherwise from the root of the classpath. The loaded properties must then set
     * 'atlas.notification.hook.asynchronous' to 'false'.
     * <p>
     * When it is enabled, the properties are derived from this task's configuration and written
     * to 'atlas-application.properties', overwriting an existing file.
     * <p>
     * In both cases the Atlas application properties are force-reloaded and, if a configuration
     * directory is set, the 'atlas.conf' system property is pointed at it.
     *
     * @param context configuration of this reporting task
     * @throws ProcessException if no properties file can be found, no Atlas URL or default namespace
     *         is defined, or asynchronous hook notification is not disabled
     * @throws Exception if reading or writing the properties file, or another setup step, fails
     */
    private void initAtlasProperties(ConfigurationContext context) throws Exception {
        String hookAsyncProp = "atlas.notification.hook.asynchronous";
        String atlasAuthNMethod = context.getProperty(ATLAS_AUTHN_METHOD).getValue();
        String confDirStr = context.getProperty(ATLAS_CONF_DIR).evaluateAttributeExpressions().getValue();
        File confDir = confDirStr != null && !confDirStr.isEmpty() ? new File(confDirStr) : null;
        this.atlasProperties = new Properties();
        // With a null confDir this resolves to a relative path.
        File atlasPropertiesFile = new File(confDir, ATLAS_PROPERTIES_FILENAME);
        boolean createAtlasConf = context.getProperty(ATLAS_CONF_CREATE).asBoolean();

        if (!createAtlasConf) {
            if (atlasPropertiesFile.isFile()) {
                this.getLogger().info("Loading {}", new Object[]{atlasPropertiesFile});
                try (InputStream in = new FileInputStream(atlasPropertiesFile)) {
                    this.atlasProperties.load(in);
                }
            } else {
                // Fall back to the classpath ONLY when the file is absent. Previously this lookup ran
                // unconditionally, failing (or overriding the loaded file) even though the file was found.
                String fileInClasspath = "/" + ATLAS_PROPERTIES_FILENAME;
                try (InputStream in = ReportLineageToAtlas.class.getResourceAsStream(fileInClasspath)) {
                    this.getLogger().info("Loading {} from classpath", new Object[]{fileInClasspath});
                    if (in == null) {
                        throw new ProcessException(String.format("Could not find %s in classpath. Please add it to classpath, or specify %s a directory containing Atlas properties file, or enable %s to generate it.", fileInClasspath, ATLAS_CONF_DIR.getDisplayName(), ATLAS_CONF_CREATE.getDisplayName()));
                    }
                    this.atlasProperties.load(in);
                }
            }
        }

        // Explicit task properties win over values found in the loaded Atlas properties.
        List<String> urls = this.parseAtlasUrls(context.getProperty(ATLAS_URLS));
        this.setValue(value -> {
            this.defaultMetadataNamespace = value;
        }, () -> {
            throw new ProcessException("Default metadata namespace (or cluster name) is not defined.");
        }, context.getProperty(ATLAS_DEFAULT_CLUSTER_NAME), this.atlasProperties.getProperty(ATLAS_PROPERTY_METADATA_NAMESPACE), this.atlasProperties.getProperty(ATLAS_PROPERTY_CLUSTER_NAME));

        String atlasConnectTimeoutMs = String.valueOf(context.getProperty(ATLAS_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
        String atlasReadTimeoutMs = String.valueOf(context.getProperty(ATLAS_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());

        this.atlasAuthN = this.getAtlasAuthN(atlasAuthNMethod);
        this.atlasAuthN.configure((PropertyContext)context);

        if (createAtlasConf) {
            // Asynchronous notification is not supported by this task, so the generated file disables it.
            this.atlasProperties.setProperty(hookAsyncProp, "false");
            this.atlasProperties.put(ATLAS_PROPERTY_REST_ADDRESS, String.join(ATLAS_URL_DELIMITER, urls));
            this.atlasProperties.put(ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS, atlasConnectTimeoutMs);
            this.atlasProperties.put(ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS, atlasReadTimeoutMs);
            this.atlasProperties.put(ATLAS_PROPERTY_METADATA_NAMESPACE, this.defaultMetadataNamespace);
            this.atlasProperties.put(ATLAS_PROPERTY_CLUSTER_NAME, this.defaultMetadataNamespace);
            this.setAtlasSSLConfig(this.atlasProperties, context, urls, confDir);
            this.setKafkaConfig(this.atlasProperties, (PropertyContext)context);
            this.atlasAuthN.populateProperties(this.atlasProperties);
            try (FileOutputStream fos = new FileOutputStream(atlasPropertiesFile)) {
                String ts = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX").withZone(ZoneOffset.UTC).format(Instant.now());
                this.atlasProperties.store(fos, "Generated by Apache NiFi ReportLineageToAtlas ReportingTask at " + ts);
            }
        } else {
            String isAsync = this.atlasProperties.getProperty(hookAsyncProp);
            if (isAsync == null || !isAsync.equalsIgnoreCase("false")) {
                throw new ProcessException("Atlas property 'atlas.notification.hook.asynchronous' must be set to 'false' in atlas-application.properties. Sending notifications asynchronously is not supported by the reporting task.");
            }
        }

        this.getLogger().debug("Force reloading Atlas application properties.");
        ApplicationProperties.forceReload();

        if (confDir != null) {
            String atlasConfProp = "atlas.conf";
            Properties props = System.getProperties();
            props.setProperty(atlasConfProp, confDir.getAbsolutePath());
            this.getLogger().debug("{} has been set to: {}", new Object[]{atlasConfProp, props.getProperty(atlasConfProp)});
        }
    }

    /**
     * Resolves the Atlas URLs from the given property, falling back to 'atlas.rest.address' in the
     * loaded Atlas properties, and checks that each comma separated entry is a well-formed URL.
     *
     * @param atlasUrlsProp the 'Atlas URLs' property value
     * @return the trimmed URL strings in their configured order
     * @throws ProcessException if no URL is configured anywhere, or an entry is not a valid URL
     */
    private List<String> parseAtlasUrls(PropertyValue atlasUrlsProp) {
        ArrayList<String> atlasUrls = new ArrayList<String>();

        Consumer<String> collectUrls = joined -> {
            for (String rawEntry : joined.split(ATLAS_URL_DELIMITER)) {
                String urlString = rawEntry.trim();
                try {
                    // Constructed only to verify that the entry parses.
                    new URL(urlString);
                } catch (Exception e) {
                    throw new ProcessException((Throwable)e);
                }
                atlasUrls.add(urlString);
            }
        };
        Runnable failWhenMissing = () -> {
            throw new ProcessException("No Atlas URL has been specified! Set either the '" + ATLAS_URLS.getDisplayName() + "' property on the processor or the 'atlas.rest.address' property in the atlas configuration file.");
        };

        this.setValue(collectUrls, failWhenMissing, atlasUrlsProp, this.atlasProperties.getProperty(ATLAS_PROPERTY_REST_ADDRESS));
        return atlasUrls;
    }

    /**
     * Selects a value and hands it to {@code setter}, or runs {@code emptyHandler} if none is found.
     *
     * A {@code StringSelector} is built from the expression-evaluated property value and then
     * extended with the fallback {@code properties}; whether a value exists is decided by
     * {@code StringSelector.found()}.
     *
     * @param setter                 receives the selector's string form when a value was found
     * @param emptyHandler           run when the selector reports that nothing was found
     * @param elEnabledPropertyValue property whose attribute expressions are evaluated first
     * @param properties             fallback candidate values
     */
    private void setValue(Consumer<String> setter, Runnable emptyHandler, PropertyValue elEnabledPropertyValue, String ... properties) {
        String evaluated = elEnabledPropertyValue.evaluateAttributeExpressions().getValue();
        StringSelector selector = StringSelector.of(new String[]{evaluated}).or(properties);
        if (!selector.found()) {
            emptyHandler.run();
            return;
        }
        setter.accept(selector.toString());
    }

    /**
     * Records whether TLS is used for the Atlas API and (re)creates the SSL client XML file.
     *
     * TLS is considered enabled when at least one URL starts with "https" (case-insensitive).
     * Any existing SSL client XML file in {@code confDir} is always deleted first; a new one is
     * written only when TLS is enabled and the SSL context service has a truststore configured.
     *
     * @param atlasProperties properties receiving the {@code ATLAS_PROPERTY_ENABLE_TLS} flag
     * @param context         configuration context used to look up the SSL context service
     * @param urls            Atlas URLs to inspect
     * @param confDir         directory holding the SSL client XML file
     * @throws Exception if deleting or creating the SSL client XML file fails
     */
    private void setAtlasSSLConfig(Properties atlasProperties, ConfigurationContext context, List<String> urls, File confDir) throws Exception {
        boolean tlsEnabled = false;
        for (String url : urls) {
            if (url.toLowerCase().startsWith("https")) {
                tlsEnabled = true;
                break;
            }
        }
        atlasProperties.put(ATLAS_PROPERTY_ENABLE_TLS, String.valueOf(tlsEnabled));

        // Always start from a clean slate so no previously written file is left behind.
        this.deleteSslClientXml(confDir);
        if (!tlsEnabled) {
            return;
        }

        SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslService == null) {
            this.getLogger().warn("No SSLContextService configured, the system default truststore will be used.");
            return;
        }
        if (!sslService.isTrustStoreConfigured()) {
            this.getLogger().warn("No truststore configured on SSLContextService, the system default truststore will be used.");
            return;
        }
        this.createSslClientXml(confDir, sslService);
    }

    /**
     * Removes the SSL client XML file from the given directory if it exists.
     *
     * @param confDir directory in which {@code SSL_CLIENT_XML_FILENAME} is looked up
     * @throws Exception the original failure, rethrown after it has been logged
     */
    private void deleteSslClientXml(File confDir) throws Exception {
        File sslClientXmlFile = new File(confDir, SSL_CLIENT_XML_FILENAME);
        Path target = sslClientXmlFile.toPath();
        try {
            Files.deleteIfExists(target);
        } catch (Exception e) {
            // Log with the path for context, then let the caller see the failure.
            this.getLogger().error("Unable to delete SSL Client Configuration File {}", new Object[]{target, e});
            throw e;
        }
    }

    /**
     * Writes the SSL client XML file containing the truststore settings of the SSL context service.
     *
     * @param confDir           directory in which {@code SSL_CLIENT_XML_FILENAME} is created
     * @param sslContextService source of the truststore location, password and type
     * @throws Exception the original failure, rethrown after it has been logged
     */
    private void createSslClientXml(File confDir, SSLContextService sslContextService) throws Exception {
        File target = new File(confDir, SSL_CLIENT_XML_FILENAME);

        // 'false' = do not load Hadoop default resources; only the three entries below are written.
        Configuration truststoreConfig = new Configuration(false);
        String truststoreLocation = sslContextService.getTrustStoreFile();
        truststoreConfig.set(SSL_CLIENT_XML_TRUSTSTORE_LOCATION, truststoreLocation);
        String truststorePassword = sslContextService.getTrustStorePassword();
        truststoreConfig.set(SSL_CLIENT_XML_TRUSTSTORE_PASSWORD, truststorePassword);
        String truststoreType = sslContextService.getTrustStoreType();
        truststoreConfig.set(SSL_CLIENT_XML_TRUSTSTORE_TYPE, truststoreType);

        try (FileWriter out = new FileWriter(target)) {
            truststoreConfig.writeXml((Writer) out);
        } catch (Exception e) {
            this.getLogger().error("Unable to create SSL Client Configuration File {}", new Object[]{target, e});
            throw e;
        }
    }

    /**
     * Creates a {@link NiFiAtlasClient} for the Atlas URLs configured on the reporting context.
     *
     * @param context reporting context providing the {@code ATLAS_URLS} property
     * @return a new client wrapping the client created by the configured authentication method
     * @throws ProcessException if client creation fails with a {@link NullPointerException};
     *                          the message hints at a missing 'atlas-application.properties'
     */
    protected NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) {
        List<String> atlasUrls = this.parseAtlasUrls(context.getProperty(ATLAS_URLS));
        try {
            String[] endpoints = atlasUrls.toArray(new String[0]);
            return new NiFiAtlasClient(this.atlasAuthN.createClient(endpoints));
        } catch (NullPointerException e) {
            String message = String.format("Failed to initialize Atlas client due to %s. Make sure 'atlas-application.properties' is in the directory specified with %s or under root classpath if not specified.", e, ATLAS_CONF_DIR.getDisplayName());
            throw new ProcessException(message, e);
        }
    }

    /**
     * Maps an authentication method name to its {@code AtlasAuthN} implementation.
     *
     * @param atlasAuthNMethod "basic" or "kerberos"
     * @return a new {@code Basic} or {@code Kerberos} instance
     * @throws IllegalArgumentException if the method name is neither of the supported values
     */
    private AtlasAuthN getAtlasAuthN(String atlasAuthNMethod) {
        switch (atlasAuthNMethod) {
            case "basic":
                return new Basic();
            case "kerberos":
                return new Kerberos();
            default:
                throw new IllegalArgumentException(atlasAuthNMethod + " is not supported as an Atlas authentication method.");
        }
    }

    /**
     * Creates and configures the provenance event consumer used by this reporting task.
     *
     * The consumer gets its start position and batch size from the context, the target event
     * types from the current lineage strategy, this task's logger, and is marked as scheduled.
     *
     * @param context configuration context providing the provenance consumer properties
     * @throws IOException declared by the signature; not thrown explicitly by this method
     */
    private void initProvenanceConsumer(ConfigurationContext context) throws IOException {
        ProvenanceEventConsumer eventConsumer = new ProvenanceEventConsumer();
        // Publish to the field first, exactly as before, then configure the same instance.
        this.consumer = eventConsumer;

        String startPosition = context.getProperty(ProvenanceEventConsumer.PROVENANCE_START_POSITION).getValue();
        eventConsumer.setStartPositionValue(startPosition);

        int batchSize = context.getProperty(ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE).asInteger().intValue();
        eventConsumer.setBatchSize(batchSize);

        eventConsumer.addTargetEventType(this.lineageStrategy.getTargetEventTypes());
        eventConsumer.setLogger(this.getLogger());
        eventConsumer.setScheduled(true);
    }

    /**
     * Marks the provenance consumer as no longer scheduled, if one has been created.
     */
    @OnUnscheduled
    public void onUnscheduled() {
        if (this.consumer == null) {
            // Nothing was initialized, so there is nothing to unschedule.
            return;
        }
        this.consumer.setScheduled(false);
    }

    /**
     * Closes the Atlas hook, if present, and clears the reference to it.
     */
    @OnStopped
    public void onStopped() {
        if (this.nifiAtlasHook == null) {
            return;
        }
        this.nifiAtlasHook.close();
        this.nifiAtlasHook = null;
    }

    /**
     * Runs one reporting cycle: ensures the NiFi type definitions exist in Atlas, builds the
     * current flow model, registers it when this node handles primary tasks, and then consumes
     * provenance events.
     *
     * The cycle is skipped when the node is clustered but its node identifier is still empty,
     * and also when a non-primary node finds that the type definitions are not registered yet.
     *
     * @param context the reporting context for this invocation
     * @throws RuntimeException if Atlas reports a failure while registering type definitions
     *                          or the flow
     */
    public void onTrigger(ReportingContext context) {
        String nodeId = context.getClusterNodeIdentifier();
        boolean clustered = context.isClustered();
        if (clustered && StringUtils.isEmpty(nodeId)) {
            // Clustered, but this node's identifier is not available yet.
            return;
        }

        // Standalone instances and the primary node of a cluster perform the primary tasks.
        boolean handlesPrimaryTasks = !clustered || this.getNodeTypeProvider().isPrimary();

        try (NiFiAtlasClient atlasClient = this.createNiFiAtlasClient(context)) {
            if (!this.isTypeDefCreated) {
                try {
                    if (handlesPrimaryTasks) {
                        atlasClient.registerNiFiTypeDefs(false);
                    } else if (!atlasClient.isNiFiTypeDefsRegistered()) {
                        // Other nodes only check for the definitions and retry on a later trigger.
                        this.getLogger().debug("NiFi type definitions are not ready in Atlas type system yet.");
                        return;
                    }
                } catch (AtlasServiceException e) {
                    throw new RuntimeException("Failed to check and create NiFi flow type definitions in Atlas due to " + e, e);
                }
                this.isTypeDefCreated = true;
            }

            NiFiFlow currentFlow = this.createNiFiFlow(context, atlasClient);

            if (handlesPrimaryTasks) {
                try {
                    atlasClient.registerNiFiFlow(currentFlow);
                } catch (AtlasServiceException e) {
                    throw new RuntimeException("Failed to register NiFI flow. " + e, e);
                }
            }

            this.nifiAtlasHook.setAtlasClient(atlasClient);
            this.consumeNiFiProvenanceEvents(context, currentFlow);
        }
    }

    /**
     * Builds the {@link NiFiFlow} model for the root process group.
     *
     * The namespace is resolved from the host name of the configured NiFi URL. If a flow for the
     * root process group already exists in Atlas it is reused; if Atlas answers NOT_FOUND a fresh
     * {@link NiFiFlow} is created instead. The flow is then updated with the current name, URL
     * and namespace and analyzed against the current root process group status.
     *
     * @param context     reporting context providing the root group status and the NiFi URL
     * @param atlasClient client used to look up an already registered flow
     * @return the existing or newly created flow, populated by the flow analyzer
     * @throws IllegalArgumentException if the configured NiFi URL cannot be parsed
     * @throws RuntimeException         if fetching the existing flow fails for any reason other
     *                                  than the flow not being found
     */
    private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient atlasClient) {
        String namespace;
        ProcessGroupStatus rootProcessGroup = context.getEventAccess().getGroupStatus("root");
        String flowName = rootProcessGroup.getName();
        String nifiUrl = context.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue();
        try {
            String nifiHostName = new URL(nifiUrl).getHost();
            namespace = this.namespaceResolvers.fromHostNames(nifiHostName);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Failed to parse NiFi URL, " + e.getMessage(), e);
        }

        NiFiFlow existingNiFiFlow = null;
        try {
            existingNiFiFlow = atlasClient.fetchNiFiFlow(rootProcessGroup.getId(), namespace);
        } catch (AtlasServiceException e) {
            // A missing flow is expected the first time; only other failures are fatal.
            // Rethrowing unconditionally here would make the new-flow fallback below unreachable.
            if (!ClientResponse.Status.NOT_FOUND.equals((Object) e.getStatus())) {
                throw new RuntimeException("Failed to fetch existing NiFI flow. " + e, e);
            }
            this.getLogger().debug("Existing flow was not found for {}@{}", new Object[]{rootProcessGroup.getId(), namespace});
        }

        NiFiFlow nifiFlow = existingNiFiFlow != null ? existingNiFiFlow : new NiFiFlow(rootProcessGroup.getId());
        nifiFlow.setFlowName(flowName);
        nifiFlow.setUrl(nifiUrl);
        nifiFlow.setNamespace(namespace);

        NiFiFlowAnalyzer flowAnalyzer = new NiFiFlowAnalyzer();
        flowAnalyzer.analyzeProcessGroup(nifiFlow, rootProcessGroup);
        flowAnalyzer.analyzePaths(nifiFlow);
        return nifiFlow;
    }

    /**
     * Feeds provenance events to the lineage strategy and commits the resulting hook messages.
     *
     * Each batch delivered by the consumer is processed event by event; an event whose
     * processing throws is logged and skipped so the rest of the batch is still handled.
     * After every batch the Atlas hook's messages are committed.
     *
     * @param context  reporting context providing event access and analysis-related properties
     * @param nifiFlow the flow model the events are analyzed against
     */
    private void consumeNiFiProvenanceEvents(ReportingContext context, NiFiFlow nifiFlow) {
        EventAccess access = context.getEventAccess();
        String s3ModelVersion = context.getProperty(AWS_S3_MODEL_VERSION).getValue();
        String pathsLevelName = context.getProperty(FILESYSTEM_PATHS_LEVEL).getValue();
        FilesystemPathsLevel pathsLevel = FilesystemPathsLevel.valueOf(pathsLevelName);
        ProvenanceRepository provenanceRepository = (ProvenanceRepository) access.getProvenanceRepository();
        StandardAnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, this.namespaceResolvers, provenanceRepository, s3ModelVersion, pathsLevel);

        this.consumer.consumeEvents(context, (componentMapHolder, events) -> {
            for (ProvenanceEventRecord record : events) {
                try {
                    this.lineageStrategy.processEvent(analysisContext, nifiFlow, record);
                } catch (Exception e) {
                    // One bad event must not abort the batch; the trailing 'e' carries the stack trace.
                    this.getLogger().error("Skipping failed analyzing event {} due to {}.", new Object[]{record, e, e});
                }
            }
            this.nifiAtlasHook.commitMessages();
        });
    }

    /**
     * Populates the Atlas Kafka client settings: bootstrap servers, client id, security
     * protocol, keystore/truststore details from the SSL context service (when configured),
     * and the JAAS settings when a SASL security protocol is selected.
     *
     * @param mapToPopulate destination for the Atlas/Kafka properties
     * @param context       property context supplying the Kafka and SSL related properties
     */
    private void setKafkaConfig(Map<Object, Object> mapToPopulate, PropertyContext context) {
        String bootstrapServers = context.getProperty(KAFKA_BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
        mapToPopulate.put(ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);

        // The client id is built from this task's name and identifier.
        String clientId = String.format("%s.%s", this.getName(), this.getIdentifier());
        mapToPopulate.put(ATLAS_PROPERTY_KAFKA_CLIENT_ID, clientId);

        String securityProtocol = context.getProperty(KAFKA_SECURITY_PROTOCOL).getValue();
        mapToPopulate.put("atlas.kafka.security.protocol", securityProtocol);

        SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslService != null) {
            if (sslService.isKeyStoreConfigured()) {
                mapToPopulate.put("atlas.kafka.ssl.keystore.location", sslService.getKeyStoreFile());
                mapToPopulate.put("atlas.kafka.ssl.keystore.password", sslService.getKeyStorePassword());
                // Fall back to the keystore password when no dedicated key password is set.
                String keyPassword = sslService.getKeyPassword() != null ? sslService.getKeyPassword() : sslService.getKeyStorePassword();
                mapToPopulate.put("atlas.kafka.ssl.key.password", keyPassword);
                mapToPopulate.put("atlas.kafka.ssl.keystore.type", sslService.getKeyStoreType());
            }
            if (sslService.isTrustStoreConfigured()) {
                mapToPopulate.put("atlas.kafka.ssl.truststore.location", sslService.getTrustStoreFile());
                mapToPopulate.put("atlas.kafka.ssl.truststore.password", sslService.getTrustStorePassword());
                mapToPopulate.put("atlas.kafka.ssl.truststore.type", sslService.getTrustStoreType());
            }
        }

        boolean saslSelected = SEC_SASL_PLAINTEXT.equals((Object) securityProtocol) || SEC_SASL_SSL.equals((Object) securityProtocol);
        if (saslSelected) {
            this.setKafkaJaasConfig(mapToPopulate, context);
        }
    }

    /**
     * Adds the Kerberos JAAS settings for the Atlas Kafka client.
     *
     * Principal and keytab come from the Kerberos credentials service when one is configured,
     * otherwise from the explicit principal/keytab properties. Nothing is added unless keytab,
     * principal and Kafka Kerberos service name are all non-blank.
     *
     * @param mapToPopulate destination for the JAAS related properties
     * @param context       property context supplying the Kerberos related properties
     */
    private void setKafkaJaasConfig(Map<Object, Object> mapToPopulate, PropertyContext context) {
        String principalFromProperty = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
        String keytabFromProperty = context.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
        KerberosCredentialsService credentials = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);

        // The credentials service, when present, takes precedence over the explicit properties.
        String principal = credentials == null ? principalFromProperty : credentials.getPrincipal();
        String keytab = credentials == null ? keytabFromProperty : credentials.getKeytab();

        String serviceName = context.getProperty(KAFKA_KERBEROS_SERVICE_NAME).evaluateAttributeExpressions().getValue();

        if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal) || StringUtils.isBlank(serviceName)) {
            // Incomplete Kerberos configuration: leave the map untouched.
            return;
        }

        // Keytab-based login entry.
        mapToPopulate.put("atlas.jaas.KafkaClient.loginModuleControlFlag", "required");
        mapToPopulate.put("atlas.jaas.KafkaClient.loginModuleName", "com.sun.security.auth.module.Krb5LoginModule");
        mapToPopulate.put("atlas.jaas.KafkaClient.option.keyTab", keytab);
        mapToPopulate.put("atlas.jaas.KafkaClient.option.principal", principal);
        mapToPopulate.put("atlas.jaas.KafkaClient.option.serviceName", serviceName);
        mapToPopulate.put("atlas.jaas.KafkaClient.option.storeKey", "True");
        mapToPopulate.put("atlas.jaas.KafkaClient.option.useKeyTab", "True");

        // Ticket-cache based login entry.
        mapToPopulate.put("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", "required");
        mapToPopulate.put("atlas.jaas.ticketBased-KafkaClient.loginModuleName", "com.sun.security.auth.module.Krb5LoginModule");
        mapToPopulate.put("atlas.jaas.ticketBased-KafkaClient.option.useTicketCache", "true");

        mapToPopulate.put("atlas.kafka.sasl.kerberos.service.name", serviceName);
    }
}

