package org.apache.nifi.processors.gcp.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub topic. The processor supports dynamic properties. If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'.")
@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute", description = "Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content will be read into memory to be sent as a PubSub message.")
@WritesAttributes({@WritesAttribute(attribute = PubSubAttributes.MESSAGE_ID_ATTRIBUTE, description = PubSubAttributes.MESSAGE_ID_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.TOPIC_NAME_ATTRIBUTE, description = PubSubAttributes.TOPIC_NAME_DESCRIPTION)})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SeeAlso({ConsumeGCPubSub.class})
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish"})
/* loaded from: input_file:org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.class */
public class PublishGCPubSub extends AbstractGCPubSubProcessor {
    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder().name("gcp-pubsub-topic").displayName("Topic Name").description("Name of the Google Cloud PubSub Topic").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.").build();
    private Publisher publisher = null;
    private AtomicReference<Exception> storedException = new AtomicReference<>();

    @Override // org.apache.nifi.processors.gcp.AbstractGCPProcessor
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ImmutableList.of(PROJECT_ID, GCP_CREDENTIALS_PROVIDER_SERVICE, TOPIC_NAME, BATCH_SIZE);
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder().required(false).name(str).displayName(str).addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR).dynamic(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    }

    @Override // org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor
    public Set<Relationship> getRelationships() {
        return Collections.unmodifiableSet(new HashSet(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY)));
    }

    @Override // org.apache.nifi.processors.gcp.AbstractGCPProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        try {
            this.publisher = getPublisherBuilder(processContext).build();
        } catch (IOException e) {
            getLogger().error("Failed to create Google Cloud PubSub Publisher due to {}", new Object[]{e});
            this.storedException.set(e);
        }
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        List<FlowFile> list = processSession.get(processContext.getProperty(BATCH_SIZE).asInteger().intValue());
        if (list.isEmpty() || this.publisher == null) {
            if (this.storedException.get() != null) {
                getLogger().error("Google Cloud PubSub Publisher was not properly created due to {}", new Object[]{this.storedException.get()});
            }
            processContext.yield();
            return;
        }
        long nanoTime = System.nanoTime();
        ArrayList arrayList = new ArrayList();
        String projectTopicName = getTopicName(processContext).toString();
        try {
            for (FlowFile flowFile : list) {
                try {
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    processSession.exportTo(flowFile, byteArrayOutputStream);
                    ApiFuture publish = this.publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFrom(byteArrayOutputStream.toByteArray())).setPublishTime(Timestamp.newBuilder().build()).putAllAttributes(getDynamicAttributesMap(processContext, flowFile)).build());
                    HashMap hashMap = new HashMap();
                    hashMap.put(PubSubAttributes.MESSAGE_ID_ATTRIBUTE, publish.get());
                    hashMap.put(PubSubAttributes.TOPIC_NAME_ATTRIBUTE, projectTopicName);
                    flowFile = processSession.putAllAttributes(flowFile, hashMap);
                    arrayList.add(flowFile);
                } catch (InterruptedException | ExecutionException e) {
                    if (e.getCause() instanceof DeadlineExceededException) {
                        getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {} but attempting again may succeed so routing to retry", new Object[]{projectTopicName, e.getLocalizedMessage()}, e);
                        processSession.transfer(flowFile, REL_RETRY);
                    } else {
                        getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {}", new Object[]{projectTopicName, e});
                        processSession.transfer(flowFile, REL_FAILURE);
                    }
                    processContext.yield();
                }
            }
        } finally {
            if (!arrayList.isEmpty()) {
                processSession.transfer(arrayList, REL_SUCCESS);
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    processSession.getProvenanceReporter().send((FlowFile) it.next(), projectTopicName, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime));
                }
            }
        }
    }

    @OnStopped
    public void onStopped() {
        shutdownPublisher();
    }

    private void shutdownPublisher() {
        try {
            if (this.publisher != null) {
                this.publisher.shutdown();
            }
        } catch (Exception e) {
            getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Publisher due to {}", new Object[]{e});
        }
    }

    private ProjectTopicName getTopicName(ProcessContext processContext) {
        String value = processContext.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
        return value.contains("/") ? ProjectTopicName.parse(value) : ProjectTopicName.of(processContext.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue(), value);
    }

    private Map<String, String> getDynamicAttributesMap(ProcessContext processContext, FlowFile flowFile) {
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : processContext.getProperties().entrySet()) {
            if (((PropertyDescriptor) entry.getKey()).isDynamic()) {
                hashMap.put(((PropertyDescriptor) entry.getKey()).getName(), processContext.getProperty((PropertyDescriptor) entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
            }
        }
        return hashMap;
    }

    private Publisher.Builder getPublisherBuilder(ProcessContext processContext) {
        return Publisher.newBuilder(getTopicName(processContext)).setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(processContext))).setBatchingSettings(BatchingSettings.newBuilder().setElementCountThreshold(processContext.getProperty(BATCH_SIZE).asLong()).setIsEnabled(true).build());
    }
}
