package io.dstream.nifi;

import io.dstream.DStream;
import io.dstream.utils.Assert;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

@EventDriven
/* loaded from: input_file:io/dstream/nifi/AbstractDStreamProcessor.class */
public abstract class AbstractDStreamProcessor extends AbstractProcessor {
    public static final Relationship OUTPUT = new Relationship.Builder().name("success").description("Upon successfull completion of DStream execution, its output path is forwarded to success").build();
    public static final PropertyDescriptor EXECUTION_COMPLETION_TIMEOUT = new PropertyDescriptor.Builder().name("Execution completion timeout (milliseconds)").description("Indicates how long to wait for completion of DStream execution. Defaults to 0 (wait indefinitely).").addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).defaultValue("0").required(true).build();
    private volatile Set<Relationship> relationships;
    private volatile List<PropertyDescriptor> properties;

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        ProcessorLog logger = getLogger();
        long parseLong = Long.parseLong(processContext.getProperty(EXECUTION_COMPLETION_TIMEOUT).getValue());
        FlowFile flowFile = processSession.get();
        try {
            if (flowFile != null) {
                try {
                    final String attribute = flowFile.getAttribute("filename");
                    Assert.isTrue(attribute.endsWith(".cfg"), "Received invalid configuration file '" + attribute + "'. DStream configuration file must end with '.cfg'.");
                    logger.info("Recieved configuration '" + attribute + "'");
                    final AtomicReference atomicReference = new AtomicReference();
                    processSession.read(flowFile, new InputStreamCallback() { // from class: io.dstream.nifi.AbstractDStreamProcessor.1
                        public void process(InputStream inputStream) throws IOException {
                            atomicReference.set(AbstractDStreamProcessor.this.installConfiguration(attribute, inputStream));
                        }
                    });
                    String str = attribute.split("\\.")[0];
                    DStream<?> dStream = getDStream(str);
                    if (dStream != null) {
                        logger.info("Executing DStream for '" + str + "'");
                        postProcessResults(executeDStream(dStream, str, parseLong));
                        FlowFile putAttribute = processSession.putAttribute(processSession.create(), CoreAttributes.FILENAME.key(), (String) atomicReference.get());
                        processSession.getProvenanceReporter().receive(putAttribute, (String) atomicReference.get());
                        processSession.transfer(putAttribute, OUTPUT);
                    } else {
                        logger.warn("Failed to locate DStream for execution '" + str + "'. Nothing was executed. Possible reasons: " + getClass().getSimpleName() + " may not have provided a DStream for '" + str + "'");
                    }
                } catch (Exception e) {
                    throw new IllegalStateException("Failed DStream execution with unexpected exception ", e);
                }
            }
        } finally {
            processSession.remove(flowFile);
            processSession.commit();
        }
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected void init(ProcessorInitializationContext processorInitializationContext) {
        HashSet hashSet = new HashSet();
        hashSet.add(OUTPUT);
        this.relationships = Collections.unmodifiableSet(hashSet);
        ArrayList arrayList = new ArrayList();
        arrayList.add(EXECUTION_COMPLETION_TIMEOUT);
        this.properties = Collections.unmodifiableList(arrayList);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    protected abstract <T> DStream<T> getDStream(String str);

    protected <T> void postProcessResults(Stream<Stream<T>> stream) {
    }

    private <T> Stream<Stream<T>> executeDStream(DStream<?> dStream, String str, long j) {
        ProcessorLog logger = getLogger();
        Future executeAs = dStream.executeAs(str);
        try {
            if (j > 0) {
                return (Stream) executeAs.get(j, TimeUnit.MILLISECONDS);
            }
            logger.warn("Waiting for completion of '" + str + "' indefinitely. Consider setting 'Execution completion timeout' property of your processorwhen configured via UI.");
            return (Stream) executeAs.get();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("Failed while waiting for execution to complete", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String installConfiguration(String str, InputStream inputStream) {
        try {
            File file = new File(System.getProperty("java.io.tmpdir") + "/dstream_" + UUID.randomUUID());
            file.mkdirs();
            File file2 = new File(file, str);
            file2.deleteOnExit();
            FileOutputStream fileOutputStream = new FileOutputStream(file2);
            Properties properties = new Properties();
            properties.load(inputStream);
            properties.store(fileOutputStream, str + " configuration");
            addToClassPath(file);
            return properties.containsKey("dstream.output") ? properties.getProperty("dstream.output") : str.split("\\.")[0] + "/out";
        } catch (Exception e) {
            throw new IllegalStateException("Failed to generate execution config", e);
        }
    }

    private void addToClassPath(File file) {
        try {
            Thread.currentThread().setContextClassLoader(URLClassLoader.newInstance(new URL[]{file.toURI().toURL()}, Thread.currentThread().getContextClassLoader()));
        } catch (Exception e) {
            throw new IllegalStateException("Failed to update classpath with path '" + file + "'.", e);
        }
    }
}
