package org.apache.nifi.processors.iceberg;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.util.Tasks;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.services.iceberg.IcebergCatalogService;

/**
 * Writes the records of an incoming FlowFile into an existing Iceberg table.
 *
 * <p>Each FlowFile is parsed with the configured Record Reader, converted to Iceberg records,
 * written to data files through an Iceberg {@link TaskWriter}, and committed to the table as a
 * single append operation, retrying with exponential backoff on {@link CommitFailedException}.
 * If writing or committing fails, an attempt is made to delete the data files written so far,
 * and the FlowFile is penalized and routed to failure.
 */
@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information. The target Iceberg table should already exist and it must have matching schemas with the incoming records, which means the Record Reader schema must contain all the Iceberg schema fields, every additional field which is not present in the Iceberg schema will be ignored. To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
@DynamicProperty(name = "A custom key to add to the snapshot summary. The value must start with 'snapshot-property.' prefix.", value = "A custom value to add to the snapshot summary.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Adds an entry with custom-key and corresponding value in the snapshot summary. The key format must be 'snapshot-property.custom-key'.")
@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
@WritesAttributes({@WritesAttribute(attribute = PutIceberg.ICEBERG_RECORD_COUNT, description = "The number of records in the FlowFile.")})
public class PutIceberg extends AbstractIcebergProcessor {

    /** FlowFile attribute holding the number of records written to the table. */
    public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";

    /** Required prefix of dynamic property names; it is stripped before the key is added to the snapshot summary. */
    public static final String ICEBERG_SNAPSHOT_SUMMARY_PREFIX = "snapshot-property.";

