package org.apache.nifi.processors.gcp.pubsub;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.pathtemplate.ValidationException;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, the configured number of messages will be pulled in a single request, else only one message will be pulled.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@SeeAlso({PublishGCPubSub.class})
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume"})
@WritesAttributes({@WritesAttribute(attribute = PubSubAttributes.ACK_ID_ATTRIBUTE, description = PubSubAttributes.ACK_ID_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE, description = PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE, description = PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION)})
/* loaded from: input_file:org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.class */
public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.subscriptions.consume");
    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder().name("gcp-pubsub-subscription").displayName("Subscription").addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).description("Name of the Google Cloud Pub/Sub Subscription").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    private PullRequest pullRequest;
    private SubscriberStub subscriber = null;
    private final AtomicReference<Exception> storedException = new AtomicReference<>();

    @Override // org.apache.nifi.processors.gcp.AbstractGCPProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        this.pullRequest = PullRequest.newBuilder().setMaxMessages(processContext.getProperty(BATCH_SIZE_THRESHOLD).asInteger().intValue()).setSubscription(getSubscriptionName(processContext, null)).build();
        try {
            this.subscriber = getSubscriber(processContext);
        } catch (IOException e) {
            this.storedException.set(e);
            getLogger().error("Failed to create Google Cloud Subscriber", e);
        }
    }

    public List<ConfigVerificationResult> verify(ProcessContext processContext, ComponentLog componentLog, Map<String, String> map) {
        ArrayList arrayList = new ArrayList();
        String str = null;
        try {
            str = getSubscriptionName(processContext, map);
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Parse Subscription Name").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Successfully parsed Subscription Name").build());
        } catch (ValidationException e) {
            componentLog.error("Failed to parse Subscription Name", e);
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Parse Subscription Name").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to parse Subscription Name: " + e.getMessage(), new Object[0])).build());
        }
        SubscriberStub subscriberStub = null;
        try {
            subscriberStub = getSubscriber(processContext);
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Create Subscriber").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Successfully created Subscriber").build());
        } catch (IOException e2) {
            componentLog.error("Failed to create Subscriber", e2);
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Create Subscriber").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to create Subscriber: " + e2.getMessage(), new Object[0])).build());
        }
        if (subscriberStub != null && str != null) {
            try {
                if (((TestIamPermissionsResponse) subscriberStub.testIamPermissionsCallable().call(TestIamPermissionsRequest.newBuilder().addAllPermissions(REQUIRED_PERMISSIONS).setResource(str).build())).getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
                    arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(String.format("Verified Subscription [%s] exists and the configured user has the correct permissions.", str)).build());
                } else {
                    arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("The configured user does not have the correct permissions on Subscription [%s].", str)).build());
                }
            } catch (ApiException e3) {
                componentLog.error("The configured user appears to have the correct permissions, but the following error was encountered", e3);
                arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e3.getMessage(), new Object[0])).build());
            }
        }
        return arrayList;
    }

    @OnStopped
    public void onStopped() {
        if (this.subscriber != null) {
            this.subscriber.shutdown();
        }
    }

    @Override // org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubWithProxyProcessor, org.apache.nifi.processors.gcp.AbstractGCPProcessor
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList arrayList = new ArrayList(super.getSupportedPropertyDescriptors());
        arrayList.add(SUBSCRIPTION);
        arrayList.add(BATCH_SIZE_THRESHOLD);
        arrayList.add(API_ENDPOINT);
        return Collections.unmodifiableList(arrayList);
    }

    @Override // org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        if (this.subscriber == null) {
            if (this.storedException.get() != null) {
                getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", this.storedException.get());
            } else {
                getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
            }
            processContext.yield();
            return;
        }
        PullResponse pullResponse = (PullResponse) this.subscriber.pullCallable().call(this.pullRequest);
        ArrayList arrayList = new ArrayList();
        String subscriptionName = getSubscriptionName(processContext, null);
        for (ReceivedMessage receivedMessage : pullResponse.getReceivedMessagesList()) {
            if (receivedMessage.hasMessage()) {
                FlowFile create = processSession.create();
                HashMap hashMap = new HashMap();
                arrayList.add(receivedMessage.getAckId());
                hashMap.put(PubSubAttributes.ACK_ID_ATTRIBUTE, receivedMessage.getAckId());
                hashMap.put(PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE, String.valueOf(receivedMessage.getSerializedSize()));
                hashMap.put(PubSubAttributes.MESSAGE_ID_ATTRIBUTE, receivedMessage.getMessage().getMessageId());
                hashMap.put(PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(receivedMessage.getMessage().getAttributesCount()));
                hashMap.put(PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(receivedMessage.getMessage().getPublishTime().getSeconds()));
                hashMap.putAll(receivedMessage.getMessage().getAttributesMap());
                FlowFile write = processSession.write(processSession.putAllAttributes(create, hashMap), outputStream -> {
                    outputStream.write(receivedMessage.getMessage().getData().toByteArray());
                });
                processSession.transfer(write, REL_SUCCESS);
                processSession.getProvenanceReporter().receive(write, subscriptionName);
            }
        }
        processSession.commitAsync(() -> {
            acknowledgeAcks(arrayList, subscriptionName);
        });
    }

    private void acknowledgeAcks(Collection<String> collection, String str) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        this.subscriber.acknowledgeCallable().call(AcknowledgeRequest.newBuilder().addAllAckIds(collection).setSubscription(str).build());
    }

    private String getSubscriptionName(ProcessContext processContext, Map<String, String> map) {
        String value = processContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions(map).getValue();
        return value.contains("/") ? ProjectSubscriptionName.parse(value).toString() : ProjectSubscriptionName.of(processContext.getProperty(PROJECT_ID).evaluateAttributeExpressions(map).getValue(), value).toString();
    }

    private SubscriberStub getSubscriber(ProcessContext processContext) throws IOException {
        return GrpcSubscriberStub.create(SubscriberStubSettings.newBuilder().setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(processContext))).setTransportChannelProvider(getTransportChannelProvider(processContext)).setEndpoint(processContext.getProperty(API_ENDPOINT).getValue()).build());
    }
}
