package org.apache.nifi.processors.kite;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.util.StopWatch;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.IncompatibleSchemaException;
import org.kitesdk.data.ValidationException;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.SchemaValidationUtil;

@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Stores Avro records in a Kite dataset")
@Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"})
/* loaded from: input_file:org/apache/nifi/processors/kite/StoreInKiteDataset.class */
public class StoreInKiteDataset extends AbstractKiteProcessor {
    private static final Relationship SUCCESS = new Relationship.Builder().name("success").description("FlowFile content has been successfully saved").build();
    private static final Relationship INCOMPATIBLE = new Relationship.Builder().name("incompatible").description("FlowFile content is not compatible with the target dataset").build();
    private static final Relationship FAILURE = new Relationship.Builder().name("failure").description("FlowFile content could not be processed").build();
    public static final PropertyDescriptor KITE_DATASET_URI = new PropertyDescriptor.Builder().name("Target dataset URI").description("URI that identifies a Kite dataset where data will be stored").addValidator(RECOGNIZED_URI).expressionLanguageSupported(true).required(true).build();
    private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.builder().addAll(AbstractKiteProcessor.getProperties()).add(KITE_DATASET_URI).build();
    private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.builder().add(SUCCESS).add(INCOMPATIBLE).add(FAILURE).build();

    @Override // org.apache.nifi.processors.kite.AbstractKiteProcessor
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        final View<GenericData.Record> load = load(processContext, flowFile);
        final Schema schema = load.getDataset().getDescriptor().getSchema();
        try {
            StopWatch stopWatch = new StopWatch(true);
            processSession.read(flowFile, new InputStreamCallback() { // from class: org.apache.nifi.processors.kite.StoreInKiteDataset.1
                public void process(InputStream inputStream) throws IOException {
                    DataFileStream dataFileStream = new DataFileStream(inputStream, AvroUtil.newDatumReader(schema, GenericData.Record.class));
                    Throwable th = null;
                    try {
                        IncompatibleSchemaException.check(SchemaValidationUtil.canRead(dataFileStream.getSchema(), schema), "Incompatible file schema %s, expected %s", new Object[]{dataFileStream.getSchema(), schema});
                        long j = 0;
                        try {
                            DatasetWriter newWriter = load.newWriter();
                            Throwable th2 = null;
                            try {
                                try {
                                    Iterator it = dataFileStream.iterator();
                                    while (it.hasNext()) {
                                        newWriter.write((GenericData.Record) it.next());
                                        j++;
                                    }
                                    if (newWriter != null) {
                                        if (0 != 0) {
                                            try {
                                                newWriter.close();
                                            } catch (Throwable th3) {
                                                th2.addSuppressed(th3);
                                            }
                                        } else {
                                            newWriter.close();
                                        }
                                    }
                                    processSession.adjustCounter("Stored records", j, true);
                                    if (dataFileStream != null) {
                                        if (0 == 0) {
                                            dataFileStream.close();
                                            return;
                                        }
                                        try {
                                            dataFileStream.close();
                                        } catch (Throwable th4) {
                                            th.addSuppressed(th4);
                                        }
                                    }
                                } catch (Throwable th5) {
                                    th2 = th5;
                                    throw th5;
                                }
                            } catch (Throwable th6) {
                                if (newWriter != null) {
                                    if (th2 != null) {
                                        try {
                                            newWriter.close();
                                        } catch (Throwable th7) {
                                            th2.addSuppressed(th7);
                                        }
                                    } else {
                                        newWriter.close();
                                    }
                                }
                                throw th6;
                            }
                        } catch (Throwable th8) {
                            processSession.adjustCounter("Stored records", 0L, true);
                            throw th8;
                        }
                    } catch (Throwable th9) {
                        if (dataFileStream != null) {
                            if (0 != 0) {
                                try {
                                    dataFileStream.close();
                                } catch (Throwable th10) {
                                    th.addSuppressed(th10);
                                }
                            } else {
                                dataFileStream.close();
                            }
                        }
                        throw th9;
                    }
                }
            });
            stopWatch.stop();
            processSession.getProvenanceReporter().send(flowFile, load.getUri().toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS), true);
            processSession.transfer(flowFile, SUCCESS);
        } catch (ProcessException | DatasetIOException e) {
            getLogger().error("Failed to read FlowFile", e);
            processSession.transfer(flowFile, FAILURE);
        } catch (ValidationException e2) {
            getLogger().error(e2.getMessage());
            getLogger().debug("Incompatible schema error", e2);
            processSession.transfer(flowFile, INCOMPATIBLE);
        }
    }

    private View<GenericData.Record> load(ProcessContext processContext, FlowFile flowFile) {
        return Datasets.load(processContext.getProperty(KITE_DATASET_URI).evaluateAttributeExpressions(flowFile).getValue(), GenericData.Record.class);
    }
}
