package org.apache.nifi.processors.flume;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.conf.Configurables;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Execute a Flume sink. Each input FlowFile is converted into a Flume Event for processing by the sink.")
@Restricted(restrictions = {@Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE, explanation = "Provides operator the ability to execute arbitrary Flume configurations assuming all permissions that NiFi has.")})
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"flume", "hadoop", "put", "sink"})
@DeprecationNotice(reason = "Apache Flume pipelines should be implemented using Apache NiFi components")
/* loaded from: input_file:org/apache/nifi/processors/flume/ExecuteFlumeSink.class */
public class ExecuteFlumeSink extends AbstractFlumeProcessor {
    public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder().name("Sink Type").description("The component type name for the sink. For some sinks, this is a short, symbolic name (e.g. hdfs). For others, it's the fully-qualified name of the Sink class. See the Flume User Guide for details.").required(true).addValidator(createSinkValidator()).build();
    public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder().name("Agent Name").description("The name of the agent used in the Flume sink configuration").required(true).defaultValue("tier1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder().name("Sink Name").description("The name of the sink used in the Flume sink configuration").required(true).defaultValue("sink-1").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder().name("Flume Configuration").description("The Flume configuration for the sink copied from the flume.properties file").required(true).defaultValue("").addValidator(Validator.VALID).build();
    public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
    public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    private volatile Sink sink;
    private volatile NifiSinkSessionChannel channel;

    protected void init(ProcessorInitializationContext processorInitializationContext) {
        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
        this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        try {
            this.channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
            this.channel.start();
            this.sink = SINK_FACTORY.create(processContext.getProperty(SOURCE_NAME).getValue(), processContext.getProperty(SINK_TYPE).getValue());
            this.sink.setChannel(this.channel);
            Configurables.configure(this.sink, getFlumeSinkContext(processContext.getProperty(FLUME_CONFIG).getValue(), processContext.getProperty(AGENT_NAME).getValue(), processContext.getProperty(SOURCE_NAME).getValue()));
            this.sink.start();
        } catch (Throwable th) {
            getLogger().error("Error creating sink", th);
            throw Throwables.propagate(th);
        }
    }

    @OnStopped
    public void stopped() {
        this.sink.stop();
        this.channel.stop();
    }

    @Override // org.apache.nifi.processors.flume.AbstractFlumeProcessor
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        this.channel.setSession(processSession);
        try {
            this.sink.process();
        } catch (EventDeliveryException e) {
            throw new ProcessException("Flume event delivery failed", e);
        }
    }
}
