package org.apache.nifi.processors.gcp.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.publish.FlowFileResult;
import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
import org.threeten.bp.Duration;

@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub topic. The processor supports dynamic properties. If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'.")
@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute", description = "Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content will be read into memory to be sent as a PubSub message.")
@WritesAttributes({@WritesAttribute(attribute = PubSubAttributes.MESSAGE_ID_ATTRIBUTE, description = PubSubAttributes.MESSAGE_ID_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.RECORDS_ATTRIBUTE, description = PubSubAttributes.RECORDS_DESCRIPTION), @WritesAttribute(attribute = PubSubAttributes.TOPIC_NAME_ATTRIBUTE, description = PubSubAttributes.TOPIC_NAME_DESCRIPTION)})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SeeAlso({ConsumeGCPubSub.class})
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish"})
/* loaded from: input_file:org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.class */
public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
    /** Format of the transit URI reported in provenance SEND events; the single argument is the topic name string. */
    private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";

    /** Publisher used to send messages; assigned in {@code onScheduled} and shut down from {@code onStopped}. */
    protected Publisher publisher = null;

    /** IAM permissions that {@code verify} checks against the configured topic. */
    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.topics.publish");

    /** Upper bound on the number of FlowFiles pulled from the session per {@code onTrigger} invocation. */
    public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("Input Batch Size")
            .displayName("Input Batch Size")
            .description("Maximum number of FlowFiles processed for each Processor invocation")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .addValidator(StandardValidators.NUMBER_VALIDATOR)
            .defaultValue("100")
            .build();

    /** Selects whether each FlowFile or each record inside a FlowFile becomes one PubSub message. */
    public static final PropertyDescriptor MESSAGE_DERIVATION_STRATEGY = new PropertyDescriptor.Builder()
            .name("Message Derivation Strategy")
            .displayName("Message Derivation Strategy")
            .description("The strategy used to publish the incoming FlowFile to the Google Cloud PubSub endpoint.")
            .required(true)
            .defaultValue(MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
            .allowableValues(MessageDerivationStrategy.class)
            .build();

    /** Record reader service; only applicable when the record-oriented strategy is selected. */
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("Record Reader")
            .displayName("Record Reader")
            .description("The Record Reader to use for incoming FlowFiles")
            .identifiesControllerService(RecordReaderFactory.class)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue())
            .required(true)
            .build();

    /** Record writer service; only applicable when the record-oriented strategy is selected. */
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("Record Writer")
            .displayName("Record Writer")
            .description("The Record Writer to use in order to serialize the data before sending to GCPubSub endpoint")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue())
            .required(true)
            .build();

    /** Largest FlowFile that will be published as a single message; only applicable to the FlowFile-oriented strategy. */
    public static final PropertyDescriptor MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder()
            .name("Maximum Message Size")
            .displayName("Maximum Message Size")
            .description("The maximum size of a Google PubSub message in bytes. Defaults to 1 MB (1048576 bytes)")
            .dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
            .required(true)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .defaultValue("1 MB")
            .build();

    /** Topic to publish to; a value containing '/' is parsed as a full topic path, otherwise it is combined with the project id. */
    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
            .name("gcp-pubsub-topic")
            .displayName("Topic Name")
            .description("Name of the Google Cloud PubSub Topic")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .build();

    /** Relationship for FlowFiles whose publish attempt failed but may succeed if attempted again. */
    public static final Relationship REL_RETRY = new Relationship.Builder()
            .name("retry")
            .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.")
            .build();

    /**
     * Supported (non-dynamic) properties, in display order.
     * Elements are passed directly to {@code List.of} so the list is inferred as
     * {@code List<PropertyDescriptor>}; wrapping them in an {@code Object[]} cast would infer {@code List<Object>}.
     */
    private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
            GCP_CREDENTIALS_PROVIDER_SERVICE,
            PROJECT_ID,
            TOPIC_NAME,
            MESSAGE_DERIVATION_STRATEGY,
            RECORD_READER,
            RECORD_WRITER,
            MAX_BATCH_SIZE,
            MAX_MESSAGE_SIZE,
            BATCH_SIZE_THRESHOLD,
            BATCH_BYTES_THRESHOLD,
            BATCH_DELAY_THRESHOLD,
            API_ENDPOINT,
            PROXY_CONFIGURATION_SERVICE);

    /** Relationships exposed by this processor. */
    public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY);

    /**
     * Returns the fixed list of properties this processor supports.
     *
     * @return the shared {@code DESCRIPTORS} list
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    /**
     * Builds the descriptor for a user-defined (dynamic) property.
     * The property is optional, validated as an attribute key, and its value may use
     * Expression Language evaluated against FlowFile attributes.
     *
     * @param str name of the dynamic property; used as both its name and display name
     * @return a descriptor flagged as dynamic
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        final PropertyDescriptor.Builder dynamicProperty = new PropertyDescriptor.Builder();
        dynamicProperty.required(false);
        dynamicProperty.name(str);
        dynamicProperty.displayName(str);
        dynamicProperty.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR);
        dynamicProperty.dynamic(true);
        dynamicProperty.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES);
        return dynamicProperty.build();
    }

    /**
     * Returns the relationships this processor can route FlowFiles to.
     *
     * @return the shared {@code RELATIONSHIPS} set (success, failure and retry)
     */
    @Override // org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Creates the {@link Publisher} used by subsequent {@code onTrigger} calls.
     *
     * @param processContext context supplying the publisher configuration
     * @throws ProcessException if the publisher cannot be built
     */
    @Override // org.apache.nifi.processors.gcp.AbstractGCPProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        final Publisher.Builder publisherBuilder = getPublisherBuilder(processContext);
        try {
            this.publisher = publisherBuilder.build();
        } catch (IOException buildFailure) {
            throw new ProcessException("Failed to create Google Cloud PubSub Publisher", buildFailure);
        }
    }

    /**
     * Verifies the processor configuration in two steps: a {@link Publisher} is created from the
     * configured properties, and, if that succeeds, the {@code REQUIRED_PERMISSIONS} are tested
     * against the configured topic.
     *
     * <p>The publisher created here is only a probe; it is always shut down before returning so
     * that repeated verification does not leak its resources.
     *
     * @param processContext context supplying the configuration to verify
     * @param componentLog   logger that receives details of any failure
     * @param map            attributes supplied for verification (not used by this implementation)
     * @return one result per verification step that was attempted
     */
    public List<ConfigVerificationResult> verify(ProcessContext processContext, ComponentLog componentLog, Map<String, String> map) {
        final List<ConfigVerificationResult> results = new ArrayList<>();

        final Publisher verificationPublisher;
        try {
            verificationPublisher = getPublisherBuilder(processContext).build();
        } catch (IOException e) {
            componentLog.error("Failed to create Publisher", e);
            // The exception message is passed as an argument, never as the format string,
            // so a '%' inside it cannot break formatting.
            results.add(verificationResult("Create Publisher", ConfigVerificationResult.Outcome.FAILED,
                    String.format("Failed to create Publisher: %s", e.getMessage())));
            return results;
        }
        results.add(verificationResult("Create Publisher", ConfigVerificationResult.Outcome.SUCCESSFUL,
                "Successfully created Publisher"));

        try {
            verifyTopicPermissions(processContext, componentLog, results);
        } finally {
            try {
                verificationPublisher.shutdown();
            } catch (Exception e) {
                componentLog.warn("Failed to shut down the Publisher created for verification", e);
            }
        }
        return results;
    }

    /** Tests the required IAM permissions on the configured topic and appends the outcome to {@code results}. */
    private void verifyTopicPermissions(ProcessContext processContext, ComponentLog componentLog, List<ConfigVerificationResult> results) {
        final String stepName = "Test IAM Permissions";
        try {
            final PublisherStubSettings stubSettings = PublisherStubSettings.newBuilder()
                    .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(processContext)))
                    .setTransportChannelProvider(getTransportChannelProvider(processContext))
                    .build();
            try (GrpcPublisherStub publisherStub = GrpcPublisherStub.create(stubSettings)) {
                final String topicName = processContext.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
                final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder()
                        .addAllPermissions(REQUIRED_PERMISSIONS)
                        .setResource(topicName)
                        .build();
                final TestIamPermissionsResponse response = publisherStub.testIamPermissionsCallable().call(request);
                // The response lists only the permissions the caller actually holds.
                if (response.getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
                    results.add(verificationResult(stepName, ConfigVerificationResult.Outcome.SUCCESSFUL,
                            String.format("Verified Topic [%s] exists and the configured user has the correct permissions.", topicName)));
                } else {
                    results.add(verificationResult(stepName, ConfigVerificationResult.Outcome.FAILED,
                            String.format("The configured user does not have the correct permissions on Topic [%s].", topicName)));
                }
            }
        } catch (IOException e) {
            componentLog.error("The publisher stub could not be created in order to test the permissions", e);
            results.add(verificationResult(stepName, ConfigVerificationResult.Outcome.FAILED,
                    String.format("The publisher stub could not be created in order to test the permissions: %s", e.getMessage())));
        } catch (ApiException e) {
            componentLog.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
            results.add(verificationResult(stepName, ConfigVerificationResult.Outcome.FAILED,
                    String.format("The configured user appears to have the correct permissions, but the following error was encountered: %s", e.getMessage())));
        }
    }

    /** Builds a single verification result from its step name, outcome and explanation. */
    private static ConfigVerificationResult verificationResult(String stepName, ConfigVerificationResult.Outcome outcome, String explanation) {
        return new ConfigVerificationResult.Builder()
                .verificationStepName(stepName)
                .outcome(outcome)
                .explanation(explanation)
                .build();
    }

    /**
     * Pulls a batch of FlowFiles from the session and publishes them using the configured
     * message derivation strategy. Yields when no FlowFiles are available.
     *
     * @param processContext context supplying the strategy and batch size
     * @param processSession session the FlowFiles are read from and transferred in
     * @throws ProcessException      if publishing fails
     * @throws IllegalStateException if the configured strategy is neither FlowFile- nor record-oriented
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final StopWatch timer = new StopWatch(true);
        final String strategyName = processContext.getProperty(MESSAGE_DERIVATION_STRATEGY).getValue();
        final MessageDerivationStrategy strategy = MessageDerivationStrategy.valueOf(strategyName);
        final int maxFlowFiles = processContext.getProperty(MAX_BATCH_SIZE).asInteger().intValue();
        final List<FlowFile> flowFiles = processSession.get(maxFlowFiles);

        if (flowFiles.isEmpty()) {
            processContext.yield();
            return;
        }
        if (MessageDerivationStrategy.FLOWFILE_ORIENTED.equals(strategy)) {
            onTriggerFlowFileStrategy(processContext, processSession, timer, flowFiles);
            return;
        }
        if (MessageDerivationStrategy.RECORD_ORIENTED.equals(strategy)) {
            onTriggerRecordStrategy(processContext, processSession, timer, flowFiles);
            return;
        }
        throw new IllegalStateException(strategy.getValue());
    }

    /**
     * Publishes each FlowFile's content as one PubSub message.
     * A FlowFile larger than the configured maximum message size is not published; a failure is
     * recorded for it instead so that it is reconciled like any other failed publish.
     *
     * @param processContext context supplying the maximum message size and message attributes
     * @param processSession session used to read FlowFile content and transfer results
     * @param stopWatch      timer started at the beginning of the trigger, used for provenance
     * @param list           FlowFiles to publish
     * @throws ProcessException if reading or publishing fails
     */
    private void onTriggerFlowFileStrategy(ProcessContext processContext, ProcessSession processSession, StopWatch stopWatch, List<FlowFile> list) throws ProcessException {
        final long maxMessageSize = processContext.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
        final Executor callbackExecutor = MoreExecutors.directExecutor();
        final List<FlowFileResult> results = new ArrayList<>(list.size());
        // One buffer is reused for every FlowFile; it is reset before each export.
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        for (final FlowFile flowFile : list) {
            final List<ApiFuture<String>> futures = new ArrayList<>();
            // The callback runs on whichever thread completes the future (direct executor), while
            // this thread reads the lists later on, so both lists must be thread-safe.
            final List<String> successes = Collections.synchronizedList(new ArrayList<>());
            final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());

            if (flowFile.getSize() > maxMessageSize) {
                failures.add(new IllegalArgumentException(
                        String.format("FlowFile size %d exceeds MAX_MESSAGE_SIZE", flowFile.getSize())));
            } else {
                buffer.reset();
                processSession.exportTo(flowFile, buffer);
                final ApiFuture<String> future = publishOneMessage(processContext, flowFile, buffer.toByteArray());
                futures.add(future);
                addCallback(future, new TrackedApiFutureCallback(successes, failures), callbackExecutor);
            }
            results.add(new FlowFileResult(flowFile, futures, successes, failures));
        }
        finishBatch(processSession, stopWatch, results);
    }

    /**
     * Publishes the records of each FlowFile, converting the checked exceptions raised by record
     * reading and writing into a {@link ProcessException}.
     *
     * @param processContext context supplying the record reader and writer
     * @param processSession session used to read FlowFile content and transfer results
     * @param stopWatch      timer started at the beginning of the trigger
     * @param list           FlowFiles whose records are published
     * @throws ProcessException if reading, serializing or publishing a record fails
     */
    private void onTriggerRecordStrategy(ProcessContext processContext, ProcessSession processSession, StopWatch stopWatch, List<FlowFile> list) throws ProcessException {
        try {
            onTriggerRecordStrategyPublishRecords(processContext, processSession, stopWatch, list);
        } catch (IOException | SchemaNotFoundException | MalformedRecordException recordFailure) {
            final ProcessException wrapped = new ProcessException("Record publishing failed", recordFailure);
            throw wrapped;
        }
    }

    /**
     * Reads every record of every FlowFile with the configured reader, serializes each record with
     * the configured writer and publishes it as its own PubSub message.
     *
     * @param processContext context supplying the record reader and writer services
     * @param processSession session used to read FlowFile content and transfer results
     * @param stopWatch      timer started at the beginning of the trigger, used for provenance
     * @param list           FlowFiles whose records are published
     * @throws ProcessException         if publishing fails
     * @throws IOException              if reading or serializing a record fails
     * @throws SchemaNotFoundException  if the reader or writer schema cannot be resolved
     * @throws MalformedRecordException if a record cannot be parsed
     */
    private void onTriggerRecordStrategyPublishRecords(ProcessContext processContext, ProcessSession processSession, StopWatch stopWatch, List<FlowFile> list) throws ProcessException, IOException, SchemaNotFoundException, MalformedRecordException {
        final RecordReaderFactory readerFactory = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final Executor callbackExecutor = MoreExecutors.directExecutor();
        final List<FlowFileResult> results = new ArrayList<>(list.size());
        // One buffer is shared by all records; publishOneRecord resets it before each write.
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        for (final FlowFile flowFile : list) {
            final List<ApiFuture<String>> futures = new ArrayList<>();
            // Several futures of the same FlowFile report into these lists, possibly from
            // different completion threads (direct executor), so the lists must be thread-safe.
            final List<String> successes = Collections.synchronizedList(new ArrayList<>());
            final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
            final Map<String, String> attributes = flowFile.getAttributes();

            try (RecordReader reader = readerFactory.createRecordReader(attributes, processSession.read(flowFile), flowFile.getSize(), getLogger())) {
                final RecordSet recordSet = reader.createRecordSet();
                final RecordSetWriter writer = writerFactory.createWriter(
                        getLogger(), writerFactory.getSchema(attributes, recordSet.getSchema()), buffer, attributes);
                final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet);
                while (pushBackRecordSet.isAnotherRecord()) {
                    final ApiFuture<String> future = publishOneRecord(processContext, flowFile, buffer, writer, pushBackRecordSet.next());
                    futures.add(future);
                    addCallback(future, new TrackedApiFutureCallback(successes, failures), callbackExecutor);
                }
                results.add(new FlowFileResult(flowFile, futures, successes, failures));
            }
        }
        finishBatch(processSession, stopWatch, results);
    }

    /**
     * Serializes a single record into the shared buffer and publishes the resulting bytes.
     *
     * @param processContext        context used to resolve message attributes
     * @param flowFile              FlowFile the record came from
     * @param byteArrayOutputStream buffer the writer emits into; cleared before the record is written
     * @param recordSetWriter       writer that serializes the record into the buffer
     * @param record                record to publish
     * @return future completing with the id of the published message
     * @throws IOException if the record cannot be written or flushed
     */
    private ApiFuture<String> publishOneRecord(ProcessContext processContext, FlowFile flowFile, ByteArrayOutputStream byteArrayOutputStream, RecordSetWriter recordSetWriter, Record record) throws IOException {
        // Discard whatever a previous record left in the buffer.
        byteArrayOutputStream.reset();

        recordSetWriter.write(record);
        recordSetWriter.flush();

        final byte[] serializedRecord = byteArrayOutputStream.toByteArray();
        return publishOneMessage(processContext, flowFile, serializedRecord);
    }

    /**
     * Builds a PubSub message from the given payload and the processor's dynamic properties and
     * hands it to the publisher.
     *
     * @param processContext context used to resolve the dynamic properties
     * @param flowFile       FlowFile whose attributes are used to evaluate the dynamic properties
     * @param bArr           message payload
     * @return future completing with the id of the published message
     */
    private ApiFuture<String> publishOneMessage(ProcessContext processContext, FlowFile flowFile, byte[] bArr) {
        final PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder();
        messageBuilder.setData(ByteString.copyFrom(bArr));
        messageBuilder.setPublishTime(Timestamp.newBuilder().build());
        messageBuilder.putAllAttributes(getDynamicAttributesMap(processContext, flowFile));
        final PubsubMessage message = messageBuilder.build();
        return this.publisher.publish(message);
    }

    /**
     * Reconciles every FlowFile result of the batch: determines its destination relationship,
     * applies the result attributes plus the topic name, reports a provenance SEND event and
     * transfers the FlowFile.
     *
     * @param processSession session used to update and transfer the FlowFiles
     * @param stopWatch      timer whose elapsed time is reported with each provenance event
     * @param list           per-FlowFile publish results
     */
    private void finishBatch(ProcessSession processSession, StopWatch stopWatch, List<FlowFileResult> list) {
        final String topic = this.publisher.getTopicNameString();
        final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, topic);

        for (final FlowFileResult result : list) {
            // reconcile() is invoked before the attributes are read, as in the original ordering.
            final Relationship destination = result.reconcile();

            final Map<String, String> resultAttributes = result.getAttributes();
            resultAttributes.put(PubSubAttributes.TOPIC_NAME_ATTRIBUTE, topic);
            final FlowFile updated = processSession.putAllAttributes(result.getFlowFile(), resultAttributes);

            final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
            processSession.getProvenanceReporter().send(updated, transitUri, elapsedMillis);
            processSession.transfer(updated, destination);
        }
    }

    /**
     * Registers a callback on a publish future by delegating to {@link ApiFutures#addCallback}.
     * Declared {@code protected}, so subclasses can override how callbacks are attached.
     *
     * @param apiFuture         future returned by a publish call
     * @param apiFutureCallback callback notified when the future completes
     * @param executor          executor the callback is run on
     */
    protected void addCallback(ApiFuture<String> apiFuture, ApiFutureCallback<? super String> apiFutureCallback, Executor executor) {
        ApiFutures.addCallback(apiFuture, apiFutureCallback, executor);
    }

    /**
     * Lifecycle hook invoked when the processor is stopped; shuts down the publisher.
     */
    @OnStopped
    public void onStopped() {
        shutdownPublisher();
    }

    /**
     * Shuts down the publisher if one exists. A failure during shutdown is logged as a warning
     * and not propagated.
     */
    private void shutdownPublisher() {
        if (this.publisher == null) {
            return;
        }
        try {
            this.publisher.shutdown();
        } catch (Exception shutdownFailure) {
            getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Publisher due to {}", new Object[]{shutdownFailure});
        }
    }

    /**
     * Resolves the configured topic into a {@link ProjectTopicName}.
     * A value containing '/' is parsed as a complete topic path; any other value is treated as a
     * bare topic id and combined with the configured project id.
     *
     * @param processContext context supplying the topic name and project id
     * @return the resolved topic name
     */
    private ProjectTopicName getTopicName(ProcessContext processContext) {
        final String configuredTopic = processContext.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
        if (configuredTopic.contains("/")) {
            return ProjectTopicName.parse(configuredTopic);
        }
        final String projectId = processContext.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
        return ProjectTopicName.of(projectId, configuredTopic);
    }

    /**
     * Collects the processor's dynamic properties as message attributes, evaluating each value
     * against the given FlowFile.
     *
     * @param processContext context holding the configured properties
     * @param flowFile       FlowFile used for Expression Language evaluation
     * @return a new mutable map from dynamic property name to evaluated value
     */
    private Map<String, String> getDynamicAttributesMap(ProcessContext processContext, FlowFile flowFile) {
        final Map<String, String> messageAttributes = new HashMap<>();
        for (final PropertyDescriptor descriptor : processContext.getProperties().keySet()) {
            if (!descriptor.isDynamic()) {
                continue;
            }
            final String evaluated = processContext.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
            messageAttributes.put(descriptor.getName(), evaluated);
        }
        return messageAttributes;
    }

    /**
     * Creates a publisher builder for the configured topic, credentials, transport channel,
     * endpoint and batching thresholds.
     *
     * @param processContext context supplying the publisher configuration
     * @return a configured builder with batching enabled; the caller is responsible for {@code build()}
     */
    private Publisher.Builder getPublisherBuilder(ProcessContext processContext) {
        final Long elementCountThreshold = processContext.getProperty(BATCH_SIZE_THRESHOLD).asLong();
        final long requestByteThreshold = processContext.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue();
        final Long delayThresholdMillis = processContext.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);

        final Publisher.Builder publisherBuilder = Publisher.newBuilder(getTopicName(processContext))
                .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(processContext)))
                .setChannelProvider(getTransportChannelProvider(processContext))
                .setEndpoint(processContext.getProperty(API_ENDPOINT).getValue());

        final BatchingSettings batchingSettings = BatchingSettings.newBuilder()
                .setElementCountThreshold(elementCountThreshold)
                .setRequestByteThreshold(Long.valueOf(requestByteThreshold))
                .setDelayThreshold(Duration.ofMillis(delayThresholdMillis.longValue()))
                .setIsEnabled(true)
                .build();
        publisherBuilder.setBatchingSettings(batchingSettings);
        return publisherBuilder;
    }
}
