/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O>
extends HoodieWriteHandle<T, I, K, O> {
    private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class);
    private static final AtomicLong RECORD_COUNTER = new AtomicLong(1L);
    protected final String fileId;
    private final List<IndexedRecord> recordList = new ArrayList<IndexedRecord>();
    private final List<HoodieKey> keysToDelete = new ArrayList<HoodieKey>();
    protected Iterator<HoodieRecord<T>> recordItr;
    protected HoodieLogFormat.Writer writer;
    protected final List<WriteStatus> statuses;
    protected long recordsWritten = 0L;
    protected long recordsDeleted = 0L;
    protected long updatedRecordsWritten = 0L;
    protected long insertRecordsWritten = 0L;
    private long averageRecordSize = 0L;
    private boolean doInit = true;
    protected long estimatedNumberOfBytesWritten;
    private int numberOfRecords = 0;
    private final int maxBlockSize = this.config.getLogFileDataBlockMaxSize();
    protected final Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
    private SizeEstimator<HoodieRecord> sizeEstimator;
    private Properties recordProperties = new Properties();

    public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, TaskContextSupplier taskContextSupplier) {
        super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
        this.fileId = fileId;
        this.recordItr = recordItr;
        this.sizeEstimator = new DefaultSizeEstimator();
        this.statuses = new ArrayList<WriteStatus>();
        this.recordProperties.putAll((Map<?, ?>)config.getProps());
    }

    public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, String partitionPath, String fileId, TaskContextSupplier sparkTaskContextSupplier) {
        this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier);
    }

    private void init(HoodieRecord record) {
        if (this.doInit) {
            String baseInstantTime;
            TableFileSystemView.SliceView rtView = this.hoodieTable.getSliceView();
            Option fileSlice = rtView.getLatestFileSlice(this.partitionPath, this.fileId);
            String baseFile = "";
            List<Object> logFiles = new ArrayList();
            if (fileSlice.isPresent()) {
                baseInstantTime = ((FileSlice)fileSlice.get()).getBaseInstantTime();
                baseFile = (String)((FileSlice)fileSlice.get()).getBaseFile().map(BaseFile::getFileName).orElse((Object)"");
                logFiles = ((FileSlice)fileSlice.get()).getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
            } else {
                baseInstantTime = this.instantTime;
                fileSlice = Option.of((Object)new FileSlice(this.partitionPath, baseInstantTime, this.fileId));
                LOG.info((Object)("New AppendHandle for partition :" + this.partitionPath));
            }
            this.writeStatus.setStat((HoodieWriteStat)new HoodieDeltaWriteStat());
            this.writeStatus.setFileId(this.fileId);
            this.writeStatus.setPartitionPath(this.partitionPath);
            this.averageRecordSize = this.sizeEstimator.sizeEstimate((Object)record);
            HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat)this.writeStatus.getStat();
            deltaWriteStat.setPrevCommit(baseInstantTime);
            deltaWriteStat.setPartitionPath(this.partitionPath);
            deltaWriteStat.setFileId(this.fileId);
            deltaWriteStat.setBaseFile(baseFile);
            deltaWriteStat.setLogFiles(logFiles);
            try {
                HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(this.fs, baseInstantTime, new Path(this.config.getBasePath()), FSUtils.getPartitionPath((String)this.config.getBasePath(), (String)this.partitionPath));
                partitionMetadata.trySave(this.getPartitionId());
                this.createMarkerFile(this.partitionPath, FSUtils.makeDataFileName((String)baseInstantTime, (String)this.writeToken, (String)this.fileId, (String)this.hoodieTable.getBaseFileExtension()));
                this.writer = this.createLogWriter((Option<FileSlice>)fileSlice, baseInstantTime);
            }
            catch (Exception e) {
                LOG.error((Object)("Error in update task at commit " + this.instantTime), (Throwable)e);
                this.writeStatus.setGlobalError(e);
                throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + this.fileId + " on commit " + this.instantTime + " on HDFS path " + this.hoodieTable.getMetaClient().getBasePath() + this.partitionPath, e);
            }
            this.doInit = false;
        }
    }

    protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
        return hoodieRecord.getCurrentLocation() != null;
    }

    private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
        Option recordMetadata = hoodieRecord.getData().getMetadata();
        try {
            Option avroRecord;
            boolean isUpdateRecord = this.isUpdateRecord(hoodieRecord);
            boolean nullifyPayload = HoodieOperation.isDelete((HoodieOperation)hoodieRecord.getOperation()) && !this.config.allowOperationMetadataField();
            this.recordProperties.put("hoodie.is.update.record.for.mor", String.valueOf(isUpdateRecord));
            Option option = avroRecord = nullifyPayload ? Option.empty() : hoodieRecord.getData().getInsertValue(this.tableSchema, this.recordProperties);
            if (avroRecord.isPresent()) {
                if (((IndexedRecord)avroRecord.get()).equals(IGNORE_RECORD)) {
                    return avroRecord;
                }
                GenericRecord rewriteRecord = this.rewriteRecord((GenericRecord)avroRecord.get());
                avroRecord = Option.of((Object)rewriteRecord);
                String seqId = HoodieRecord.generateSequenceId((String)this.instantTime, (int)this.getPartitionId(), (long)RECORD_COUNTER.getAndIncrement());
                if (this.config.populateMetaFields()) {
                    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord)rewriteRecord, (String)hoodieRecord.getRecordKey(), (String)hoodieRecord.getPartitionPath(), (String)this.fileId);
                    HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord)rewriteRecord, (String)this.instantTime, (String)seqId);
                }
                if (this.config.allowOperationMetadataField()) {
                    HoodieAvroUtils.addOperationToRecord((GenericRecord)rewriteRecord, (HoodieOperation)hoodieRecord.getOperation());
                }
                if (this.isUpdateRecord(hoodieRecord)) {
                    ++this.updatedRecordsWritten;
                } else {
                    ++this.insertRecordsWritten;
                }
                ++this.recordsWritten;
            } else {
                ++this.recordsDeleted;
            }
            this.writeStatus.markSuccess(hoodieRecord, (Option<Map<String, String>>)recordMetadata);
            hoodieRecord.deflate();
            return avroRecord;
        }
        catch (Exception e) {
            LOG.error((Object)("Error writing record  " + hoodieRecord), (Throwable)e);
            this.writeStatus.markFailure(hoodieRecord, e, (Option<Map<String, String>>)recordMetadata);
            return Option.empty();
        }
    }

    private void initNewStatus() {
        HoodieDeltaWriteStat prevStat = (HoodieDeltaWriteStat)this.writeStatus.getStat();
        HoodieDeltaWriteStat stat = new HoodieDeltaWriteStat();
        stat.setFileId(this.fileId);
        stat.setPartitionPath(this.partitionPath);
        stat.setPrevCommit(prevStat.getPrevCommit());
        stat.setBaseFile(prevStat.getBaseFile());
        stat.setLogFiles(new ArrayList(prevStat.getLogFiles()));
        this.writeStatus = (WriteStatus)ReflectionUtils.loadClass((String)this.config.getWriteStatusClassName(), (Object[])new Object[]{!this.hoodieTable.getIndex().isImplicitWithStorage(), this.config.getWriteStatusFailureFraction()});
        this.writeStatus.setFileId(this.fileId);
        this.writeStatus.setPartitionPath(this.partitionPath);
        this.writeStatus.setStat((HoodieWriteStat)stat);
    }

    private String makeFilePath(HoodieLogFile logFile) {
        return this.partitionPath.length() == 0 ? new Path(logFile.getFileName()).toString() : new Path(this.partitionPath, logFile.getFileName()).toString();
    }

    private void resetWriteCounts() {
        this.recordsWritten = 0L;
        this.updatedRecordsWritten = 0L;
        this.insertRecordsWritten = 0L;
        this.recordsDeleted = 0L;
    }

    private void updateWriteCounts(HoodieDeltaWriteStat stat, AppendResult result) {
        stat.setNumWrites(this.recordsWritten);
        stat.setNumUpdateWrites(this.updatedRecordsWritten);
        stat.setNumInserts(this.insertRecordsWritten);
        stat.setNumDeletes(this.recordsDeleted);
        stat.setTotalWriteBytes(result.size());
    }

    private void accumulateWriteCounts(HoodieDeltaWriteStat stat, AppendResult result) {
        stat.setNumWrites(stat.getNumWrites() + this.recordsWritten);
        stat.setNumUpdateWrites(stat.getNumUpdateWrites() + this.updatedRecordsWritten);
        stat.setNumInserts(stat.getNumInserts() + this.insertRecordsWritten);
        stat.setNumDeletes(stat.getNumDeletes() + this.recordsDeleted);
        stat.setTotalWriteBytes(stat.getTotalWriteBytes() + result.size());
    }

    private void updateWriteStat(HoodieDeltaWriteStat stat, AppendResult result) {
        stat.setPath(this.makeFilePath(result.logFile()));
        stat.setLogOffset(result.offset());
        stat.setLogVersion(result.logFile().getLogVersion());
        if (!stat.getLogFiles().contains(result.logFile().getFileName())) {
            stat.addLogFiles(result.logFile().getFileName());
        }
        stat.setFileSizeInBytes(result.size());
    }

    private void updateRuntimeStats(HoodieDeltaWriteStat stat) {
        HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
        runtimeStats.setTotalUpsertTime(this.timer.endTimer());
        stat.setRuntimeStats(runtimeStats);
    }

    private void accumulateRuntimeStats(HoodieDeltaWriteStat stat) {
        HoodieWriteStat.RuntimeStats runtimeStats = stat.getRuntimeStats();
        assert (runtimeStats != null);
        runtimeStats.setTotalUpsertTime(runtimeStats.getTotalUpsertTime() + this.timer.endTimer());
    }

    private void updateWriteStatus(HoodieDeltaWriteStat stat, AppendResult result) {
        this.updateWriteStat(stat, result);
        this.updateWriteCounts(stat, result);
        this.updateRuntimeStats(stat);
        this.statuses.add(this.writeStatus);
    }

    private void processAppendResult(AppendResult result) {
        HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat)this.writeStatus.getStat();
        if (stat.getPath() == null) {
            this.updateWriteStatus(stat, result);
        } else if (stat.getPath().endsWith(result.logFile().getFileName())) {
            stat.setLogOffset(Math.min(stat.getLogOffset(), result.offset()));
            stat.setFileSizeInBytes(stat.getFileSizeInBytes() + result.size());
            this.accumulateWriteCounts(stat, result);
            this.accumulateRuntimeStats(stat);
        } else {
            this.initNewStatus();
            stat = (HoodieDeltaWriteStat)this.writeStatus.getStat();
            this.updateWriteStatus(stat, result);
        }
        this.resetWriteCounts();
        assert (stat.getRuntimeStats() != null);
        LOG.info((Object)String.format("AppendHandle for partitionPath %s filePath %s, took %d ms.", this.partitionPath, stat.getPath(), stat.getRuntimeStats().getTotalUpsertTime()));
        this.timer.startTimer();
    }

    public void doAppend() {
        while (this.recordItr.hasNext()) {
            HoodieRecord<T> record = this.recordItr.next();
            this.init(record);
            this.flushToDiskIfRequired(record);
            this.writeToBuffer(record);
        }
        this.appendDataAndDeleteBlocks(this.header);
        this.estimatedNumberOfBytesWritten += this.averageRecordSize * (long)this.numberOfRecords;
    }

    protected void appendDataAndDeleteBlocks(Map<HoodieLogBlock.HeaderMetadataType, String> header) {
        try {
            header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, this.instantTime);
            header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, this.writeSchemaWithMetaFields.toString());
            ArrayList<Object> blocks = new ArrayList<Object>(2);
            if (this.recordList.size() > 0) {
                blocks.add(HoodieDataBlock.getBlock((HoodieLogBlock.HoodieLogBlockType)this.hoodieTable.getLogDataBlockFormat(), this.recordList, header));
            }
            if (this.keysToDelete.size() > 0) {
                blocks.add(new HoodieDeleteBlock(this.keysToDelete.toArray(new HoodieKey[this.keysToDelete.size()]), header));
            }
            if (blocks.size() > 0) {
                AppendResult appendResult = this.writer.appendBlocks(blocks);
                this.processAppendResult(appendResult);
                this.recordList.clear();
                this.keysToDelete.clear();
            }
        }
        catch (Exception e) {
            throw new HoodieAppendException("Failed while appending records to " + this.writer.getLogFile().getPath(), e);
        }
    }

    @Override
    public boolean canWrite(HoodieRecord record) {
        return (double)this.config.getParquetMaxFileSize() >= (double)this.estimatedNumberOfBytesWritten * this.config.getLogFileToParquetCompressionRatio();
    }

    @Override
    public void write(HoodieRecord record, Option<IndexedRecord> insertValue) {
        Option recordMetadata = record.getData().getMetadata();
        try {
            this.init(record);
            this.flushToDiskIfRequired(record);
            this.writeToBuffer(record);
        }
        catch (Throwable t) {
            this.writeStatus.markFailure(record, t, (Option<Map<String, String>>)recordMetadata);
            LOG.error((Object)("Error writing record " + record), t);
        }
    }

    @Override
    public List<WriteStatus> close() {
        try {
            this.appendDataAndDeleteBlocks(this.header);
            this.recordItr = null;
            if (this.writer != null) {
                this.writer.close();
                this.writer = null;
                for (WriteStatus status : this.statuses) {
                    long logFileSize = FSUtils.getFileSize((FileSystem)this.fs, (Path)new Path(this.config.getBasePath(), status.getStat().getPath()));
                    status.getStat().setFileSizeInBytes(logFileSize);
                }
            }
            return this.statuses;
        }
        catch (IOException e) {
            throw new HoodieUpsertException("Failed to close UpdateHandle", e);
        }
    }

    @Override
    public IOType getIOType() {
        return IOType.APPEND;
    }

    @Override
    public List<WriteStatus> writeStatuses() {
        return this.statuses;
    }

    private HoodieLogFormat.Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime) throws IOException, InterruptedException {
        Option latestLogFile = ((FileSlice)fileSlice.get()).getLatestLogFile();
        return HoodieLogFormat.newWriterBuilder().onParentPath(FSUtils.getPartitionPath((String)this.hoodieTable.getMetaClient().getBasePath(), (String)this.partitionPath)).withFileId(this.fileId).overBaseCommit(baseCommitTime).withLogVersion(((Integer)latestLogFile.map(HoodieLogFile::getLogVersion).orElse((Object)HoodieLogFile.LOGFILE_BASE_VERSION)).intValue()).withFileSize(((Long)latestLogFile.map(HoodieLogFile::getFileSize).orElse((Object)0L)).longValue()).withSizeThreshold((long)this.config.getLogFileMaxSize()).withFs(this.fs).withRolloverLogWriteToken(this.writeToken).withLogWriteToken((String)latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath((Path)x.getPath())).orElse((Object)this.writeToken)).withFileExtension(".log").build();
    }

    protected boolean needsUpdateLocation() {
        return true;
    }

    private void writeToBuffer(HoodieRecord<T> record) {
        Option<IndexedRecord> indexedRecord;
        if (!this.partitionPath.equals(record.getPartitionPath())) {
            HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: " + record.getPartitionPath() + " but trying to insert into partition: " + this.partitionPath);
            this.writeStatus.markFailure(record, (Throwable)((Object)failureEx), (Option<Map<String, String>>)record.getData().getMetadata());
            return;
        }
        if (this.needsUpdateLocation()) {
            record.unseal();
            record.setNewLocation(new HoodieRecordLocation(this.instantTime, this.fileId));
            record.seal();
        }
        if ((indexedRecord = this.getIndexedRecord(record)).isPresent()) {
            if (!((IndexedRecord)indexedRecord.get()).equals(IGNORE_RECORD)) {
                this.recordList.add((IndexedRecord)indexedRecord.get());
            }
        } else {
            this.keysToDelete.add(record.getKey());
        }
        ++this.numberOfRecords;
    }

    private void flushToDiskIfRequired(HoodieRecord record) {
        if (this.numberOfRecords >= (int)((long)this.maxBlockSize / this.averageRecordSize)) {
            LOG.info((Object)("AvgRecordSize => " + this.averageRecordSize));
            this.averageRecordSize = (this.averageRecordSize + this.sizeEstimator.sizeEstimate((Object)record)) / 2L;
            this.appendDataAndDeleteBlocks(this.header);
            this.estimatedNumberOfBytesWritten += this.averageRecordSize * (long)this.numberOfRecords;
            this.numberOfRecords = 0;
        }
    }
}

