package org.apache.nifi.processors.flume;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.PollableSource;
import org.apache.flume.Source;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.EventDrivenSourceRunner;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

/**
 * NiFi processor that hosts a single Flume {@link Source} and routes what it produces to the
 * {@link #SUCCESS} relationship.
 *
 * <p>Two kinds of Flume source are handled differently:
 * <ul>
 *   <li>{@link PollableSource}: started in {@link #onScheduled(ProcessContext)} and polled once per
 *       session-based trigger through {@link #onTrigger(ProcessContext, ProcessSession)}.</li>
 *   <li>{@link EventDrivenSource}: started lazily from
 *       {@link #onTrigger(ProcessContext, ProcessSessionFactory)} via an
 *       {@link EventDrivenSourceRunner}, writing into a channel bound to the session factory.</li>
 * </ul>
 */
@CapabilityDescription("Execute a Flume source. Each Flume Event is sent to the success relationship as a FlowFile")
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"flume", "hadoop", "get", "source"})
public class ExecuteFlumeSource extends AbstractFlumeProcessor {

    /** Flume component type of the source (short alias or fully-qualified class name). */
    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
            .name("Source Type")
            .description("The component type name for the source. For some sources, this is a short, symbolic name (e.g. spooldir). For others, it's the fully-qualified name of the Source class. See the Flume User Guide for details.")
            .required(true)
            .addValidator(createSourceValidator())
            .build();

    /** Agent name used when building the Flume source context from the configuration text. */
    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
            .name("Agent Name")
            .description("The name of the agent used in the Flume source configuration")
            .required(true)
            .defaultValue("tier1")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Source name used both to create the source and to build its Flume context. */
    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
            .name("Source Name")
            .description("The name of the source used in the Flume source configuration")
            .required(true)
            .defaultValue("src-1")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Raw Flume configuration text for the source; accepted without validation. */
    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
            .name("Flume Configuration")
            .description("The Flume configuration for the source copied from the flume.properties file")
            .required(true)
            .defaultValue("")
            .addValidator(Validator.VALID)
            .build();

    /** The only relationship of this processor. */
    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    /** Source created by the most recent {@link #onScheduled(ProcessContext)} call; null before that. */
    private volatile Source source;

    /** Channel used for pollable sources; it is re-bound to the current session on every poll. */
    private final NifiSessionChannel pollableSourceChannel = new NifiSessionChannel(SUCCESS);

