package org.apache.hudi.io;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/io/HoodieMergeHandle.class */
/**
 * Write handle that merges a set of incoming records with the records of an existing base file
 * and writes the result to a new base file for the same file id.
 *
 * <p>Usage as visible in this class: the handle is constructed with the incoming records (either as an
 * iterator, which is loaded into an {@link ExternalSpillableMap}, or as a ready-made map),
 * {@link #write(GenericRecord)} is invoked with records of the old file (presumably once per old record by
 * the caller driving the merge), and {@link #close()} writes the incoming records whose keys were never
 * seen by {@code write}, closes the file writer and fills in the {@link HoodieWriteStat}.
 *
 * @param <T> payload type of the records being merged
 */
public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
    private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);

    /** Incoming records, keyed by record key. */
    private Map<String, HoodieRecord<T>> keyToNewRecords;
    /** Keys of incoming records that were already matched against an old record in {@link #write}. */
    private Set<String> writtenRecordKeys;
    private HoodieFileWriter<IndexedRecord> fileWriter;
    private Path newFilePath;
    private Path oldFilePath;
    // Counters reported through HoodieWriteStat in close(); long fields start at 0.
    private long recordsWritten;
    private long recordsDeleted;
    private long updatedRecordsWritten;
    private long insertRecordsWritten;
    /** When true, incoming payloads are read with the writer schema that includes the meta fields. */
    private boolean useWriterSchema;
    private HoodieBaseFile baseFileToMerge;

    /**
     * Creates a merge handle from an iterator of incoming records. The records are loaded into a spillable
     * map and the latest base file of the given partition/file id is used as the file to merge into.
     *
     * @param hoodieWriteConfig write configuration, forwarded to the superclass
     * @param str forwarded to the superclass as its second argument (presumably the instant time of this
     *     write; confirm against {@code HoodieWriteHandle})
     * @param hoodieTable table whose base-file-only view is queried for the latest base file
     * @param it incoming records to merge
     * @param str2 partition path of the file being merged
     * @param str3 file id of the file being merged
     * @param sparkTaskContextSupplier forwarded to the superclass and to the file writer factory
     */
    public HoodieMergeHandle(HoodieWriteConfig hoodieWriteConfig, String str, HoodieTable<T> hoodieTable, Iterator<HoodieRecord<T>> it, String str2, String str3, SparkTaskContextSupplier sparkTaskContextSupplier) {
        super(hoodieWriteConfig, str, str2, str3, hoodieTable, sparkTaskContextSupplier);
        init(str3, it);
        init(str3, str2, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(str2, str3).get());
    }

    /**
     * Creates a merge handle from an already materialized map of incoming records and an explicit base
     * file. This variant reads incoming payloads with the writer schema that includes the meta fields.
     *
     * @param hoodieWriteConfig write configuration, forwarded to the superclass
     * @param str forwarded to the superclass as its second argument (presumably the instant time of this
     *     write; confirm against {@code HoodieWriteHandle})
     * @param hoodieTable table being written to
     * @param map incoming records keyed by record key; cleared by {@link #close()}
     * @param str2 partition path, forwarded to the superclass
     * @param str3 file id of the file being merged
     * @param hoodieBaseFile existing base file whose records are merged with {@code map}
     * @param sparkTaskContextSupplier forwarded to the superclass and to the file writer factory
     */
    public HoodieMergeHandle(HoodieWriteConfig hoodieWriteConfig, String str, HoodieTable<T> hoodieTable, Map<String, HoodieRecord<T>> map, String str2, String str3, HoodieBaseFile hoodieBaseFile, SparkTaskContextSupplier sparkTaskContextSupplier) {
        super(hoodieWriteConfig, str, str2, str3, hoodieTable, sparkTaskContextSupplier);
        this.keyToNewRecords = map;
        this.useWriterSchema = true;
        init(str3, this.partitionPath, hoodieBaseFile);
    }

    /** Returns the writer schema that includes the meta fields. */
    @Override
    public Schema getWriterSchemaWithMetafields() {
        return this.writerSchemaWithMetafields;
    }

    /** Returns the writer schema held by this handle. */
    public Schema getWriterSchema() {
        return this.writerSchema;
    }

    /**
     * Prepares the write status, the old/new file paths, the marker file and the file writer.
     *
     * @param fileId file id being merged
     * @param mergePartitionPath partition path the file lives in (may be empty)
     * @param baseFile existing base file to merge into
     * @throws HoodieUpsertException if any of the file system operations fails with an IOException
     */
    private void init(String fileId, String mergePartitionPath, HoodieBaseFile baseFile) {
        LOG.info("partitionPath:" + mergePartitionPath + ", fileId to be merged:" + fileId);
        this.baseFileToMerge = baseFile;
        this.writtenRecordKeys = new HashSet<>();
        this.writeStatus.setStat(new HoodieWriteStat());
        try {
            String oldFileName = baseFile.getFileName();
            this.writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(oldFileName));

            Path basePath = new Path(this.config.getBasePath());
            HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(this.fs, this.instantTime, basePath,
                    FSUtils.getPartitionPath(this.config.getBasePath(), mergePartitionPath));
            partitionMetadata.trySave(getPartitionId());

            this.oldFilePath = new Path(this.config.getBasePath() + "/" + mergePartitionPath + "/" + oldFileName);
            String newFileName = FSUtils.makeDataFileName(this.instantTime, this.writeToken, fileId, this.hoodieTable.getBaseFileExtension());
            // An empty partition path means the file sits directly under the base path.
            String relativePath = new Path((mergePartitionPath.isEmpty() ? "" : mergePartitionPath + "/") + newFileName).toString();
            this.newFilePath = new Path(this.config.getBasePath(), relativePath);
            LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", this.oldFilePath.toString(), this.newFilePath.toString()));

            this.writeStatus.setFileId(fileId);
            this.writeStatus.setPartitionPath(mergePartitionPath);
            this.writeStatus.getStat().setPartitionPath(mergePartitionPath);
            this.writeStatus.getStat().setFileId(fileId);
            this.writeStatus.getStat().setPath(new Path(this.config.getBasePath()), this.newFilePath);

            createMarkerFile(mergePartitionPath, newFileName);
            this.fileWriter = createNewFileWriter(this.instantTime, this.newFilePath, this.hoodieTable, this.config, this.writerSchemaWithMetafields, this.sparkTaskContextSupplier);
        } catch (IOException e) {
            LOG.error("Error in update task at commit " + this.instantTime, e);
            this.writeStatus.setGlobalError(e);
            throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " + this.instantTime + " on path " + this.hoodieTable.getMetaClient().getBasePath(), e);
        }
    }

    /**
     * Loads the incoming records into an {@link ExternalSpillableMap} keyed by record key, stamping each
     * record with its new location (this handle's instant time and the given file id) first.
     *
     * @param fileId file id the records will be written to
     * @param newRecords incoming records
     * @throws HoodieIOException if the spillable map cannot be created
     */
    private void init(String fileId, Iterator<HoodieRecord<T>> newRecords) {
        try {
            long memoryForMerge = SparkConfigUtils.getMaxMemoryPerPartitionMerge(this.config.getProps());
            LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
            ExternalSpillableMap spillableMap = new ExternalSpillableMap(Long.valueOf(memoryForMerge), this.config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(this.writerSchema));
            this.keyToNewRecords = spillableMap;
            while (newRecords.hasNext()) {
                HoodieRecord<T> incoming = newRecords.next();
                // The location is set before the record goes into the map.
                incoming.unseal();
                incoming.setNewLocation(new HoodieRecordLocation(this.instantTime, fileId));
                incoming.seal();
                this.keyToNewRecords.put(incoming.getRecordKey(), incoming);
            }
            // Metrics are separated by ", " so the numbers do not run into the next label.
            LOG.info("Number of entries in MemoryBasedMap => " + spillableMap.getInMemoryMapNumEntries()
                    + ", Total size in bytes of MemoryBasedMap => " + spillableMap.getCurrentInMemoryMapSize()
                    + ", Number of entries in DiskBasedMap => " + spillableMap.getDiskBasedMapNumEntries()
                    + ", Size of file spilled to disk => " + spillableMap.getSizeOfFileOnDiskInBytes());
        } catch (IOException e) {
            throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", e);
        }
    }

    /**
     * Writes the merged value of an update, counting it as an update when a value is present.
     *
     * @return the result of {@link #writeRecord}
     */
    private boolean writeUpdateRecord(HoodieRecord<T> incoming, Option<IndexedRecord> mergedValue) {
        if (mergedValue.isPresent()) {
            this.updatedRecordsWritten++;
        }
        return writeRecord(incoming, mergedValue);
    }

    /**
     * Writes one incoming record to the new file, or counts it as a delete when {@code avroValue} is empty,
     * and records success or failure on the write status. Failures are not rethrown.
     *
     * @return {@code true} if the record was written or counted as deleted; {@code false} if its partition
     *     path does not match this handle's or the write threw
     */
    private boolean writeRecord(HoodieRecord<T> incoming, Option<IndexedRecord> avroValue) {
        Option<Map<String, String>> recordMetadata = incoming.getData().getMetadata();
        if (!this.partitionPath.equals(incoming.getPartitionPath())) {
            this.writeStatus.markFailure(incoming, new HoodieUpsertException("mismatched partition path, record partition: " + incoming.getPartitionPath() + " but trying to insert into partition: " + this.partitionPath), recordMetadata);
            return false;
        }
        try {
            if (avroValue.isPresent()) {
                this.fileWriter.writeAvroWithMetadata(rewriteRecord(avroValue.get()), incoming);
                this.recordsWritten++;
            } else {
                this.recordsDeleted++;
            }
            this.writeStatus.markSuccess(incoming, recordMetadata);
            incoming.deflate();
            return true;
        } catch (Exception e) {
            LOG.error("Error writing record  " + incoming, e);
            this.writeStatus.markFailure(incoming, e, recordMetadata);
            return false;
        }
    }

    /**
     * Handles one record of the old file. If an incoming record with the same key exists, the payload's
     * combined value is written via {@link #writeUpdateRecord}; if there is no such record, or that write
     * did not succeed, the old record is copied to the new file unchanged.
     *
     * @param genericRecord record read from the old file; its record key meta field must be non-null
     * @throws HoodieUpsertException if combining the records throws, or if copying the old record fails
     */
    public void write(GenericRecord genericRecord) {
        String recordKey = genericRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
        boolean copyOldRecord = true;
        if (this.keyToNewRecords.containsKey(recordKey)) {
            // Work on a copy: writeRecord() deflates the record it is handed.
            HoodieRecord<T> incoming = new HoodieRecord<>(this.keyToNewRecords.get(recordKey));
            try {
                Schema mergeSchema = this.useWriterSchema ? this.writerSchemaWithMetafields : this.writerSchema;
                Option<IndexedRecord> mergedValue = incoming.getData().combineAndGetUpdateValue(genericRecord, mergeSchema);
                if (writeUpdateRecord(incoming, mergedValue)) {
                    copyOldRecord = false;
                }
                this.writtenRecordKeys.add(recordKey);
            } catch (Exception e) {
                throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {" + this.keyToNewRecords.get(recordKey) + "}, old value {" + genericRecord + "}", e);
            }
        }
        if (copyOldRecord) {
            try {
                this.fileWriter.writeAvro(recordKey, genericRecord);
                this.recordsWritten++;
            } catch (IOException e) {
                String errMsg = copyFailureMessage(recordKey);
                LOG.error(errMsg, e);
                throw new HoodieUpsertException(errMsg, e);
            } catch (ClassCastException e) {
                LOG.error("Schema mismatch when rewriting old record " + genericRecord + " from file " + getOldFilePath() + " to file " + this.newFilePath + " with writerSchema " + this.writerSchemaWithMetafields.toString(true), e);
                throw new HoodieUpsertException(copyFailureMessage(recordKey), e);
            }
        }
    }

    /** Builds the error message for a failed copy of an old record; only called on the failure path. */
    private String copyFailureMessage(String recordKey) {
        return "Failed to merge old record into new file for key " + recordKey + " from old file " + getOldFilePath() + " to new file " + this.newFilePath;
    }

    /**
     * Writes every incoming record whose key was not matched by {@link #write(GenericRecord)}, clears the
     * record collections, closes the file writer and populates the write statistics.
     *
     * @return the write status of this handle, with its {@link HoodieWriteStat} filled in
     * @throws HoodieUpsertException if reading an insert value, closing the writer or sizing the new file
     *     fails with an IOException
     */
    @Override
    public WriteStatus close() {
        try {
            Iterator<HoodieRecord<T>> pending;
            if (this.keyToNewRecords instanceof ExternalSpillableMap) {
                pending = ((ExternalSpillableMap) this.keyToNewRecords).iterator();
            } else {
                pending = this.keyToNewRecords.values().iterator();
            }
            Schema insertSchema = this.useWriterSchema ? this.writerSchemaWithMetafields : this.writerSchema;
            while (pending.hasNext()) {
                HoodieRecord<T> incoming = pending.next();
                if (this.writtenRecordKeys.contains(incoming.getRecordKey())) {
                    continue;
                }
                // Count the insert only when writeRecord() reports success, so records that were
                // marked as failures do not inflate the insert statistic.
                if (writeRecord(incoming, incoming.getData().getInsertValue(insertSchema))) {
                    this.insertRecordsWritten++;
                }
            }
            this.keyToNewRecords.clear();
            this.writtenRecordKeys.clear();
            if (this.fileWriter != null) {
                this.fileWriter.close();
            }

            long fileSize = FSUtils.getFileSize(this.fs, this.newFilePath);
            HoodieWriteStat stat = this.writeStatus.getStat();
            stat.setTotalWriteBytes(fileSize);
            stat.setFileSizeInBytes(fileSize);
            stat.setNumWrites(this.recordsWritten);
            stat.setNumDeletes(this.recordsDeleted);
            stat.setNumUpdateWrites(this.updatedRecordsWritten);
            stat.setNumInserts(this.insertRecordsWritten);
            stat.setTotalWriteErrors(this.writeStatus.getTotalErrorRecords());

            HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
            runtimeStats.setTotalUpsertTime(this.timer.endTimer());
            stat.setRuntimeStats(runtimeStats);
            LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime()));
            return this.writeStatus;
        } catch (IOException e) {
            throw new HoodieUpsertException("Failed to close UpdateHandle", e);
        }
    }

    /** Returns the path of the existing base file that is being merged. */
    public Path getOldFilePath() {
        return this.oldFilePath;
    }

    /** Returns the write status tracked by this handle. */
    @Override
    public WriteStatus getWriteStatus() {
        return this.writeStatus;
    }

    /** Identifies this handle as a merge operation. */
    @Override
    public IOType getIOType() {
        return IOType.MERGE;
    }

    /** Returns the base file this handle was initialized to merge into. */
    public HoodieBaseFile baseFileForMerge() {
        return this.baseFileToMerge;
    }
}
