/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.pulsar.pubsub;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar;
import org.apache.nifi.processors.pulsar.pubsub.PublishPulsar;
import org.apache.nifi.processors.pulsar.pubsub.PublishPulsarRecord;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.GenericRecord;

/**
 * NiFi processor that consumes messages from a Pulsar topic, parses each message payload with the
 * configured Record Reader and re-serializes the records into FlowFiles with the configured
 * Record Writer.
 *
 * <p>Consecutive messages that map to the same FlowFile attributes are written into one FlowFile;
 * a change in the mapped attributes closes the current FlowFile and starts a new one. The raw
 * payload of every message that cannot be parsed or written is routed to
 * {@link #REL_PARSE_FAILURE}.
 *
 * <p>Acknowledgement: under a shared subscription each message is acknowledged individually as it
 * is picked up; otherwise a cumulative acknowledgement is sent once per completed FlowFile and
 * once for the last message of the batch.
 */
@CapabilityDescription(value="Consumes messages from Apache Pulsar. The complementary NiFi processor for sending messages is PublishPulsarRecord. Please note that, at this time, the Processor assumes that all records that are retrieved have the same schema. If any of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the 'parse_failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.")
@Tags(value={"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
@WritesAttributes(value={@WritesAttribute(attribute="record.count", description="The number of records received")})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@SeeAlso(value={PublishPulsar.class, ConsumePulsar.class, PublishPulsarRecord.class})
public class ConsumePulsarRecord
extends AbstractPulsarConsumerProcessor<GenericRecord> {
    /** Name of the FlowFile attribute holding the number of records written to the FlowFile. */
    public static final String MSG_COUNT = "record.count";

    /** Demarcator placed between raw payloads in a parse-failure FlowFile when none is configured. */
    private static final String RECORD_SEPARATOR = "\n";

    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("Record Reader")
            .displayName("Record Reader")
            .description("The Record Reader to use for parsing the messages received from Pulsar")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("Record Writer")
            .displayName("Record Writer")
            .description("The Record Writer to use in order to serialize the records before writing them to a FlowFile")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();

    public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
            .name("Max Wait Time")
            .description("The maximum amount of time allowed for a Pulsar consumer to poll a subscription for data , zero means there is no limit. Max time less than 1 second will be equal to zero.")
            .defaultValue("2 seconds")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    /** Destination for the raw payload of messages that could not be parsed or written. */
    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
            .name("parse_failure")
            .description("FlowFiles for which the content cannot be parsed.")
            .build();

    private static final List<PropertyDescriptor> PROPERTIES;
    private static final Set<Relationship> RELATIONSHIPS;

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Obtains a consumer and processes one batch of messages, either through the asynchronous
     * consumer service (when the async property is set and true) or by polling synchronously.
     *
     * @param context processor configuration
     * @param session session used to create and transfer FlowFiles
     * @throws ProcessException wrapping any {@link PulsarClientException} raised while consuming
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final int maxMessages = context.getProperty(CONSUMER_BATCH_SIZE).isSet()
                ? context.getProperty(CONSUMER_BATCH_SIZE).evaluateAttributeExpressions().asInteger()
                : Integer.MAX_VALUE;

        // Encode with an explicit charset so the bytes written between failed messages do not
        // depend on the JVM's platform default.
        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet()
                ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(utf8)
                : RECORD_SEPARATOR.getBytes(utf8);

        try {
            final Consumer<GenericRecord> consumer = this.getConsumer(context, this.getConsumerId(context, session.get()));
            if (consumer == null) {
                context.yield();
                return;
            }
            final boolean async = context.getProperty(ASYNC_ENABLED).isSet()
                    && context.getProperty(ASYNC_ENABLED).asBoolean().booleanValue();
            if (async) {
                this.consumeAsync(consumer, context, session);
                this.handleAsync(context, session, consumer, readerFactory, writerFactory, demarcator);
            } else {
                this.consumeMessages(context, session, consumer, this.getMessages(consumer, maxMessages), readerFactory, writerFactory, demarcator, false);
            }
        }
        catch (PulsarClientException e) {
            this.getLogger().error("Unable to consume from Pulsar Topic ", (Throwable)e);
            context.yield();
            throw new ProcessException((Throwable)e);
        }
    }

    /**
     * Polls the consumer with a zero timeout until it returns no message or
     * {@code maxMessages} messages have been collected.
     *
     * @param consumer    consumer to poll
     * @param maxMessages upper bound on the number of messages returned
     * @return the received messages in arrival order; empty when nothing was available
     * @throws PulsarClientException if a receive call fails
     */
    private List<Message<GenericRecord>> getMessages(Consumer<GenericRecord> consumer, int maxMessages) throws PulsarClientException {
        final List<Message<GenericRecord>> messages = new ArrayList<Message<GenericRecord>>();
        while (messages.size() < maxMessages) {
            final Message<GenericRecord> msg = consumer.receive(0, TimeUnit.SECONDS);
            if (msg == null) {
                break;
            }
            messages.add(msg);
        }
        return messages;
    }

    /**
     * Converts a batch of Pulsar messages into record FlowFiles.
     *
     * <p>A new FlowFile is started for the first message and whenever the mapped FlowFile
     * attributes differ from those of the previous message. Messages whose schema cannot be
     * determined, for which no writer can be created, or whose payload cannot be parsed are
     * collected and written, raw, to {@link #REL_PARSE_FAILURE}.
     *
     * @param context       processor configuration
     * @param session       session used to create and transfer FlowFiles
     * @param consumer      consumer the messages came from; used for acknowledgement and provenance
     * @param messages      messages to process; nothing happens when null or empty
     * @param readerFactory factory for the reader that parses each payload
     * @param writerFactory factory for the writer that serializes the records
     * @param demarcator    bytes placed between raw payloads in a parse-failure FlowFile
     * @param async         whether acknowledgements are submitted to the ack service
     * @throws PulsarClientException if a synchronous acknowledgement fails
     */
    private void consumeMessages(ProcessContext context, ProcessSession session, Consumer<GenericRecord> consumer, List<Message<GenericRecord>> messages, RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, byte[] demarcator, boolean async) throws PulsarClientException {
        if (CollectionUtils.isEmpty(messages)) {
            return;
        }
        final BlockingQueue<Message<GenericRecord>> parseFailures = new LinkedBlockingQueue<Message<GenericRecord>>();
        // Messages whose records went into the currently open FlowFile; kept so their raw
        // payloads can still be routed to parse_failure if that FlowFile cannot be finished.
        final List<Message<GenericRecord>> written = new ArrayList<Message<GenericRecord>>();
        final boolean shared = this.isSharedSubscription(context);

        // State of the currently open FlowFile. lastMessage == null means no FlowFile is open.
        FlowFile flowFile = null;
        OutputStream rawOut = null;
        RecordSetWriter writer = null;
        Map<String, String> lastAttributes = null;
        Message<GenericRecord> lastMessage = null;

        for (final Message<GenericRecord> msg : messages) {
            final Map<String, String> currentAttributes = this.getMappedFlowFileAttributes(context, msg);

            // Attribute change: close out the open FlowFile before starting another one.
            if (lastAttributes != null && !lastAttributes.equals(currentAttributes)) {
                this.completeFlowFile(session, consumer, flowFile, writer, rawOut, written, parseFailures);
                this.handleFailures(session, parseFailures, demarcator);
                parseFailures.clear();
                if (!shared) {
                    this.acknowledgeCumulative(consumer, lastMessage, async);
                }
                flowFile = null;
                rawOut = null;
                writer = null;
                lastAttributes = null;
                lastMessage = null;
            }

            final byte[] data = msg.getData();
            if (lastMessage == null) {
                flowFile = session.create();
                flowFile = session.putAllAttributes(flowFile, currentAttributes);
                rawOut = null;
                writer = null;
                boolean started = false;

                // Resolve the schema before opening the content stream so that nothing needs
                // to be unwound when the schema is unavailable.
                final RecordSchema schema = this.getSchema(flowFile, readerFactory, data);
                if (schema != null) {
                    rawOut = session.write(flowFile);
                    writer = this.getRecordWriter(writerFactory, schema, rawOut, flowFile);
                    if (writer != null) {
                        try {
                            writer.beginRecordSet();
                            started = true;
                        }
                        catch (IOException e) {
                            this.getLogger().error("Unable to consume from Pulsar topic ", (Throwable)e);
                        }
                    }
                }

                if (!started) {
                    // Streams must be closed before the FlowFile is removed from the session.
                    IOUtils.closeQuietly(writer);
                    IOUtils.closeQuietly(rawOut);
                    session.remove(flowFile);
                    flowFile = null;
                    rawOut = null;
                    writer = null;
                    this.getLogger().error("Unable to create a record writer to consume from the Pulsar topic");
                    parseFailures.add(msg);
                    if (shared) {
                        // The payload is routed to parse_failure, so the message counts as handled.
                        // No cumulative acknowledgement will cover it under a shared subscription.
                        this.acknowledge(consumer, msg, async);
                    }
                    continue;
                }
            }

            lastAttributes = currentAttributes;
            lastMessage = msg;
            if (shared) {
                this.acknowledge(consumer, msg, async);
            }

            if (this.writeRecords(readerFactory, flowFile, writer, data)) {
                written.add(msg);
            } else {
                parseFailures.add(msg);
            }
        }

        // Only finish a FlowFile if one is actually open; every message may have failed above.
        if (lastMessage != null) {
            this.completeFlowFile(session, consumer, flowFile, writer, rawOut, written, parseFailures);
        }
        this.handleFailures(session, parseFailures, demarcator);
        if (!shared) {
            this.acknowledgeCumulative(consumer, messages.get(messages.size() - 1), async);
        }
    }

    /**
     * Parses every record in {@code data} and writes it with {@code writer}.
     *
     * @return {@code true} if the whole payload was read and written; {@code false} if reading
     *         or writing failed (records written before the failure are not undone)
     */
    private boolean writeRecords(RecordReaderFactory readerFactory, FlowFile flowFile, RecordSetWriter writer, byte[] data) {
        RecordReader reader = null;
        try {
            reader = readerFactory.createRecordReader(flowFile, (InputStream)new ByteArrayInputStream(data), this.getLogger());
            for (Record record = reader.nextRecord(); record != null; record = reader.nextRecord()) {
                writer.write(record);
            }
            return true;
        }
        catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            this.getLogger().error("Unable to parse a message received from the Pulsar topic", (Throwable)e);
            return false;
        }
        finally {
            // Closed quietly so that a failure on close cannot mark a fully written message as failed.
            IOUtils.closeQuietly(reader);
        }
    }

    /**
     * Finishes the record set of an open FlowFile, closes its streams and either transfers it to
     * success (with writer attributes, {@link #MSG_COUNT} and a provenance receive event) or
     * removes it when the writer reports {@link WriteResult#EMPTY}.
     *
     * <p>If the record set cannot be finished, the FlowFile is removed and the messages listed in
     * {@code written} are appended to {@code parseFailures}. {@code written} is cleared on return.
     */
    private void completeFlowFile(ProcessSession session, Consumer<GenericRecord> consumer, FlowFile flowFile, RecordSetWriter writer, OutputStream rawOut, List<Message<GenericRecord>> written, BlockingQueue<Message<GenericRecord>> parseFailures) {
        WriteResult result = null;
        try {
            result = writer.finishRecordSet();
        }
        catch (IOException e) {
            this.getLogger().error("Unable to consume from Pulsar topic ", (Throwable)e);
        }
        finally {
            IOUtils.closeQuietly(writer);
            IOUtils.closeQuietly(rawOut);
        }

        if (result == null) {
            // The serialized content is incomplete: drop it and keep the raw payloads instead.
            session.remove(flowFile);
            parseFailures.addAll(written);
        } else if (result != WriteResult.EMPTY) {
            FlowFile completed = session.putAllAttributes(flowFile, result.getAttributes());
            completed = session.putAttribute(completed, MSG_COUNT, "" + result.getRecordCount());
            session.getProvenanceReporter().receive(completed, this.getPulsarClientService().getPulsarBrokerRootURL() + "/" + consumer.getTopic());
            session.transfer(completed, REL_SUCCESS);
        } else {
            // Remove only this FlowFile. Rolling back the session would also discard FlowFiles
            // that were already transferred for earlier messages.
            session.remove(flowFile);
        }
        written.clear();
    }

    /**
     * Acknowledges a single message, either directly or by submitting the asynchronous
     * acknowledgement to {@code getAckService()} (the submitted task's result is not inspected).
     *
     * @throws PulsarClientException if the synchronous acknowledgement fails
     */
    private void acknowledge(final Consumer<GenericRecord> consumer, final Message<GenericRecord> msg, boolean async) throws PulsarClientException {
        if (!async) {
            consumer.acknowledge(msg);
            return;
        }
        this.getAckService().submit(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                return consumer.acknowledgeAsync(msg).get();
            }
        });
    }

    /**
     * Cumulatively acknowledges up to and including {@code msg}, either directly or by submitting
     * the asynchronous acknowledgement to {@code getAckService()} (the submitted task's result is
     * not inspected).
     *
     * @throws PulsarClientException if the synchronous acknowledgement fails
     */
    private void acknowledgeCumulative(final Consumer<GenericRecord> consumer, final Message<GenericRecord> msg, boolean async) throws PulsarClientException {
        if (!async) {
            consumer.acknowledgeCumulative(msg);
            return;
        }
        this.getAckService().submit(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                return consumer.acknowledgeCumulativeAsync(msg).get();
            }
        });
    }

    /**
     * Writes the raw payloads of the failed messages, separated by {@code demarcator}, into one
     * new FlowFile and transfers it to {@link #REL_PARSE_FAILURE}. Messages without data are
     * skipped. Does nothing when there are no failures.
     *
     * <p>If the content cannot be written, the error is logged and the FlowFile is removed so
     * that the session is not left holding a FlowFile that was never transferred.
     */
    private void handleFailures(ProcessSession session, BlockingQueue<Message<GenericRecord>> parseFailures, byte[] demarcator) {
        if (CollectionUtils.isEmpty(parseFailures)) {
            return;
        }
        final FlowFile flowFile = session.create();
        final OutputStream rawOut = session.write(flowFile);
        boolean complete = false;
        try {
            boolean wroteAny = false;
            for (final Message<GenericRecord> msg : parseFailures) {
                final byte[] data = msg == null ? null : msg.getData();
                if (data == null) {
                    continue;
                }
                // The demarcator only ever separates two payloads; it never leads the content.
                if (wroteAny) {
                    rawOut.write(demarcator);
                }
                rawOut.write(data);
                wroteAny = true;
            }
            complete = true;
        }
        catch (IOException e) {
            this.getLogger().error("Unable to route failures", (Throwable)e);
        }
        finally {
            IOUtils.closeQuietly(rawOut);
        }

        if (complete) {
            session.transfer(flowFile, REL_PARSE_FAILURE);
        } else {
            session.remove(flowFile);
        }
    }

    /**
     * Takes completed consume tasks from {@code getConsumerService()} and processes the messages
     * each one produced, until a poll returns nothing within the configured Max Wait Time
     * (truncated to whole seconds).
     *
     * @throws PulsarClientException if a synchronous acknowledgement fails while processing
     */
    protected void handleAsync(ProcessContext context, ProcessSession session, Consumer<GenericRecord> consumer, RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, byte[] demarcator) throws PulsarClientException {
        final int maxWaitSeconds = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
        try {
            Future<List<Message<GenericRecord>>> done;
            while ((done = this.getConsumerService().poll(maxWaitSeconds, TimeUnit.SECONDS)) != null) {
                final List<Message<GenericRecord>> messages = done.get();
                if (CollectionUtils.isNotEmpty(messages)) {
                    this.consumeMessages(context, session, consumer, messages, readerFactory, writerFactory, demarcator, true);
                }
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe the interruption.
            Thread.currentThread().interrupt();
            this.getLogger().error("Trouble consuming messages ", (Throwable)e);
        }
        catch (ExecutionException e) {
            this.getLogger().error("Trouble consuming messages ", (Throwable)e);
        }
    }

    /**
     * Determines the record schema of a message payload using the configured reader.
     *
     * @return the schema, or {@code null} when the payload is null or the reader cannot
     *         determine one (the failure is logged)
     */
    private RecordSchema getSchema(FlowFile flowFile, RecordReaderFactory readerFactory, byte[] msgValue) {
        if (msgValue == null) {
            return null;
        }
        RecordReader reader = null;
        try {
            reader = readerFactory.createRecordReader(flowFile, (InputStream)new ByteArrayInputStream(msgValue), this.getLogger());
            return reader.getSchema();
        }
        catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            this.getLogger().error("Unable to determine the schema", (Throwable)e);
            return null;
        }
        finally {
            IOUtils.closeQuietly(reader);
        }
    }

    /**
     * Creates a record writer over {@code out} using the write schema the factory derives from
     * {@code srcSchema}.
     *
     * @return the writer, or {@code null} when {@code srcSchema} is null or the writer cannot be
     *         created (the failure is logged)
     */
    private RecordSetWriter getRecordWriter(RecordSetWriterFactory writerFactory, RecordSchema srcSchema, OutputStream out, FlowFile flowFile) {
        if (srcSchema == null) {
            return null;
        }
        try {
            final RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), srcSchema);
            return writerFactory.createWriter(this.getLogger(), writeSchema, out, flowFile);
        }
        catch (IOException | SchemaNotFoundException e) {
            this.getLogger().error("Unable to create a record writer", (Throwable)e);
            return null;
        }
    }

    static {
        // Processor-specific properties come first, followed by those shared by all Pulsar consumers.
        final List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.add(MAX_WAIT_TIME);
        properties.addAll(AbstractPulsarConsumerProcessor.PROPERTIES);
        PROPERTIES = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_PARSE_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }
}

