package org.apache.nifi.processors.gcp.pubsub.lite;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes;

@CapabilityDescription("Consumes message from the configured Google Cloud PubSub Lite subscription. In its current state, this processor will only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with 'Use Application Default Credentials' or 'Use Compute Engine Credentials'.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@SeeAlso({PublishGCPubSubLite.class})
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume", "lite"})
@WritesAttributes({@WritesAttribute(attribute = PubSubAttributes.MESSAGE_ID_ATTRIBUTE, description = PubSubAttributes.MESSAGE_ID_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.ORDERING_KEY_ATTRIBUTE, description = PubSubAttributes.ORDERING_KEY_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE, description = PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION)})
/* loaded from: input_file:org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.class */
public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements VerifiableProcessor {
    /** Full path of the Pub/Sub Lite subscription to consume from. */
    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
            .name("gcp-pubsub-subscription")
            .displayName("Subscription")
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .description("Name of the Google Cloud Pub/Sub Subscription. Example: projects/8476107443/locations/europe-west1-d/subscriptions/my-lite-subscription")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Byte quota handed to the subscriber's per-partition flow control settings. */
    public static final PropertyDescriptor BYTES_OUTSTANDING = new PropertyDescriptor.Builder()
            .name("gcp-bytes-outstanding")
            .displayName("Bytes Outstanding")
            .description("The number of quota bytes that may be outstanding to the client.")
            .required(true)
            .defaultValue("10 MB")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();

    /** Message-count quota handed to the subscriber's per-partition flow control settings. */
    public static final PropertyDescriptor MESSAGES_OUTSTANDING = new PropertyDescriptor.Builder()
            .name("gcp-messages-outstanding")
            .displayName("Messages Outstanding")
            .description("The number of messages that may be outstanding to the client.")
            .required(true)
            .defaultValue("1000")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    /** Subscriber created when the processor is scheduled and released when it is stopped; {@code null} in between. */
    private Subscriber subscriber = null;

    /** Hand-off buffer: the subscriber's receiver callback enqueues, {@code onTrigger} dequeues. */
    private final BlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite$Message.class */
    public class Message {
        private PubsubMessage message;
        private AckReplyConsumer consumer;

        public Message(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
            this.message = pubsubMessage;
            this.consumer = ackReplyConsumer;
        }

        public PubsubMessage getMessage() {
            return this.message;
        }

        public AckReplyConsumer getConsumer() {
            return this.consumer;
        }
    }

    /**
     * Checks that the configured Subscription parses as a Pub/Sub Lite subscription path.
     *
     * @param validationContext context used to read and evaluate the Subscription property
     * @return an empty collection when the path parses, otherwise a single invalid result
     */
    @Override // org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>(1);
        final String subscription = validationContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
        try {
            // Only the parse outcome matters here; the parsed path itself is not needed.
            SubscriptionPath.parse(subscription);
        } catch (ApiException e) {
            results.add(new ValidationResult.Builder()
                    .subject(SUBSCRIPTION.getName())
                    .input(subscription)
                    .valid(false)
                    .explanation("The Subscription does not have a valid format.")
                    .build());
        }
        return results;
    }

    /**
     * Creates the subscriber if none exists yet and starts it, blocking until it is running.
     *
     * <p>On any failure the subscriber reference is discarded so that the next scheduling attempt
     * builds a fresh one instead of trying to restart a service that can no longer be started.
     * The failure is logged once and rethrown.
     *
     * @param processContext context used to read the processor configuration
     * @throws ProcessException if the subscriber cannot be created or does not reach the running state
     */
    @Override // org.apache.nifi.processors.gcp.AbstractGCPProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        try {
            if (this.subscriber == null) {
                this.subscriber = getSubscriber(processContext);
            }
            this.subscriber.startAsync().awaitRunning();
        } catch (Exception e) {
            final Subscriber failed = this.subscriber;
            this.subscriber = null;
            if (failed != null) {
                try {
                    // Request a stop without waiting, in case the service was left partially started.
                    failed.stopAsync();
                } catch (Exception ignored) {
                    // Best effort only: the start-up failure below is the error worth reporting.
                }
            }
            getLogger().error("Failed to create Google Cloud PubSub Lite Subscriber", e);
            throw new ProcessException(e);
        }
    }

    /**
     * Stops the subscriber, if any, and waits for it to terminate.
     *
     * <p>The subscriber reference is cleared even when the shutdown throws, so a later start
     * always builds a new subscriber. Messages still buffered in the internal queue are dropped:
     * they were never acknowledged, and their acknowledgement handles belong to the subscriber
     * that has just been stopped.
     */
    @OnStopped
    public void onStopped() {
        final Subscriber current = this.subscriber;
        this.subscriber = null;
        try {
            if (current != null) {
                current.stopAsync().awaitTerminated();
            }
        } catch (Exception e) {
            getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Subscriber", e);
        } finally {
            this.messages.clear();
        }
    }

    /**
     * Lists the properties this processor exposes, in display order.
     *
     * @return a read-only list of the supported property descriptors
     */
    @Override // org.apache.nifi.processors.gcp.AbstractGCPProcessor
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = Arrays.asList(
                SUBSCRIPTION,
                GCP_CREDENTIALS_PROVIDER_SERVICE,
                BYTES_OUTSTANDING,
                MESSAGES_OUTSTANDING);
        return Collections.unmodifiableList(descriptors);
    }

    /**
     * Lists the relationships this processor routes to.
     *
     * @return an immutable set containing only the success relationship
     */
    @Override // org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor
    public Set<Relationship> getRelationships() {
        final Set<Relationship> successOnly = Collections.singleton(REL_SUCCESS);
        return successOnly;
    }

    /**
     * Turns at most one buffered Pub/Sub Lite message into a FlowFile routed to success.
     *
     * <p>The message payload is written byte-for-byte, and the message is acknowledged only after
     * the session commit succeeds, so a failed commit does not acknowledge a message whose
     * FlowFile was never persisted.
     *
     * @param processContext context used to read the configuration and to yield
     * @param processSession session used to create, populate and transfer the FlowFile
     * @throws ProcessException if the subscriber exists but is not running
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final Subscriber activeSubscriber = this.subscriber;
        if (activeSubscriber == null) {
            getLogger().error("Google Cloud PubSub Lite Subscriber was not properly created. Yielding the processor...");
            processContext.yield();
            return;
        }
        if (!activeSubscriber.isRunning()) {
            Throwable failure;
            try {
                failure = activeSubscriber.failureCause();
            } catch (IllegalStateException notFailed) {
                // failureCause() is only defined for a failed service; a subscriber that stopped
                // without failing has no cause, so report the state problem itself.
                failure = notFailed;
            }
            getLogger().error("Google Cloud PubSub Lite Subscriber is not running. Yielding the processor...", failure);
            throw new ProcessException(failure);
        }

        final Message next = this.messages.poll();
        if (next == null) {
            // Nothing buffered yet.
            processContext.yield();
            return;
        }

        final PubsubMessage pubsubMessage = next.getMessage();
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(PubSubAttributes.MESSAGE_ID_ATTRIBUTE, pubsubMessage.getMessageId());
        attributes.put(PubSubAttributes.ORDERING_KEY_ATTRIBUTE, pubsubMessage.getOrderingKey());
        attributes.put(PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(pubsubMessage.getAttributesCount()));
        attributes.put(PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(pubsubMessage.getPublishTime().getSeconds()));
        // Message attributes are copied last, so they win over the fixed attributes on a key clash.
        attributes.putAll(pubsubMessage.getAttributesMap());

        FlowFile flowFile = processSession.create();
        flowFile = processSession.putAllAttributes(flowFile, attributes);
        // Write the raw payload; decoding it as UTF-8 and re-encoding would corrupt binary data.
        flowFile = processSession.write(flowFile, outputStream -> pubsubMessage.getData().writeTo(outputStream));

        processSession.transfer(flowFile, REL_SUCCESS);
        processSession.getProvenanceReporter().receive(flowFile, processContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());

        // Acknowledge only once the FlowFile has been committed.
        processSession.commitAsync(() -> next.getConsumer().ack());
    }

    /**
     * Builds a subscriber, not yet started, for the configured subscription.
     *
     * <p>The receiver callback places every delivered message, together with its acknowledgement
     * handle, on the internal queue drained by {@code onTrigger}.
     *
     * @param processContext context used to read the subscription, credentials and flow control settings
     * @return a new subscriber that has not been started
     * @throws IOException declared for the credential lookup
     */
    private Subscriber getSubscriber(ProcessContext processContext) throws IOException {
        final SubscriptionPath subscriptionPath = SubscriptionPath.parse(
                processContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
        final FixedCredentialsProvider credentialsProvider =
                FixedCredentialsProvider.create(getGoogleCredentials(processContext));

        final long bytesOutstanding = processContext.getProperty(BYTES_OUTSTANDING)
                .evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
        final long messagesOutstanding = processContext.getProperty(MESSAGES_OUTSTANDING)
                .evaluateAttributeExpressions().asLong().longValue();
        final FlowControlSettings flowControlSettings = FlowControlSettings.builder()
                .setBytesOutstanding(bytesOutstanding)
                .setMessagesOutstanding(messagesOutstanding)
                .build();

        final SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder()
                .setCredentialsProvider(credentialsProvider)
                .setSubscriptionPath(subscriptionPath)
                .setReceiver((pubsubMessage, ackReplyConsumer) -> {
                    try {
                        this.messages.put(new Message(pubsubMessage, ackReplyConsumer));
                    } catch (InterruptedException e) {
                        getLogger().error("Could not save the message inside the internal queue of the processor", e);
                        // Restore the flag so the calling thread can still observe the interruption.
                        Thread.currentThread().interrupt();
                    }
                })
                .setPerPartitionFlowControlSettings(flowControlSettings)
                .build();

        return Subscriber.create(subscriberSettings);
    }

    /**
     * Verifies the configuration by attempting to build a subscriber; the subscriber is never started.
     *
     * @param processContext context used to read the processor configuration
     * @param componentLog logger that receives the failure details
     * @param map attributes supplied for verification; not used by this processor
     * @return a single-element list describing whether the subscriber could be created
     */
    public List<ConfigVerificationResult> verify(ProcessContext processContext, ComponentLog componentLog, Map<String, String> map) {
        final String stepName = "Create the Subscriber";
        final List<ConfigVerificationResult> results = new ArrayList<>();
        try {
            getSubscriber(processContext);
            results.add(new ConfigVerificationResult.Builder()
                    .verificationStepName(stepName)
                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
                    .explanation("Successfully created the Google Cloud PubSub Lite Subscriber")
                    .build());
        } catch (Exception e) {
            componentLog.error("Failed to create Google Cloud PubSub Lite Subscriber", e);
            results.add(new ConfigVerificationResult.Builder()
                    .verificationStepName(stepName)
                    .outcome(ConfigVerificationResult.Outcome.FAILED)
                    .explanation("Failed to create Google Cloud PubSub Lite Subscriber: " + e.getLocalizedMessage())
                    .build());
        }
        return results;
    }
}
