/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.pulsar.pubsub;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
import org.apache.nifi.processors.pulsar.MessageTuple;
import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar;
import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsarRecord;
import org.apache.nifi.processors.pulsar.pubsub.PublishPulsarRecord;
import org.apache.nifi.stream.io.util.StreamDemarcator;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;

@SeeAlso(value={ConsumePulsar.class, ConsumePulsarRecord.class, PublishPulsarRecord.class})
@Tags(value={"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
@CapabilityDescription(value="Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar Producer API.The messages to send may be individual FlowFiles or may be delimited, using a user-specified delimiter, such as a new-line. The complementary NiFi processor for fetching messages is ConsumePulsar.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttribute(attribute="msg.count", description="The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to This attribute is added only to FlowFiles that are routed to success.")
@TriggerWhenEmpty
public class PublishPulsar
extends AbstractPulsarProducerProcessor<byte[]> {
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        byte[] demarcatorBytes;
        this.handleFailures(session);
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
        Producer producer = this.getProducer(context, topic);
        if (producer == null) {
            this.getLogger().error("Unable to publish to topic {}", new Object[]{topic});
            session.transfer(flowFile, REL_FAILURE);
            if (context.getProperty(ASYNC_ENABLED).asBoolean().booleanValue()) {
                context.yield();
            }
            return;
        }
        byte[] byArray = demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
        if (!context.getProperty(ASYNC_ENABLED).asBoolean().booleanValue()) {
            try {
                this.send(producer, context, session, flowFile, demarcatorBytes);
            }
            catch (PulsarClientException e) {
                this.getLogger().error("Failed to connect to Pulsar Server due to {}", new Object[]{e});
                session.transfer(flowFile, REL_FAILURE);
            }
        } else if (this.canPublish.get()) {
            try (InputStream in = session.read(flowFile);
                 StreamDemarcator demarcator = new StreamDemarcator(in, demarcatorBytes, Integer.MAX_VALUE);){
                byte[] messageContent;
                while ((messageContent = demarcator.nextToken()) != null) {
                    this.workQueue.put(new MessageTuple<byte[]>(topic, this.getMessageKey(context, flowFile), this.getMappedMessageProperties(context, flowFile), messageContent));
                }
                demarcator.close();
                session.transfer(flowFile, REL_SUCCESS);
            }
            catch (Throwable t) {
                this.getLogger().error("Unable to process session due to ", t);
                session.transfer(flowFile, REL_FAILURE);
            }
        }
    }

    private void send(Producer<byte[]> producer, ProcessContext context, ProcessSession session, FlowFile flowFile, byte[] demarcatorBytes) throws PulsarClientException {
        AtomicInteger successCounter = new AtomicInteger(0);
        AtomicInteger failureCounter = new AtomicInteger(0);
        String key = this.getMessageKey(context, flowFile);
        Map<String, String> properties = this.getMappedMessageProperties(context, flowFile);
        try (InputStream in = session.read(flowFile);
             StreamDemarcator demarcator = new StreamDemarcator(in, demarcatorBytes, Integer.MAX_VALUE);){
            byte[] messageContent;
            while ((messageContent = demarcator.nextToken()) != null) {
                if (this.send(producer, key, properties, messageContent) != null) {
                    successCounter.incrementAndGet();
                    continue;
                }
                failureCounter.incrementAndGet();
                break;
            }
        }
        catch (IOException ioEx) {
            this.getLogger().error("Unable to publish message to Pulsar broker " + this.getPulsarClientService().getPulsarBrokerRootURL(), (Throwable)ioEx);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        if (successCounter.intValue() > 0) {
            session.adjustCounter("Messages Sent", (long)successCounter.get(), true);
            session.getProvenanceReporter().send(flowFile, this.getPulsarClientService().getPulsarBrokerRootURL() + "/" + producer.getTopic(), "Sent " + successCounter.get() + " messages");
        }
        if (failureCounter.intValue() == 0) {
            session.transfer(flowFile, REL_SUCCESS);
        } else {
            session.transfer(flowFile, REL_FAILURE);
        }
    }
}

