/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.pulsar.pubsub;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
import org.apache.nifi.processors.pulsar.MessageTuple;
import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar;
import org.apache.nifi.processors.pulsar.pubsub.ConsumePulsarRecord;
import org.apache.nifi.processors.pulsar.pubsub.PublishPulsar;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.pulsar.client.api.Producer;

/**
 * Publishes the records contained in an incoming FlowFile to a Pulsar topic, one message per record.
 *
 * <p>Each record is parsed with the configured Record Reader, re-serialized on its own with the
 * configured Record Writer, and then either sent through the producer directly or placed on the
 * inherited work queue when asynchronous publishing is enabled. On success the FlowFile receives
 * the {@code msg.count} and {@code topic.name} attributes and is routed to success; any read,
 * schema, parse or interruption failure routes it to failure.
 */
@Tags(value={"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
@CapabilityDescription(value="Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. The complementary NiFi processor for fetching messages is ConsumePulsarRecord.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttribute(attribute="msg.count", description="The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to FlowFiles that are routed to success.")
@SeeAlso(value={PublishPulsar.class, ConsumePulsar.class, ConsumePulsarRecord.class})
@TriggerWhenEmpty
public class PublishPulsarRecord
extends AbstractPulsarProducerProcessor<byte[]> {
    /** Controller service used to parse the incoming FlowFile content into records. */
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("RECORD_READER").displayName("Record Reader").description("The Record Reader to use for incoming FlowFiles").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    /** Controller service used to serialize each record into the bytes of one Pulsar message. */
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("RECORD_WRITER").displayName("Record Writer").description("The Record Writer to use in order to serialize the data before sending to Pulsar").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    /** Reader and writer properties followed by every property inherited from the producer base class. */
    private static final List<PropertyDescriptor> PROPERTIES;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Takes one FlowFile from the session and publishes its records to the evaluated topic.
     *
     * @param context provides the processor's property values and controller services
     * @param session the session used to fetch, update and route the FlowFile
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        // Runs on every trigger, even when no FlowFile is queued (the class is @TriggerWhenEmpty).
        this.handleFailures(session);
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
        // Evaluated once, null-safely, so both uses below agree on the same value.
        boolean asyncFlag = this.isAsyncEnabled(context);
        Producer<byte[]> producer = this.getProducer(context, topic);
        if (producer == null) {
            this.getLogger().error("Unable to publish to topic {}", new Object[]{topic});
            session.transfer(flowFile, REL_FAILURE);
            if (asyncFlag) {
                context.yield();
            }
            return;
        }

        long messagesSent;
        try {
            messagesSent = this.publishRecords(context, session, flowFile, producer, topic, asyncFlag);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the framework can observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            this.getLogger().error("Interrupted while publishing records from " + flowFile + " to topic " + topic, e);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            this.getLogger().error("Failed to publish records from " + flowFile + " to topic " + topic, e);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // The content stream is closed by publishRecords before the FlowFile is modified or transferred.
        flowFile = session.putAttribute(flowFile, "msg.count", String.valueOf(messagesSent));
        flowFile = session.putAttribute(flowFile, "topic.name", topic);
        session.adjustCounter("Messages Sent", messagesSent, true);
        session.getProvenanceReporter().send(flowFile, this.getPulsarClientService().getPulsarBrokerRootURL(), "Sent " + messagesSent + " records");
        session.transfer(flowFile, REL_SUCCESS);
    }

    /**
     * Reports whether asynchronous publishing is switched on.
     *
     * @param context provides the ASYNC_ENABLED property value
     * @return {@code true} only when the property is set and evaluates to {@code true}
     */
    private boolean isAsyncEnabled(ProcessContext context) {
        return context.getProperty(ASYNC_ENABLED).isSet()
                && Boolean.TRUE.equals(context.getProperty(ASYNC_ENABLED).asBoolean());
    }

    /**
     * Opens the FlowFile content, resolves the write schema and publishes every record.
     *
     * <p>The content stream is closed on every exit path, normal or exceptional, so the caller
     * can safely update and transfer the FlowFile afterwards.
     *
     * @return the number of records handed to Pulsar (or to the async work queue)
     */
    private long publishRecords(ProcessContext context, ProcessSession session, FlowFile flowFile, Producer<byte[]> producer, String topic, boolean asyncFlag) throws IOException, SchemaNotFoundException, MalformedRecordException, InterruptedException {
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        Map<String, String> attributes = flowFile.getAttributes();
        InputStream in = session.read(flowFile);
        try {
            RecordReader reader = readerFactory.createRecordReader(flowFile, in, this.getLogger());
            RecordSchema schema = writerFactory.getSchema(attributes, reader.createRecordSet().getSchema());
            String key = this.getMessageKey(context, flowFile);
            Map<String, String> properties = this.getMappedMessageProperties(context, flowFile);
            return this.send(producer, writerFactory, schema, reader, topic, key, properties, asyncFlag);
        }
        finally {
            IOUtils.closeQuietly((Closeable)in);
        }
    }

    /**
     * Serializes each record from the reader individually and publishes it as one message.
     *
     * @param producer      producer used for direct sends
     * @param writerFactory factory creating a writer for each record
     * @param schema        schema handed to every writer
     * @param reader        source of records; always closed before this method returns
     * @param topic         topic recorded in the queued tuple for asynchronous sends
     * @param key           message key passed along with every message
     * @param properties    message properties passed along with every message
     * @param asyncFlag     when {@code true} messages are put on the work queue instead of being sent directly
     * @return the number of records read and published or queued
     * @throws IOException             if reading a record, writing it, or closing the reader fails
     * @throws SchemaNotFoundException declared by the writer factory
     * @throws InterruptedException    if interrupted while putting a message on the work queue
     */
    private int send(Producer<byte[]> producer, RecordSetWriterFactory writerFactory, RecordSchema schema, RecordReader reader, String topic, String key, Map<String, String> properties, boolean asyncFlag) throws IOException, SchemaNotFoundException, InterruptedException {
        RecordSet recordSet = reader.createRecordSet();
        // One buffer is reused for all records; it is reset before each record is written.
        ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
        int recordCount = 0;
        try {
            Record record;
            while ((record = recordSet.next()) != null) {
                ++recordCount;
                baos.reset();
                try (RecordSetWriter writer = writerFactory.createWriter(this.getLogger(), schema, (OutputStream)baos, Collections.emptyMap());){
                    writer.write(record);
                    writer.flush();
                }
                // toByteArray() copies the buffer, so the payload survives the next reset().
                byte[] payload = baos.toByteArray();
                if (asyncFlag) {
                    this.workQueue.put(new MessageTuple<byte[]>(topic, key, properties, payload));
                } else {
                    this.send(producer, key, properties, payload);
                }
            }
            return recordCount;
        }
        finally {
            reader.close();
        }
    }

    static {
        List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.addAll(AbstractPulsarProducerProcessor.PROPERTIES);
        PROPERTIES = Collections.unmodifiableList(properties);
    }
}

