package org.apache.nifi.processors.druid;

import com.metamx.tranquility.tranquilizer.MessageDroppedException;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.api.druid.DruidTranquilityService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

/**
 * NiFi processor that reads the records of an incoming FlowFile and sends each one to Druid
 * through the {@link Tranquilizer} supplied by a {@link DruidTranquilityService}.
 *
 * <p>Three child FlowFiles are created for every input FlowFile. As the asynchronous send of
 * each record completes, the record is appended to the child matching its outcome and that
 * child is later routed to {@link #REL_SUCCESS}, {@link #REL_FAILURE} or {@link #REL_DROPPED}.
 * Children that received no records are removed from the session.
 *
 * <p>The processor manages its own {@link ProcessSession}: {@link #onTrigger} creates it and
 * {@link #processFlowFile} commits it (or rolls it back when finishing a writer fails).
 */
@CapabilityDescription("Sends records to Druid for Indexing. Leverages Druid Tranquility Controller service.")
@WritesAttribute(attribute = PutDruidRecord.RECORD_COUNT, description = "The number of messages that were sent to Druid for this FlowFile. FlowFiles on the success relationship will have a value of this attribute that indicates the number of records successfully processed by Druid, and the FlowFile content will be only the successful records. This behavior applies to the failure and dropped relationships as well.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"druid", "timeseries", "olap", "ingest", "put", "record"})
public class PutDruidRecord extends AbstractSessionFactoryProcessor {
    /** Name of the attribute holding the number of records carried by an outgoing FlowFile. */
    static final String RECORD_COUNT = "record.count";

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
            .name("putdruid-record-reader")
            .displayName("Record Reader")
            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
            .name("putdruid-record-writer")
            .displayName("Record Writer")
            .description("The Record Writer to use in order to serialize the data to outgoing relationships.")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .expressionLanguageSupported(false)
            .required(true)
            .build();

    static final PropertyDescriptor DRUID_TRANQUILITY_SERVICE = new PropertyDescriptor.Builder()
            .name("putdruid-tranquility-service")
            .displayName("Tranquility Service")
            .description("Tranquility Service to use for sending events to Druid.")
            .required(true)
            .identifiesControllerService(DruidTranquilityService.class)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles are routed to this relationship when they are successfully processed by Druid")
            .build();

    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles are routed to this relationship when they cannot be parsed or otherwise processed by Druid")
            .build();

    static final Relationship REL_DROPPED = new Relationship.Builder()
            .name("dropped")
            .description("FlowFiles are routed to this relationship when they are outside of the configured time window, timestamp format is invalid, ect...")
            .build();

    /**
     * Builds the immutable property and relationship collections exposed by this processor.
     *
     * @param processorInitializationContext initialization context (not used here)
     */
    @Override
    public void init(ProcessorInitializationContext processorInitializationContext) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(RECORD_READER_FACTORY);
        descriptors.add(RECORD_WRITER_FACTORY);
        descriptors.add(DRUID_TRANQUILITY_SERVICE);
        this.properties = Collections.unmodifiableList(descriptors);

        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_DROPPED);
        rels.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /** @return the unmodifiable set of relationships built in {@link #init} */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /** @return the unmodifiable list of property descriptors built in {@link #init} */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    /**
     * Pulls one FlowFile from the session, sends each of its records to Druid and partitions
     * the records into dropped / failed / successful child FlowFiles according to the outcome
     * reported by the Tranquilizer.
     *
     * <p>If reading or parsing fails, the original FlowFile is routed to {@link #REL_FAILURE}
     * with {@link #RECORD_COUNT} set to the number of records read so far, the children are
     * discarded and the session is committed. If the FlowFile contains no records it is routed
     * to {@link #REL_SUCCESS} with a count of {@code "0"}.
     *
     * @param processContext supplies the reader, writer and Tranquility controller services
     * @param processSession session used for all FlowFile operations; committed by this method
     * @throws ProcessException if finishing or closing one of the record writers fails
     *                          (the session is rolled back first)
     */
    private void processFlowFile(ProcessContext processContext, ProcessSession processSession) {
        final ComponentLog logger = getLogger();
        final DruidTranquilityService tranquilityService = processContext.getProperty(DRUID_TRANQUILITY_SERVICE)
                .asControllerService(DruidTranquilityService.class);
        final Tranquilizer<Map<String, Object>> tranquilizer = tranquilityService.getTranquilizer();

        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }

        // One child FlowFile and one counter per outcome.
        final FlowFile droppedFlowFile = processSession.create(flowFile);
        final AtomicInteger droppedCount = new AtomicInteger(0);
        final FlowFile failedFlowFile = processSession.create(flowFile);
        final AtomicInteger failedCount = new AtomicInteger(0);
        final FlowFile successFlowFile = processSession.create(flowFile);
        final AtomicInteger successCount = new AtomicInteger(0);
        // Records whose outcome was known but could not be written to their child FlowFile.
        final AtomicInteger writeErrorCount = new AtomicInteger(0);

        int recordCount = 0;
        final OutputStream droppedOut = processSession.write(droppedFlowFile);
        final OutputStream failedOut = processSession.write(failedFlowFile);
        final OutputStream successOut = processSession.write(successFlowFile);

        final RecordSetWriter droppedWriter;
        final RecordSetWriter failedWriter;
        final RecordSetWriter successWriter;

        // The input stream is scoped to the read/send phase only, so it is always closed
        // (including on failure) before the original FlowFile is modified or transferred.
        try (InputStream in = processSession.read(flowFile)) {
            final RecordReaderFactory readerFactory = processContext.getProperty(RECORD_READER_FACTORY)
                    .asControllerService(RecordReaderFactory.class);
            final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER_FACTORY)
                    .asControllerService(RecordSetWriterFactory.class);
            final Map<String, String> attributes = flowFile.getAttributes();

            final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
            final RecordSchema outSchema = writerFactory.getSchema(attributes, reader.getSchema());

            droppedWriter = writerFactory.createWriter(logger, outSchema, droppedOut);
            droppedWriter.beginRecordSet();
            failedWriter = writerFactory.createWriter(logger, outSchema, failedOut);
            failedWriter.beginRecordSet();
            successWriter = writerFactory.createWriter(logger, outSchema, successOut);
            successWriter.beginRecordSet();

            Record next;
            while ((next = reader.nextRecord()) != null) {
                final Record record = next;
                recordCount++;

                @SuppressWarnings("unchecked") // a RECORD-typed field is converted to a Map keyed by field name
                final Map<String, Object> payload = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(
                        record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));

                logger.debug("Tranquilizer Status: {}", new Object[]{tranquilizer.status().toString()});
                final Future<?> pending = tranquilizer.send(payload);
                logger.debug("Sent Payload to Druid: {}", new Object[]{payload});

                // The listener is invoked when the send completes; each invocation bumps exactly
                // one of the four counters, which the wait loop below relies on.
                pending.addEventListener(new FutureEventListener<Object>() {
                    @Override
                    public void onFailure(Throwable cause) {
                        if (cause instanceof MessageDroppedException) {
                            logger.debug("Record Dropped due to MessageDroppedException: {}, transferring record to dropped.", new Object[]{cause.getMessage()}, cause);
                            try {
                                appendRecord(droppedWriter, record, droppedCount);
                            } catch (IOException e) {
                                logger.error("Error transferring record to dropped, this may result in data loss.", new Object[]{e.getMessage()}, e);
                                writeErrorCount.incrementAndGet();
                            }
                            return;
                        }
                        logger.error("FlowFile Processing Failed due to: {}", new Object[]{cause.getMessage()}, cause);
                        try {
                            appendRecord(failedWriter, record, failedCount);
                        } catch (IOException e) {
                            logger.error("Error transferring record to failure, this may result in data loss.", new Object[]{e.getMessage()}, e);
                            writeErrorCount.incrementAndGet();
                        }
                    }

                    @Override
                    public void onSuccess(Object result) {
                        logger.debug(" FlowFile Processing Success: {}", new Object[]{result.toString()});
                        try {
                            appendRecord(successWriter, record, successCount);
                        } catch (IOException e) {
                            logger.error("Error transferring record to success, this may result in data loss. However the record was successfully processed by Druid", new Object[]{e.getMessage()}, e);
                            writeErrorCount.incrementAndGet();
                        }
                    }
                });
            }
        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            logger.error("FlowFile Processing Failed due to: {}", new Object[]{e.getMessage()}, e);
            processSession.transfer(processSession.putAttribute(flowFile, RECORD_COUNT, Integer.toString(recordCount)), REL_FAILURE);
            discardChild(processSession, droppedOut, droppedFlowFile, "Error closing output stream for FlowFile with dropped records.");
            discardChild(processSession, failedOut, failedFlowFile, "Error closing output stream for FlowFile with failed records.");
            discardChild(processSession, successOut, successFlowFile, "Error closing output stream for FlowFile with successful records.");
            processSession.commit();
            return;
        }

        if (recordCount == 0) {
            // Nothing was sent: pass the original through and drop the empty children.
            processSession.transfer(processSession.putAttribute(flowFile, RECORD_COUNT, "0"), REL_SUCCESS);
            discardChild(processSession, droppedOut, droppedFlowFile, "Error closing output stream for FlowFile with dropped records.");
            discardChild(processSession, failedOut, failedFlowFile, "Error closing output stream for FlowFile with failed records.");
            discardChild(processSession, successOut, successFlowFile, "Error closing output stream for FlowFile with successful records.");
        } else {
            // Spin until every record sent has been accounted for by a listener callback.
            // NOTE(review): this never terminates if a callback exits without bumping a counter
            // (e.g. an unchecked exception thrown by a writer) - confirm that cannot happen.
            while (recordCount != droppedCount.get() + failedCount.get() + successCount.get() + writeErrorCount.get()) {
                Thread.yield();
            }

            finishWriter(processSession, droppedWriter, "Error closing FlowFile with dropped records: {}");
            if (droppedCount.get() > 0) {
                processSession.transfer(processSession.putAttribute(droppedFlowFile, RECORD_COUNT, Integer.toString(droppedCount.get())), REL_DROPPED);
            } else {
                processSession.remove(droppedFlowFile);
            }

            finishWriter(processSession, failedWriter, "Error closing FlowFile with failed records: {}");
            if (failedCount.get() > 0) {
                processSession.transfer(processSession.putAttribute(failedFlowFile, RECORD_COUNT, Integer.toString(failedCount.get())), REL_FAILURE);
            } else {
                processSession.remove(failedFlowFile);
            }

            finishWriter(processSession, successWriter, "Error closing FlowFile with successful records: {}");
            if (successCount.get() > 0) {
                final FlowFile delivered = processSession.putAttribute(successFlowFile, RECORD_COUNT, Integer.toString(successCount.get()));
                processSession.transfer(delivered, REL_SUCCESS);
                processSession.getProvenanceReporter().send(delivered, tranquilityService.getTransitUri());
            } else {
                processSession.remove(successFlowFile);
            }

            // The content now lives in the children; the original is no longer needed.
            processSession.remove(flowFile);
        }
        processSession.commit();
    }

    /**
     * Appends one record to a writer and bumps the matching counter, holding the writer's
     * monitor so that concurrent listener callbacks do not interleave their output.
     */
    private static void appendRecord(RecordSetWriter writer, Record record, AtomicInteger counter) throws IOException {
        synchronized (writer) {
            writer.write(record);
            writer.flush();
            counter.incrementAndGet();
        }
    }

    /**
     * Closes a child FlowFile's output stream and removes the child from the session. A failure
     * to close is logged with the given message; in that case the child is not removed.
     */
    private void discardChild(ProcessSession session, OutputStream out, FlowFile child, String closeFailureMessage) {
        try {
            out.close();
            session.remove(child);
        } catch (IOException e) {
            getLogger().error(closeFailureMessage, e);
        }
    }

    /**
     * Finishes the record set and closes the writer. On failure the error is logged, the
     * session is rolled back and the cause is rethrown wrapped in a {@link ProcessException}.
     */
    private void finishWriter(ProcessSession session, RecordSetWriter writer, String failureMessage) {
        try {
            writer.finishRecordSet();
            writer.close();
        } catch (IOException e) {
            getLogger().error(failureMessage, new Object[]{e.getMessage()}, e);
            session.rollback();
            throw new ProcessException(e);
        }
    }

    /**
     * Creates a fresh session and processes at most one FlowFile with it.
     *
     * <p>Because this processor owns its session, any exception escaping the processing step
     * triggers a penalizing rollback before being rethrown, so the FlowFile is returned to its
     * queue instead of staying held by an abandoned session.
     *
     * @param processContext        the current process context
     * @param processSessionFactory factory used to create the session for this invocation
     * @throws ProcessException if processing fails
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
        final ProcessSession session = processSessionFactory.createSession();
        try {
            processFlowFile(processContext, session);
        } catch (final Throwable t) {
            session.rollback(true);
            throw t;
        }
    }
}
