/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.pulsar.pubsub;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsarRecord;
import org.apache.nifi.processors.pulsar.pubsub.PublishPulsar;
import org.apache.nifi.processors.pulsar.pubsub.PublishPulsarRecord;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.GenericRecord;

/**
 * NiFi processor that consumes messages from an Apache Pulsar topic and writes their
 * payloads into FlowFiles routed to {@code REL_SUCCESS}.
 *
 * <p>Consecutive messages whose mapped FlowFile attributes are equal are written to the
 * same FlowFile, separated by the configured message demarcator (when one is set). Each
 * FlowFile carries a {@value #MSG_COUNT} attribute holding the number of non-empty
 * payloads written to it.
 *
 * <p>On shared subscriptions every message is acknowledged individually; otherwise a
 * cumulative acknowledgement is issued for the last message of each group.
 */
@SeeAlso(value={PublishPulsar.class, ConsumePulsarRecord.class, PublishPulsarRecord.class})
@Tags(value={"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
@CapabilityDescription(value="Consumes messages from Apache Pulsar. The complementary NiFi processor for sending messages is PublishPulsar.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@WritesAttributes(value={@WritesAttribute(attribute="message.count", description="The number of messages received from Pulsar")})
public class ConsumePulsar
extends AbstractPulsarConsumerProcessor<byte[]> {
    /** Name of the FlowFile attribute that records how many payloads were written. */
    public static final String MSG_COUNT = "message.count";

    /**
     * Obtains a consumer and pulls messages either asynchronously or synchronously,
     * depending on the {@code ASYNC_ENABLED} property.
     *
     * @param context the process context supplying the processor's properties
     * @param session the session used to create and transfer FlowFiles
     * @throws ProcessException wrapping any {@link PulsarClientException} raised while consuming
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        try {
            final Consumer<GenericRecord> consumer = this.getConsumer(context, this.getConsumerId(context, session.get()));
            if (consumer == null) {
                // No consumer available right now; back off instead of spinning.
                context.yield();
                return;
            }
            if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
                this.consumeAsync(consumer, context, session);
                this.handleAsync(consumer, context, session);
            } else {
                this.consume(consumer, context, session);
            }
        }
        catch (PulsarClientException e) {
            this.getLogger().error("Unable to consume from Pulsar Topic ", e);
            context.yield();
            throw new ProcessException(e);
        }
    }

    /**
     * Polls the consumer service for a completed batch of messages (waiting up to five
     * seconds) and turns that batch into one or more FlowFiles.
     *
     * @param consumer the Pulsar consumer the messages were received on; used for acknowledgements
     * @param context the process context supplying the processor's properties
     * @param session the session used to create and transfer FlowFiles
     */
    private void handleAsync(final Consumer<GenericRecord> consumer, ProcessContext context, ProcessSession session) {
        try {
            final Future<List<Message<GenericRecord>>> done = this.getConsumerService().poll(5L, TimeUnit.SECONDS);
            if (done == null) {
                return;
            }
            final byte[] demarcatorBytes = this.readDemarcatorBytes(context);
            final boolean shared = this.isSharedSubscription(context);
            final List<Message<GenericRecord>> messages = done.get();
            if (CollectionUtils.isEmpty(messages)) {
                // Nothing was received, so there is nothing to write or acknowledge.
                return;
            }

            FlowFile flowFile = null;
            OutputStream out = null;
            int msgCount = 0;
            Map<String, String> lastAttributes = null;
            Message<GenericRecord> lastMessage = null;
            for (final Message<GenericRecord> msg : messages) {
                final Map<String, String> currentAttributes = this.getMappedFlowFileAttributes(context, msg);
                if (lastAttributes != null && !lastAttributes.equals(currentAttributes)) {
                    // Attributes changed: finish the current FlowFile before starting a new one.
                    IOUtils.closeQuietly(out);
                    this.emitFlowFile(session, flowFile, consumer, msgCount);
                    session.commitAsync();
                    if (!shared) {
                        this.submitCumulativeAckTask(consumer, lastMessage);
                    }
                    lastAttributes = null;
                    lastMessage = null;
                }
                if (lastAttributes == null) {
                    flowFile = session.create();
                    flowFile = session.putAllAttributes(flowFile, currentAttributes);
                    out = session.write(flowFile);
                    msgCount = 0;
                }
                lastAttributes = currentAttributes;
                lastMessage = msg;
                if (shared) {
                    // Cumulative acknowledgement is not used on shared subscriptions; ack each message.
                    this.submitAckTask(consumer, msg);
                }
                try {
                    if (ConsumePulsar.writeDemarcated(out, msg.getData(), demarcatorBytes, msgCount > 0)) {
                        ++msgCount;
                    }
                }
                catch (IOException ioEx) {
                    this.getLogger().error("Unable to create flow file ", ioEx);
                    // Release the stream before discarding the session's work.
                    IOUtils.closeQuietly(out);
                    session.rollback();
                    return;
                }
            }
            IOUtils.closeQuietly(out);
            this.emitFlowFile(session, flowFile, consumer, msgCount);
            session.commitAsync();
            if (!shared) {
                this.submitCumulativeAckTask(consumer, messages.get(messages.size() - 1));
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the framework can observe the interruption.
            Thread.currentThread().interrupt();
            this.getLogger().error("Trouble consuming messages ", e);
        }
        catch (ExecutionException e) {
            this.getLogger().error("Trouble consuming messages ", e);
        }
    }

    /**
     * Synchronously receives up to {@code CONSUMER_BATCH_SIZE} messages (unbounded when the
     * property is not set) and writes them into FlowFiles grouped by mapped attributes.
     *
     * @param consumer the Pulsar consumer to receive from and acknowledge on
     * @param context the process context supplying the processor's properties
     * @param session the session used to create and transfer FlowFiles
     * @throws PulsarClientException declared for callers; client errors raised inside are
     *         logged here and cause a yield and session rollback
     */
    private void consume(Consumer<GenericRecord> consumer, ProcessContext context, ProcessSession session) throws PulsarClientException {
        try {
            final int maxMessages = context.getProperty(CONSUMER_BATCH_SIZE).isSet() ? context.getProperty(CONSUMER_BATCH_SIZE).evaluateAttributeExpressions().asInteger() : Integer.MAX_VALUE;
            final byte[] demarcatorBytes = this.readDemarcatorBytes(context);
            final boolean shared = this.isSharedSubscription(context);
            FlowFile flowFile = null;
            OutputStream out = null;
            Message<GenericRecord> msg;
            Message<GenericRecord> lastMsg = null;
            int msgCount = 0;
            int received = 0;
            Map<String, String> lastAttributes = null;
            while (received < maxMessages && (msg = consumer.receive(0, TimeUnit.SECONDS)) != null) {
                final Map<String, String> currentAttributes = this.getMappedFlowFileAttributes(context, msg);
                if (lastMsg != null && !lastAttributes.equals(currentAttributes)) {
                    // Attributes changed: finish the current FlowFile before starting a new one.
                    IOUtils.closeQuietly(out);
                    if (!shared) {
                        consumer.acknowledgeCumulative(lastMsg);
                    }
                    this.finishFlowFile(session, flowFile, consumer, msgCount);
                    lastAttributes = null;
                    lastMsg = null;
                }
                if (lastMsg == null) {
                    flowFile = session.create();
                    flowFile = session.putAllAttributes(flowFile, currentAttributes);
                    out = session.write(flowFile);
                    msgCount = 0;
                }
                try {
                    lastMsg = msg;
                    lastAttributes = currentAttributes;
                    ++received;
                    if (shared) {
                        consumer.acknowledge(msg);
                    }
                    if (ConsumePulsar.writeDemarcated(out, msg.getData(), demarcatorBytes, msgCount > 0)) {
                        ++msgCount;
                    }
                }
                catch (IOException ioEx) {
                    this.getLogger().error("Unable to create flow file ", ioEx);
                    // Release the stream before discarding the session's work.
                    IOUtils.closeQuietly(out);
                    session.rollback();
                    // NOTE(review): this acknowledges messages whose FlowFile was just rolled back,
                    // which looks like it could drop data - confirm this is intended.
                    if (!shared) {
                        consumer.acknowledgeCumulative(lastMsg);
                    }
                    return;
                }
            }
            IOUtils.closeQuietly(out);
            if (!shared && lastMsg != null) {
                consumer.acknowledgeCumulative(lastMsg);
            }
            this.finishFlowFile(session, flowFile, consumer, msgCount);
        }
        catch (PulsarClientException e) {
            this.getLogger().error("Error communicating with Apache Pulsar", e);
            context.yield();
            session.rollback();
        }
    }

    /**
     * Returns the UTF-8 bytes of the {@code MESSAGE_DEMARCATOR} property, or {@code null}
     * when the property is not set.
     */
    private byte[] readDemarcatorBytes(ProcessContext context) {
        if (!context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
            return null;
        }
        return context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Writes an optional demarcator followed by a message payload.
     *
     * @param out the stream of the FlowFile currently being written
     * @param data the message payload; {@code null} or empty payloads are skipped
     * @param demarcatorBytes the demarcator, or {@code null} when none is configured
     * @param needsDemarcator whether a payload has already been written to {@code out}
     * @return {@code true} if a payload was written, {@code false} if it was skipped
     * @throws IOException if writing to {@code out} fails
     */
    private static boolean writeDemarcated(OutputStream out, byte[] data, byte[] demarcatorBytes, boolean needsDemarcator) throws IOException {
        // OutputStream.write(null) throws NullPointerException, so skip an unset demarcator.
        if (needsDemarcator && demarcatorBytes != null) {
            out.write(demarcatorBytes);
        }
        if (data == null || data.length == 0) {
            return false;
        }
        out.write(data);
        return true;
    }

    /**
     * Stamps the message count on the FlowFile, records a provenance RECEIVE event and
     * transfers the FlowFile to {@code REL_SUCCESS}.
     *
     * @return the FlowFile reference returned after the attribute was added
     */
    private FlowFile emitFlowFile(ProcessSession session, FlowFile flowFile, Consumer<GenericRecord> consumer, int msgCount) {
        final FlowFile updated = session.putAttribute(flowFile, MSG_COUNT, Integer.toString(msgCount));
        session.getProvenanceReporter().receive(updated, this.getPulsarClientService().getPulsarBrokerRootURL() + "/" + consumer.getTopic());
        session.transfer(updated, REL_SUCCESS);
        return updated;
    }

    /**
     * Completes a FlowFile produced by {@link #consume}: a FlowFile that received no
     * payload is removed, otherwise it is transferred to {@code REL_SUCCESS}.
     */
    private void finishFlowFile(ProcessSession session, FlowFile flowFile, Consumer<GenericRecord> consumer, int msgCount) {
        if (msgCount < 1) {
            if (flowFile != null) {
                session.remove(flowFile);
                session.commitAsync();
            }
            return;
        }
        final FlowFile transferred = this.emitFlowFile(session, flowFile, consumer, msgCount);
        this.getLogger().debug("Created {} from {} messages received from Pulsar Server and transferred to 'success'", new Object[]{transferred, Integer.toString(msgCount)});
    }

    /** Submits a task to the ack service that acknowledges a single message and waits for the result. */
    private void submitAckTask(final Consumer<GenericRecord> consumer, final Message<GenericRecord> message) {
        this.getAckService().submit(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                return consumer.acknowledgeAsync(message).get();
            }
        });
    }

    /** Submits a task to the ack service that cumulatively acknowledges up to the given message and waits for the result. */
    private void submitCumulativeAckTask(final Consumer<GenericRecord> consumer, final Message<GenericRecord> message) {
        this.getAckService().submit(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                return consumer.acknowledgeCumulativeAsync(message).get();
            }
        });
    }
}