    /** Session factory the event-driven runner was last started with. */
    private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null);

    /** Runner currently driving an event-driven source, if any. */
    private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null);

    /** Channel currently receiving events from an event-driven source, if any. */
    private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null);

    /**
     * Builds the immutable property and relationship collections exposed by this processor.
     *
     * @param processorInitializationContext initialization context (unused)
     */
    protected void init(ProcessorInitializationContext processorInitializationContext) {
        this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
        this.relationships = ImmutableSet.of(SUCCESS);
    }

    /**
     * @return the property descriptors assembled in {@link #init(ProcessorInitializationContext)}
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    /**
     * @return the relationships assembled in {@link #init(ProcessorInitializationContext)}
     */
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * Creates and configures the Flume source from the processor properties. A
     * {@link PollableSource} is additionally wired to the session channel and started here;
     * other source types are started later from
     * {@link #onTrigger(ProcessContext, ProcessSessionFactory)}.
     *
     * @param processContext context supplying the property values
     * @throws RuntimeException any failure is logged and rethrown via {@link Throwables#propagate(Throwable)}
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        try {
            final String sourceName = processContext.getProperty(SOURCE_NAME).getValue();
            final String sourceType = processContext.getProperty(SOURCE_TYPE).getValue();

            // Publish the new source before configuring it, exactly as before, so that
            // stopped() sees the instance created by this scheduling attempt.
            final Source created = SOURCE_FACTORY.create(sourceName, sourceType);
            this.source = created;

            final String flumeConfig = processContext.getProperty(FLUME_CONFIG).getValue();
            final String agentName = processContext.getProperty(AGENT_NAME).getValue();
            Configurables.configure(created, getFlumeSourceContext(flumeConfig, agentName, sourceName));

            if (created instanceof PollableSource) {
                created.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(this.pollableSourceChannel)));
                created.start();
            }
        } catch (Throwable th) {
            getLogger().error("Error creating source", th);
            throw Throwables.propagate(th);
        }
    }

    /**
     * Stops whatever was started for the current source: the source itself when it is pollable,
     * otherwise the event-driven runner and its channel (when present). The remembered session
     * factory is always cleared. The {@code source} field is intentionally left in place because
     * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} reuses it after calling this method.
     */
    @OnStopped
    public void stopped() {
        final Source current = this.source;
        if (current instanceof PollableSource) {
            current.stop();
        } else {
            final EventDrivenSourceRunner runner = this.runnerRef.get();
            if (runner != null) {
                runner.stop();
                // Only clear the slot if nobody replaced the runner in the meantime.
                this.runnerRef.compareAndSet(runner, null);
            }
            final NifiSessionFactoryChannel channel = this.eventDrivenSourceChannelRef.get();
            if (channel != null) {
                channel.stop();
                this.eventDrivenSourceChannelRef.compareAndSet(channel, null);
            }
        }
        this.sessionFactoryRef.set(null);
    }

    /**
     * Dispatches a trigger according to the source kind. Pollable sources are delegated to the
     * superclass. Event-driven sources are (re)started only when the supplied session factory
     * differs from the one remembered from the previous trigger. Any other source (including
     * none) is ignored.
     *
     * @param processContext        current process context
     * @param processSessionFactory factory the event-driven channel is bound to
     * @throws ProcessException declared by the overridden method
     */
    @Override // org.apache.nifi.processors.flume.AbstractFlumeProcessor
    public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
        final Source current = this.source;
        if (current instanceof PollableSource) {
            super.onTrigger(processContext, processSessionFactory);
            return;
        }
        if (!(current instanceof EventDrivenSource)) {
            return;
        }
        // Same factory as last time: the runner is already wired to it, nothing to do.
        if (this.sessionFactoryRef.getAndSet(processSessionFactory) == processSessionFactory) {
            return;
        }
        if (this.runnerRef.get() != null) {
            // stopped() clears the remembered factory, so it has to be restored afterwards.
            stopped();
            this.sessionFactoryRef.set(processSessionFactory);
        }

        final EventDrivenSourceRunner runner = new EventDrivenSourceRunner();
        this.runnerRef.set(runner);

        final NifiSessionFactoryChannel channel = new NifiSessionFactoryChannel(this.sessionFactoryRef.get(), SUCCESS);
        this.eventDrivenSourceChannelRef.set(channel);
        channel.start();

        current.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel)));
        runner.setSource(current);
        runner.start();
    }

    /**
     * Polls a {@link PollableSource} once, with the pollable channel bound to the given session.
     *
     * @param processContext current process context (unused)
     * @param processSession session the pollable channel is bound to for this poll
     * @throws ProcessException if the source is missing or not pollable, or if the poll fails with
     *                          an {@link EventDeliveryException}
     */
    @Override // org.apache.nifi.processors.flume.AbstractFlumeProcessor
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final Source current = this.source;
        if (!(current instanceof PollableSource)) {
            // Guard against a missing source so the caller gets a ProcessException rather than an NPE.
            final String typeName = current == null ? "null" : current.getClass().getSimpleName();
            throw new ProcessException("Invalid source type: " + typeName);
        }
        final PollableSource pollableSource = (PollableSource) current;
        try {
            this.pollableSourceChannel.setSession(processSession);
            pollableSource.process();
        } catch (EventDeliveryException e) {
            throw new ProcessException("Error processing pollable source", e);
        }
    }
}