    /** Snapshot summary key under which the UUID of the committed FlowFile is recorded. */
    public static final String ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID = "nifi-flowfile-uuid";

    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-reader")
            .displayName("Record Reader")
            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    static final PropertyDescriptor CATALOG_NAMESPACE = new PropertyDescriptor.Builder()
            .name("catalog-namespace")
            .displayName("Catalog Namespace")
            .description("The namespace of the catalog.")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .build();

    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
            .name("table-name")
            .displayName("Table Name")
            .description("The name of the Iceberg table to write to.")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .build();

    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
            .name("unmatched-column-behavior")
            .displayName("Unmatched Column Behavior")
            .description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation.")
            .allowableValues(UnmatchedColumnBehavior.class)
            .defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue())
            .required(true)
            .build();

    static final PropertyDescriptor FILE_FORMAT = new PropertyDescriptor.Builder()
            .name("file-format")
            .displayName("File Format")
            .description("File format to use when writing Iceberg data files. If not set, then the 'write.format.default' table property will be used, default value is parquet.")
            .allowableValues(new AllowableValue("AVRO"), new AllowableValue("PARQUET"), new AllowableValue("ORC"))
            .build();

    static final PropertyDescriptor MAXIMUM_FILE_SIZE = new PropertyDescriptor.Builder()
            .name("maximum-file-size")
            .displayName("Maximum File Size")
            .description("The maximum size that a file can be, if the file size is exceeded a new file will be generated with the remaining data. If not set, then the 'write.target-file-size-bytes' table property will be used, default value is 512 MB.")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.LONG_VALIDATOR)
            .build();

    static final PropertyDescriptor NUMBER_OF_COMMIT_RETRIES = new PropertyDescriptor.Builder()
            .name("number-of-commit-retries")
            .displayName("Number of Commit Retries")
            .description("Number of times to retry a commit before failing.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("10")
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .build();

    static final PropertyDescriptor MINIMUM_COMMIT_WAIT_TIME = new PropertyDescriptor.Builder()
            .name("minimum-commit-wait-time")
            .displayName("Minimum Commit Wait Time")
            .description("Minimum time to wait before retrying a commit.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("100 ms")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    static final PropertyDescriptor MAXIMUM_COMMIT_WAIT_TIME = new PropertyDescriptor.Builder()
            .name("maximum-commit-wait-time")
            .displayName("Maximum Commit Wait Time")
            .description("Maximum time to wait before retrying a commit.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("2 sec")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    static final PropertyDescriptor MAXIMUM_COMMIT_DURATION = new PropertyDescriptor.Builder()
            .name("maximum-commit-duration")
            .displayName("Maximum Commit Duration")
            .description("Total retry timeout period for a commit.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("30 sec")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
            .build();

    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            RECORD_READER,
            CATALOG,
            CATALOG_NAMESPACE,
            TABLE_NAME,
            UNMATCHED_COLUMN_BEHAVIOR,
            FILE_FORMAT,
            MAXIMUM_FILE_SIZE,
            KERBEROS_USER_SERVICE,
            NUMBER_OF_COMMIT_RETRIES,
            MINIMUM_COMMIT_WAIT_TIME,
            MAXIMUM_COMMIT_WAIT_TIME,
            MAXIMUM_COMMIT_DURATION));

    public static final Set<Relationship> RELATIONSHIPS =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Builds the descriptor for a user-defined (dynamic) property.
     *
     * <p>Dynamic properties are only valid when their name starts with
     * {@value #ICEBERG_SNAPSHOT_SUMMARY_PREFIX}; their values support FlowFile-attribute Expression Language.
     *
     * @param propertyDescriptorName the name the user gave the dynamic property
     * @return a non-required, dynamic property descriptor with a prefix-checking validator
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .required(false)
                .addValidator((subject, input, validationContext) -> {
                    final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(input);
                    if (subject.startsWith(ICEBERG_SNAPSHOT_SUMMARY_PREFIX)) {
                        result.valid(true);
                    } else {
                        result.valid(false).explanation("Dynamic property key must begin with 'snapshot-property.'");
                    }
                    return result.build();
                })
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .dynamic(true)
                .build();
    }

    /**
     * Checks that the Kerberos User Service setting agrees with the Hadoop security mode.
     *
     * <p>The check only runs when the configured catalog service is enabled. It reads the catalog
     * service's Hadoop configuration files. It reports a problem when Kerberos security is enabled
     * there but no Kerberos User Service is set, and also when a Kerberos User Service is set but
     * security is not enabled.
     *
     * @param validationContext the validation context
     * @return the validation problems found; empty when the configuration is consistent
     */
    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final List<ValidationResult> problems = new ArrayList<>();
        final IcebergCatalogService catalogService =
                validationContext.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);

        // Skip the Kerberos consistency check until the catalog service is available and enabled.
        if (catalogService != null && validationContext.getControllerServiceLookup().isControllerServiceEnabled(catalogService)) {
            final boolean kerberosUserServiceIsSet = validationContext.getProperty(KERBEROS_USER_SERVICE).isSet();
            final boolean securityEnabled =
                    SecurityUtil.isSecurityEnabled(IcebergUtils.getConfigurationFromFiles(catalogService.getConfigFilePaths()));

            if (securityEnabled && !kerberosUserServiceIsSet) {
                problems.add(new ValidationResult.Builder()
                        .subject(KERBEROS_USER_SERVICE.getDisplayName())
                        .valid(false)
                        .explanation("'hadoop.security.authentication' is set to 'kerberos' in the hadoop configuration files but no KerberosUserService is configured.")
                        .build());
            }

            if (!securityEnabled && kerberosUserServiceIsSet) {
                problems.add(new ValidationResult.Builder()
                        .subject(KERBEROS_USER_SERVICE.getDisplayName())
                        .valid(false)
                        .explanation("KerberosUserService is configured but 'hadoop.security.authentication' is not set to 'kerberos' in the hadoop configuration files.")
                        .build());
            }
        }

        return problems;
    }

    /**
     * Writes all records of {@code flowFile} into the configured Iceberg table and commits them.
     *
     * <p>On success the FlowFile receives the {@value #ICEBERG_RECORD_COUNT} attribute, a SEND
     * provenance event pointing at the table location, and is routed to success. If the table
     * cannot be loaded, or if reading, writing or committing fails, the FlowFile is penalized and
     * routed to failure exactly once. In the write/commit case, the data files produced by the
     * task writer are deleted first, on a best-effort basis.
     *
     * @param context the process context
     * @param session the process session
     * @param flowFile the FlowFile whose records are ingested
     * @throws ProcessException declared by the framework contract
     */
    @Override
    public void doOnTrigger(final ProcessContext context, final ProcessSession session, FlowFile flowFile) throws ProcessException {
        final long startNanos = System.nanoTime();
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final String fileFormat = context.getProperty(FILE_FORMAT).getValue();
        final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions(flowFile).getValue();

        final Table table;
        try {
            table = loadTable(context, flowFile);
        } catch (final Exception e) {
            getLogger().error("Failed to load table from catalog", e);
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }

        // Kept outside the try so the catch block can remove files written before a failure.
        TaskWriter<Record> taskWriter = null;
        int recordCount = 0;

        // try-with-resources guarantees the content stream and reader are closed on every path,
        // and before the FlowFile is modified below.
        try (InputStream in = session.read(flowFile);
             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
            final FileFormat format = getFileFormat(table.properties(), fileFormat);
            taskWriter = new IcebergTaskWriterFactory(table, flowFile.getId(), format, maximumFileSize).create();
            final UnmatchedColumnBehavior unmatchedColumnBehavior =
                    UnmatchedColumnBehavior.valueOf(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
            final IcebergRecordConverter recordConverter = new IcebergRecordConverter(
                    table.schema(), reader.getSchema(), format, unmatchedColumnBehavior, getLogger());

            // Fully qualified: the simple name Record refers to the Iceberg record type in this file.
            org.apache.nifi.serialization.record.Record record;
            while ((record = reader.nextRecord()) != null) {
                taskWriter.write(recordConverter.convert(record));
                recordCount++;
            }

            appendDataFiles(context, flowFile, table, taskWriter.complete());
        } catch (final Exception e) {
            getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files", e);
            if (taskWriter != null) {
                try {
                    abort(taskWriter.dataFiles(), table);
                } catch (final Exception abortException) {
                    // Cleanup is best-effort; the FlowFile is still routed to failure exactly once below.
                    getLogger().error("Failed to abort uncommitted data files", abortException);
                }
            }
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }

        flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, String.valueOf(recordCount));
        final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        session.getProvenanceReporter().send(flowFile, table.location(), transferMillis);
        session.transfer(flowFile, REL_SUCCESS);
    }

    /**
     * Loads the target table from the configured catalog service.
     *
     * <p>The evaluated Catalog Namespace is used as a single-level {@link Namespace}.
     *
     * @param context property source (Expression Language is evaluated against {@code flowFile})
     * @param flowFile the FlowFile providing attributes for Expression Language
     * @return the loaded table
     */
    private Table loadTable(final PropertyContext context, final FlowFile flowFile) {
        final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
        final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions(flowFile).getValue();
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(catalogNamespace), tableName);
        return new IcebergCatalogFactory(catalogService).create().loadTable(tableIdentifier);
    }

    /**
     * Appends the written data files to {@code table} in one snapshot and commits it.
     *
     * <p>The commit is retried with exponential backoff (scale factor 2.0), but only on
     * {@link CommitFailedException}, using the configured retry count and wait/duration limits.
     *
     * @param context the process context holding the commit retry settings
     * @param flowFile the FlowFile used for Expression Language and the snapshot summary
     * @param table the target table
     * @param result the data files produced by the task writer
     */
    void appendDataFiles(final ProcessContext context, final FlowFile flowFile, final Table table, final WriteResult result) {
        final int numberOfCommitRetries = context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions(flowFile).asInteger();
        final long minimumCommitWaitTime = context.getProperty(MINIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
        final long maximumCommitWaitTime = context.getProperty(MAXIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
        final long maximumCommitDuration = context.getProperty(MAXIMUM_COMMIT_DURATION).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);

        final AppendFiles appender = table.newAppend();
        for (final DataFile dataFile : result.dataFiles()) {
            appender.appendFile(dataFile);
        }
        addSnapshotSummaryProperties(context, appender, flowFile);

        Tasks.foreach(appender)
                .exponentialBackoff(minimumCommitWaitTime, maximumCommitWaitTime, maximumCommitDuration, 2.0d)
                .retry(numberOfCommitRetries)
                .onlyRetryOn(CommitFailedException.class)
                .run(AppendFiles::commit);
    }

    /**
     * Adds the FlowFile UUID and every dynamic property to the snapshot summary of {@code appendFiles}.
     *
     * <p>Dynamic property names have the {@value #ICEBERG_SNAPSHOT_SUMMARY_PREFIX} prefix stripped;
     * property validation only accepts names carrying that prefix.
     */
    private void addSnapshotSummaryProperties(final ProcessContext context, final AppendFiles appendFiles, final FlowFile flowFile) {
        appendFiles.set(ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID, flowFile.getAttribute(CoreAttributes.UUID.key()));

        final Map<String, String> dynamicProperties = IcebergUtils.getDynamicProperties(context, flowFile);
        for (final Map.Entry<String, String> entry : dynamicProperties.entrySet()) {
            final String summaryKey = entry.getKey().substring(ICEBERG_SNAPSHOT_SUMMARY_PREFIX.length());
            appendFiles.set(summaryKey, entry.getValue());
        }
    }

    /**
     * Resolves the file format to write.
     *
     * @param tableProperties the table's properties
     * @param fileFormat the processor's File Format value, or {@code null} when not configured
     * @return the configured format if set; otherwise the table's {@code write.format.default}
     *         property, falling back to {@code parquet}
     * @throws IllegalArgumentException if the resolved name is not a {@link FileFormat} constant
     */
    private FileFormat getFileFormat(final Map<String, String> tableProperties, final String fileFormat) {
        final String formatName = fileFormat != null
                ? fileFormat
                : tableProperties.getOrDefault("write.format.default", "parquet");
        return FileFormat.valueOf(formatName.toUpperCase(Locale.ENGLISH));
    }

    /**
     * Deletes the given data files through the table's FileIO, retrying each deletion up to 3 times.
     *
     * @param dataFiles files written but not committed
     * @param table the table whose FileIO performs the deletion
     */
    void abort(final DataFile[] dataFiles, final Table table) {
        Tasks.foreach(dataFiles)
                .retry(3)
                .run(dataFile -> table.io().deleteFile(dataFile.path().toString()));
    }
}
