package org.apache.nifi.processors.kafka.pubsub;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.class */
/**
 * Base class for processors that talk to Kafka through a single, lazily built
 * {@link Closeable} resource of type {@code T}.
 *
 * <p>{@link #onTrigger(ProcessContext, ProcessSessionFactory)} builds the resource on first use,
 * delegates the actual exchange to {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}
 * and commits the session. When a task fails, the processor stops accepting new tasks
 * ({@code acceptTask == false}); once the last in-flight task finishes, the resource is closed
 * and tasks are accepted again, so the next trigger rebuilds the resource.
 *
 * @param <T> type of the closeable Kafka resource managed by this processor
 */
abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessionFactoryProcessor {

    /** Matches one {@code <host>:<port>} entry with a 3 to 5 digit port. */
    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";

    /** Matches a comma-separated list of {@code <host>:<port>} entries. */
    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";

    static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
    static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");

    static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
            .name("bootstrap.servers")
            .displayName("Kafka Brokers")
            .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
            .expressionLanguageSupported(true)
            .defaultValue("localhost:9092")
            .build();

    static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder()
            .name("client.id")
            .displayName("Client ID")
            .description("String value uniquely identifying this client application. Corresponds to Kafka's 'client.id' property.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();

    static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
            .name("security.protocol")
            .displayName("Security Protocol")
            .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
            .required(false)
            .expressionLanguageSupported(false)
            .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
            .defaultValue(SEC_PLAINTEXT.getValue())
            .build();

    // The description names the Kafka property this descriptor is actually bound to
    // ('sasl.kerberos.service.name', see name(...) below).
    static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder()
            .name("sasl.kerberos.service.name")
            .displayName("Kerberos Service Name")
            .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. "
                    + "Corresponds to Kafka's 'sasl.kerberos.service.name' property. "
                    + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
            .required(false)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(false)
            .build();

    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
            .name("topic")
            .displayName("Topic Name")
            .description("The name of the Kafka Topic")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();

    /** Partially configured builder; it carries no description and is not built in this class. */
    static final PropertyDescriptor.Builder MESSAGE_DEMARCATOR_BUILDER = new PropertyDescriptor.Builder()
            .name("message-demarcator")
            .displayName("Message Demarcator")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true);

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles that are successfully sent to or received from Kafka are routed to this relationship")
            .build();

    /** Property descriptors registered by this base class (populated in the static initializer below). */
    static final List<PropertyDescriptor> SHARED_DESCRIPTORS = new ArrayList<>();

    /** Relationships registered by this base class (populated in the static initializer below). */
    static final Set<Relationship> SHARED_RELATIONSHIPS = new HashSet<>();

    static {
        SHARED_DESCRIPTORS.add(BOOTSTRAP_SERVERS);
        SHARED_DESCRIPTORS.add(TOPIC);
        SHARED_DESCRIPTORS.add(CLIENT_ID);
        SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL);
        SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE);
        SHARED_RELATIONSHIPS.add(REL_SUCCESS);
    }

    /** SLF4J logger bound to the concrete subclass; kept package-visible as in the original. */
    final Logger logger = LoggerFactory.getLogger(getClass());

    /** The Kafka resource; {@code null} until built by {@code onTrigger} and after {@link #close()}. */
    volatile T kafkaResource;

    /** Number of tasks currently between "accepted" and "finished" in {@code onTrigger}. */
    private final AtomicInteger taskCounter = new AtomicInteger();

    /** Circuit breaker: {@code false} from the moment a task fails until the resource has been closed. */
    private volatile boolean acceptTask = true;

    /**
     * Runs one unit of work against Kafka.
     *
     * <p>If the processor is currently draining after a failure, the task is not accepted and the
     * context is yielded. Otherwise a session is created, the Kafka resource is built if absent,
     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} is invoked and the session is
     * committed. Any {@link Throwable} raised while processing rolls the session back (with
     * penalization), is logged and switches the processor into the draining state.
     *
     * @param processContext        the process context
     * @param processSessionFactory factory used to create the session for this task
     * @throws ProcessException declared for interface compatibility
     */
    public final void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
        if (!this.acceptTask) {
            getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
            processContext.yield();
            return;
        }

        // Create the session before counting the task: if session creation fails, the counter
        // must not be left incremented, otherwise it could never drain back to zero.
        ProcessSession session = processSessionFactory.createSession();
        this.taskCounter.incrementAndGet();
        try {
            // A plain synchronized check (no unsynchronized pre-check) because the field can be
            // reset to null by close() at any time.
            synchronized (this) {
                if (this.kafkaResource == null) {
                    this.kafkaResource = buildKafkaResource(processContext, session);
                }
            }

            boolean processed = rendezvousWithKafka(processContext, session);
            session.commit();
            if (processed) {
                postCommit(processContext);
            } else {
                processContext.yield();
            }
        } catch (Throwable t) {
            this.acceptTask = false;
            session.rollback(true);
            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
        } finally {
            // Exactly one decrement per accepted task. The last task to finish while the
            // breaker is open closes the resource and re-opens the processor for new tasks.
            synchronized (this) {
                if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
                    close();
                    this.acceptTask = true;
                }
            }
        }
    }

    /**
     * Closes the Kafka resource, if any, and clears the reference. A failure while closing is
     * logged as a warning and otherwise ignored; the reference is cleared regardless.
     */
    @OnStopped
    public void close() {
        // Work on a snapshot so the null check, the close call and the log message all refer
        // to the same object.
        T resource = this.kafkaResource;
        try {
            if (resource != null) {
                try {
                    resource.close();
                } catch (Exception e) {
                    getLogger().warn("Failed while closing " + resource, e);
                }
            }
        } finally {
            this.kafkaResource = null;
        }
    }

    /**
     * Describes a dynamic property as a Kafka configuration entry.
     *
     * @param str name of the dynamic property
     * @return a dynamic descriptor named {@code str} with a non-empty validator
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder()
                .description("Specifies the value for '" + str + "' Kafka Configuration.")
                .name(str)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .dynamic(true)
                .build();
    }

    /**
     * Performs the actual exchange with Kafka for one task.
     *
     * @param processContext the process context
     * @param processSession the session created for this task
     * @return {@code true} to have {@link #postCommit(ProcessContext)} invoked after the commit,
     *         {@code false} to have the context yielded instead
     */
    protected abstract boolean rendezvousWithKafka(ProcessContext processContext, ProcessSession processSession);

    /**
     * Builds the Kafka resource. Called from {@code onTrigger} under the instance lock whenever
     * {@link #kafkaResource} is {@code null}.
     *
     * @param processContext the process context
     * @param processSession the session created for the current task
     * @return the resource to store in {@link #kafkaResource}
     */
    protected abstract T buildKafkaResource(ProcessContext processContext, ProcessSession processSession);

    /**
     * Hook invoked after a successful session commit when
     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} returned {@code true}.
     * The default implementation does nothing.
     *
     * @param processContext the process context
     */
    protected void postCommit(ProcessContext processContext) {
    }

    /**
     * Requires the Kerberos service name to be set whenever one of the SASL security protocols
     * is selected.
     *
     * @param validationContext the validation context
     * @return a collection holding one invalid result when the rule is violated, otherwise empty
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        List<ValidationResult> results = new ArrayList<>();

        String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
        boolean saslSelected = SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol)
                || SEC_SASL_SSL.getValue().equals(securityProtocol);

        if (saslSelected) {
            String serviceName = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue();
            if (serviceName == null || serviceName.trim().isEmpty()) {
                results.add(new ValidationResult.Builder()
                        .subject(KERBEROS_PRINCIPLE.getDisplayName())
                        .valid(false)
                        .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
                                + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
                                + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
                        .build());
            }
        }
        return results;
    }

    /**
     * Concatenates the three parts into {@code <str>://<str2>/<str3>}.
     *
     * @param str  first part, placed before {@code ://}
     * @param str2 second part, placed between {@code ://} and {@code /}
     * @param str3 third part, placed after {@code /}
     * @return the assembled URI string
     */
    public String buildTransitURI(String str, String str2, String str3) {
        return str + "://" + str2 + "/" + str3;
    }

    /**
     * Copies every property of the context that has a non-null value into a {@link Properties}
     * object keyed by the property name. Expression language is evaluated for descriptors that
     * support it, and values of properties whose name ends in {@code .ms} are converted to a
     * number of milliseconds.
     *
     * @param processContext the process context to read properties from
     * @return the resulting properties
     */
    public Properties buildKafkaProperties(ProcessContext processContext) {
        Properties properties = new Properties();
        for (PropertyDescriptor descriptor : processContext.getProperties().keySet()) {
            String name = descriptor.getName();
            String value = descriptor.isExpressionLanguageSupported()
                    ? processContext.getProperty(descriptor).evaluateAttributeExpressions().getValue()
                    : processContext.getProperty(descriptor).getValue();
            if (value == null) {
                continue;
            }
            if (name.endsWith(".ms")) {
                // The value is a duration expression; hand over plain milliseconds.
                value = String.valueOf(FormatUtils.getTimeDuration(value.trim(), TimeUnit.MILLISECONDS));
            }
            properties.setProperty(name, value);
        }
        return properties;
    }
}
