/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.format.mor;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.RecordIterators;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.util.StringToRowDataConverter;

public class MergeOnReadInputFormat
extends RichInputFormat<RowData, MergeOnReadInputSplit> {
    /** Explicit serialization version of this input format. */
    private static final long serialVersionUID = 1L;
    /** Flink configuration; read for partition options, the UTC flag and the Hadoop/parquet settings. */
    protected final Configuration conf;
    /** Hadoop configuration; transient, so it is re-derived from {@link #conf} on every {@code open}. */
    protected transient org.apache.hadoop.conf.Configuration hadoopConf;
    /** Table description: row types, Avro schemas, required positions and the input splits. */
    protected final MergeOnReadTableState tableState;
    /** Iterator over the currently opened split; assigned in {@code open} and cleared in {@code close}. */
    private transient ClosableIterator<RowData> iterator;
    /** Field names of the table row type, taken from {@code tableState.getRowType()}. */
    private final List<String> fieldNames;
    /** Field data types, as handed to the constructor. */
    private final List<DataType> fieldTypes;
    /** Default partition name handed to the constructor; not read by the code visible in this class. */
    private final String defaultPartName;
    /** Required field positions, taken from {@code tableState.getRequiredPositions()}. */
    private final int[] requiredPos;
    /** Predicates forwarded to the parquet record iterator of the base file. */
    private final List<ExpressionPredicates.Predicate> predicates;
    /** Maximum number of records to emit per opened split; only enforced when greater than zero. */
    private final long limit;
    /** Number of records handed out (or skipped) since the last {@code open}. */
    private long currentReadCount = 0L;
    /** Whether DELETE rows are emitted; non-final because {@code isEmitDelete(boolean)} can overwrite it. */
    private boolean emitDelete;
    /** True until {@code open} is called and again after {@code close}. */
    private boolean closed = true;
    /** Supplies the query schema for log scanners and is passed to the parquet record iterator. */
    private final InternalSchemaManager internalSchemaManager;

    /**
     * Creates the input format. Within this file the only caller is {@link Builder#build()}.
     *
     * @param conf                  Flink configuration
     * @param tableState            table description; also the source of the field names and the
     *                              required positions
     * @param fieldTypes            data types of the table fields
     * @param defaultPartName       default partition name
     * @param predicates            predicates forwarded to the base file reader
     * @param limit                 record limit; only enforced when greater than zero
     * @param emitDelete            whether DELETE rows are emitted
     * @param internalSchemaManager schema manager used when reading base and log files
     */
    protected MergeOnReadInputFormat(
            Configuration conf,
            MergeOnReadTableState tableState,
            List<DataType> fieldTypes,
            String defaultPartName,
            List<ExpressionPredicates.Predicate> predicates,
            long limit,
            boolean emitDelete,
            InternalSchemaManager internalSchemaManager) {
        // Handles that are stored as given.
        this.conf = conf;
        this.internalSchemaManager = internalSchemaManager;
        this.predicates = predicates;
        this.defaultPartName = defaultPartName;
        this.fieldTypes = fieldTypes;

        // Table state and the values derived from it.
        this.tableState = tableState;
        this.fieldNames = tableState.getRowType().getFieldNames();
        this.requiredPos = tableState.getRequiredPositions();

        // Read behaviour.
        this.limit = limit;
        this.emitDelete = emitDelete;
    }

    /**
     * Returns a fresh {@link Builder} for assembling a {@code MergeOnReadInputFormat}.
     *
     * @return a new builder with its default settings
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Opens the given split: resets the read counter, rebuilds the Hadoop configuration, creates the
     * record iterator and finally skips the records the split reports as already consumed.
     *
     * @param split the split to read
     * @throws IOException if the iterator cannot be created or skipping consumed records fails
     */
    public void open(MergeOnReadInputSplit split) throws IOException {
        currentReadCount = 0L;
        closed = false;
        // hadoopConf is transient, so it has to be derived again before any file is touched.
        hadoopConf = HadoopConfigurations.getHadoopConf(conf);
        iterator = initIterator(split);
        mayShiftInputSplit(split);
    }

    /**
     * Chooses the record iterator for one split, based on which files the split carries.
     *
     * <ul>
     *   <li>No log files: the base file is read on its own; when the split has an instant range the
     *       rows are additionally filtered by it.</li>
     *   <li>No base file: the log files are read on their own, un-merged when
     *       {@code OptionsResolver.emitChangelog(conf)} is true and merged otherwise.</li>
     *   <li>Both present: the split's merge type decides between {@code skip_merge} and
     *       {@code payload_combine}.</li>
     * </ul>
     *
     * @param split the split to read
     * @return an iterator over the rows of the split
     * @throws IOException     propagated from the base file iterator creation
     * @throws HoodieException if the split has base and log files but an unrecognised (or null)
     *                         merge type
     */
    protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit split) throws IOException {
        boolean hasLogFiles = split.getLogPaths().isPresent() && !((List<?>)split.getLogPaths().get()).isEmpty();
        if (!hasLogFiles) {
            // Base file only.
            String basePath = (String)split.getBasePath().get();
            if (split.getInstantRange().isPresent()) {
                // The commit time column must be read so that rows can be filtered by instant range.
                int[] positionsWithCommitTime = MergeOnReadInputFormat.getRequiredPosWithCommitTime(this.requiredPos);
                return new BaseFileOnlyFilteringIterator((InstantRange)split.getInstantRange().get(), this.tableState.getRequiredRowType(), this.requiredPos, this.getBaseFileIterator(basePath, positionsWithCommitTime));
            }
            return this.getBaseFileIterator(basePath);
        }
        if (!split.getBasePath().isPresent()) {
            // Log files only.
            if (OptionsResolver.emitChangelog(this.conf)) {
                return new LogFileOnlyIterator(this.getUnMergedLogFileIterator(split));
            }
            return new LogFileOnlyIterator(this.getLogFileIterator(split));
        }
        // Base file and log files: constant-first equals keeps a null merge type from throwing an NPE,
        // so it falls through to the descriptive exception below.
        String mergeType = split.getMergeType();
        if ("skip_merge".equals(mergeType)) {
            return new SkipMergeIterator(this.getBaseFileIterator((String)split.getBasePath().get()), this.getLogFileIterator(split));
        }
        if ("payload_combine".equals(mergeType)) {
            return new MergeIterator(this.conf, this.hadoopConf, split, this.tableState.getRowType(), this.tableState.getRequiredRowType(), new Schema.Parser().parse(this.tableState.getAvroSchema()), new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()), this.internalSchemaManager.getQuerySchema(), this.requiredPos, this.emitDelete, this.tableState.getOperationPos(), this.getBaseFileIteratorWithMetadata((String)split.getBasePath().get()));
        }
        throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for file path: " + split.getBasePath()
            + ", log paths: " + split.getLogPaths()
            + ", hoodie table path: " + split.getTablePath()
            + ", flink partition Index: " + split.getSplitNumber()
            + ", merge type: " + mergeType);
    }

    /**
     * Intentionally a no-op: the passed configuration is ignored.
     *
     * @param configuration unused
     */
    public void configure(Configuration configuration) {
    }

    /**
     * This format provides no statistics.
     *
     * @param baseStatistics unused
     * @return always {@code null}
     */
    public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
        return null;
    }

    /**
     * Returns the splits already stored in the table state as an array.
     *
     * @param minNumSplits ignored; the split list of the table state is returned as is
     * @return the input splits of the table state
     */
    public MergeOnReadInputSplit[] createInputSplits(int minNumSplits) {
        List<MergeOnReadInputSplit> splits = tableState.getInputSplits();
        return splits.toArray(new MergeOnReadInputSplit[0]);
    }

    /**
     * Wraps the given splits in a {@link DefaultInputSplitAssigner}.
     *
     * @param mergeOnReadInputSplits the splits to hand out
     * @return an assigner over the given splits
     */
    public InputSplitAssigner getInputSplitAssigner(MergeOnReadInputSplit[] mergeOnReadInputSplits) {
        return new DefaultInputSplitAssigner((InputSplit[])mergeOnReadInputSplits);
    }

    /**
     * Tells whether the current split is exhausted.
     *
     * <p>The split counts as exhausted once a positive limit has been reached, or once the
     * underlying iterator reports no further row. The iterator is only consulted when the limit has
     * not been reached.
     *
     * @return {@code true} if no more records should be read from the current split
     * @throws IOException declared by the input format contract
     */
    public boolean reachedEnd() throws IOException {
        boolean limitReached = limit > 0L && currentReadCount >= limit;
        return limitReached || !iterator.hasNext();
    }

    /**
     * Returns the next row of the current split and counts it towards the limit.
     *
     * @param o reuse object; ignored
     * @return the next row produced by the underlying iterator
     */
    public RowData nextRecord(RowData o) {
        // Count first, exactly like the limit check in reachedEnd() expects.
        currentReadCount++;
        RowData row = (RowData)iterator.next();
        return row;
    }

    /**
     * Closes the iterator of the current split, if any, and marks this format as closed.
     *
     * <p>The state reset happens in a {@code finally} block, so a failure while closing the iterator
     * still leaves this format marked as closed and without a dangling iterator reference; the
     * failure itself is propagated to the caller.
     *
     * @throws IOException declared by the input format contract
     */
    public void close() throws IOException {
        try {
            if (this.iterator != null) {
                this.iterator.close();
            }
        } finally {
            this.iterator = null;
            this.closed = true;
        }
    }

    /**
     * Reports whether this format is currently closed.
     *
     * @return {@code true} before the first {@code open} and after each {@code close}
     */
    public boolean isClosed() {
        return this.closed;
    }

    /**
     * Skips the records that the split reports as already consumed.
     *
     * <p>Skipping goes through {@code nextRecord}, so skipped records also count towards the read
     * counter. It stops early when the split runs out of records.
     *
     * @param split the split that was just opened
     * @throws IOException propagated from {@code reachedEnd}
     */
    private void mayShiftInputSplit(MergeOnReadInputSplit split) throws IOException {
        if (!split.isConsumed()) {
            return;
        }
        long skipped = 0L;
        while (skipped < split.getConsumed() && !reachedEnd()) {
            nextRecord(null);
            skipped++;
        }
    }

    /**
     * Creates a base file iterator that reads every field of the table row type, i.e. positions
     * {@code 0 .. fieldCount - 1}, rather than only the required ones.
     *
     * @param path path of the base file
     * @return an iterator over the full rows of the base file
     * @throws HoodieException if the reader cannot be created; the underlying {@link IOException}
     *                         is attached as the cause
     */
    protected ClosableIterator<RowData> getBaseFileIteratorWithMetadata(String path) {
        try {
            return this.getBaseFileIterator(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray());
        }
        catch (IOException e) {
            // Keep the original failure as the cause so the root error is not lost.
            throw new HoodieException("Get reader error for path: " + path, (Throwable)e);
        }
    }

    /**
     * Creates a base file iterator that reads only the required field positions of this format.
     *
     * @param path path of the base file
     * @return an iterator over the projected rows of the base file
     * @throws IOException propagated from the underlying reader creation
     */
    protected ClosableIterator<RowData> getBaseFileIterator(String path) throws IOException {
        return this.getBaseFileIterator(path, this.requiredPos);
    }

    /**
     * Creates a parquet record iterator over the base file at {@code path}, reading the given field
     * positions.
     *
     * <p>Partition values are derived from the file path with the partition-related options of the
     * Flink configuration and handed to the reader together with the field names and types.
     *
     * @param path        path of the base file
     * @param requiredPos field positions to read
     * @return an iterator over the rows of the base file
     * @throws IOException propagated from the record iterator creation
     */
    private ClosableIterator<RowData> getBaseFileIterator(String path, int[] requiredPos) throws IOException {
        String defaultPartitionName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
        String partitionPathField = conf.getString(FlinkOptions.PARTITION_PATH_FIELD);
        boolean hiveStylePartitioning = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
        LinkedHashMap<String, Object> partitionSpecs = FilePathUtils.generatePartitionSpecs(
            path, fieldNames, fieldTypes, defaultPartitionName, partitionPathField, hiveStylePartitioning);

        boolean utcTimezone = conf.getBoolean(FlinkOptions.UTC_TIMEZONE);
        String[] names = fieldNames.toArray(new String[0]);
        DataType[] types = fieldTypes.toArray(new DataType[0]);
        // NOTE(review): the literals true, 2048, 0L and Long.MAX_VALUE are presumably a
        // case-sensitivity flag, the batch size and the split start/length - confirm against
        // RecordIterators.getParquetRecordIterator.
        return RecordIterators.getParquetRecordIterator(
            internalSchemaManager,
            utcTimezone,
            true,
            HadoopConfigurations.getParquetConf(conf, hadoopConf),
            names,
            types,
            partitionSpecs,
            requiredPos,
            2048,
            new Path(path),
            0L,
            Long.MAX_VALUE,
            predicates);
    }

    /**
     * Creates an iterator over the merged log records of a split, projected to the required schema.
     *
     * <p>Records are read through a {@code HoodieMergedLogRecordScanner} and visited in the order of
     * the scanner's key set. Each call to {@code hasNext()} advances to the next emittable record
     * and stores it; {@code next()} only returns the stored record, so the two must be called
     * strictly alternately.
     *
     * @param split the split whose log files are read
     * @return an iterator over the rows of the merged log records; closing it closes the scanner
     */
    private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
        final Schema tableSchema = new Schema.Parser().parse(this.tableState.getAvroSchema());
        final Schema requiredSchema = new Schema.Parser().parse(this.tableState.getRequiredAvroSchema());
        final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
        final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(this.tableState.getRequiredRowType());
        final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, this.internalSchemaManager.getQuerySchema(), this.conf, this.hadoopConf);
        final Iterator logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
        final int[] pkOffset = this.tableState.getPkOffsetsInRequired();
        // An offset of -1 means at least one primary key field is not among the required fields.
        final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
        LogicalType[] pkTypes = pkSemanticLost ? null : this.tableState.getPkTypes(pkOffset);
        // Only needed to build DELETE rows from record keys, hence null when the key fields are missing.
        final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
        return new ClosableIterator<RowData>(){
            // Record found by the last successful hasNext() call.
            private RowData currentRecord;

            public boolean hasNext() {
                while (logRecordsKeyIterator.hasNext()) {
                    String curAvroKey = (String)logRecordsKeyIterator.next();
                    Option curAvroRecord = null;
                    HoodieAvroRecord hoodieRecord = (HoodieAvroRecord)scanner.getRecords().get(curAvroKey);
                    try {
                        curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
                    }
                    catch (IOException e) {
                        throw new HoodieException("Get avro insert value error for key: " + curAvroKey, (Throwable)e);
                    }
                    if (!curAvroRecord.isPresent()) {
                        // The payload has no insert value. Such a record is skipped unless deletes
                        // are emitted and the primary key fields are available.
                        if (!MergeOnReadInputFormat.this.emitDelete || pkSemanticLost) continue;
                        // Build a DELETE row in which only the primary key fields are set,
                        // recovered from the record key string.
                        GenericRowData delete = new GenericRowData(MergeOnReadInputFormat.this.tableState.getRequiredRowType().getFieldCount());
                        String recordKey = hoodieRecord.getRecordKey();
                        String[] pkFields = KeyGenUtils.extractRecordKeys((String)recordKey);
                        Object[] converted = converter.convert(pkFields);
                        for (int i = 0; i < pkOffset.length; ++i) {
                            delete.setField(pkOffset[i], converted[i]);
                        }
                        delete.setRowKind(RowKind.DELETE);
                        this.currentRecord = delete;
                        return true;
                    }
                    IndexedRecord avroRecord = (IndexedRecord)curAvroRecord.get();
                    RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, MergeOnReadInputFormat.this.tableState.getOperationPos());
                    // Records flagged as DELETE are dropped unless deletes are emitted.
                    if (rowKind == RowKind.DELETE && !MergeOnReadInputFormat.this.emitDelete) continue;
                    GenericRecord requiredAvroRecord = FormatUtils.buildAvroRecordBySchema(avroRecord, requiredSchema, MergeOnReadInputFormat.this.requiredPos, recordBuilder);
                    this.currentRecord = (RowData)avroToRowDataConverter.convert(requiredAvroRecord);
                    this.currentRecord.setRowKind(rowKind);
                    return true;
                }
                return false;
            }

            public RowData next() {
                return this.currentRecord;
            }

            public void close() {
                scanner.close();
            }
        };
    }

    /**
     * Creates an iterator over the un-merged log records of a split, projected to the required
     * schema.
     *
     * <p>Records come from a {@code FormatUtils.BoundedMemoryRecords} instance. Records whose payload
     * yields no insert value are skipped. {@code hasNext()} advances to the next record and stores
     * it; {@code next()} only returns the stored record.
     *
     * @param split the split whose log files are read
     * @return an iterator over the rows of the log records; closing it closes the record holder
     */
    private ClosableIterator<RowData> getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
        Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
        Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
        AvroToRowDataConverters.AvroToRowDataConverter rowConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
        FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, internalSchemaManager.getQuerySchema(), hadoopConf, conf);
        Iterator<HoodieRecord<?>> logRecords = records.getRecordsIterator();
        return new ClosableIterator<RowData>(){
            // Row prepared by the last successful hasNext() call.
            private RowData currentRecord;

            public boolean hasNext() {
                while (logRecords.hasNext()) {
                    HoodieAvroRecord logRecord = (HoodieAvroRecord)logRecords.next();
                    Option insertValue;
                    try {
                        insertValue = logRecord.getData().getInsertValue(tableSchema);
                    }
                    catch (IOException ioe) {
                        throw new HoodieException("Get avro insert value error for key: " + logRecord.getRecordKey(), (Throwable)ioe);
                    }
                    if (insertValue.isPresent()) {
                        IndexedRecord fullRecord = (IndexedRecord)insertValue.get();
                        GenericRecord projected = FormatUtils.buildAvroRecordBySchema(fullRecord, requiredSchema, MergeOnReadInputFormat.this.requiredPos, recordBuilder);
                        this.currentRecord = (RowData)rowConverter.convert(projected);
                        // The row kind is derived from the full record, not from the projection.
                        FormatUtils.setRowKind(this.currentRecord, fullRecord, MergeOnReadInputFormat.this.tableState.getOperationPos());
                        return true;
                    }
                }
                return false;
            }

            public RowData next() {
                return this.currentRecord;
            }

            public void close() {
                records.close();
            }
        };
    }

    /**
     * Creates an iterator over the merged log records of a split that returns full rows of the
     * table row type (no projection) and never returns DELETE rows.
     *
     * <p>The scanner is created with an empty internal schema. Records without an insert value and
     * records whose row kind is DELETE are skipped. {@code hasNext()} advances to the next record
     * and stores it; {@code next()} only returns the stored record.
     *
     * @param split the split whose log files are read
     * @return an iterator over the full rows of the merged log records; closing it closes the scanner
     */
    protected ClosableIterator<RowData> getFullLogFileIterator(MergeOnReadInputSplit split) {
        Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
        AvroToRowDataConverters.AvroToRowDataConverter rowConverter = AvroToRowDataConverters.createRowConverter(tableState.getRowType());
        HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, InternalSchema.getEmptyInternalSchema(), conf, hadoopConf);
        Iterator keys = scanner.getRecords().keySet().iterator();
        return new ClosableIterator<RowData>(){
            // Row prepared by the last successful hasNext() call.
            private RowData currentRecord;

            public boolean hasNext() {
                while (keys.hasNext()) {
                    String key = (String)keys.next();
                    HoodieAvroRecord logRecord = (HoodieAvroRecord)scanner.getRecords().get(key);
                    Option insertValue;
                    try {
                        insertValue = logRecord.getData().getInsertValue(tableSchema);
                    }
                    catch (IOException ioe) {
                        throw new HoodieException("Get avro insert value error for key: " + key, (Throwable)ioe);
                    }
                    if (!insertValue.isPresent()) {
                        continue;
                    }
                    IndexedRecord avroRecord = (IndexedRecord)insertValue.get();
                    RowKind kind = FormatUtils.getRowKindSafely(avroRecord, MergeOnReadInputFormat.this.tableState.getOperationPos());
                    if (kind == RowKind.DELETE) {
                        continue;
                    }
                    this.currentRecord = (RowData)rowConverter.convert(avroRecord);
                    this.currentRecord.setRowKind(kind);
                    return true;
                }
                return false;
            }

            public RowData next() {
                return this.currentRecord;
            }

            public void close() {
                scanner.close();
            }
        };
    }

    /**
     * Makes sure position {@code 0} is part of the given positions.
     *
     * @param requiredPos the required field positions
     * @return {@code requiredPos} itself when it already contains {@code 0}; otherwise a new array
     *         with {@code 0} prepended to the original positions
     */
    private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
        boolean alreadyPresent = MergeOnReadInputFormat.getCommitTimePos(requiredPos) >= 0;
        return alreadyPresent
            ? requiredPos
            : IntStream.concat(IntStream.of(0), Arrays.stream(requiredPos)).toArray();
    }

    /**
     * Finds where position {@code 0} occurs in the given positions.
     *
     * @param requiredPos the required field positions
     * @return the index of the first element equal to {@code 0}, or {@code -1} if there is none
     */
    private static int getCommitTimePos(int[] requiredPos) {
        return IntStream.range(0, requiredPos.length)
            .filter(idx -> requiredPos[idx] == 0)
            .findFirst()
            .orElse(-1);
    }

    /**
     * Overwrites the emit-delete flag. Despite the {@code is} prefix this is a setter and returns
     * nothing.
     *
     * @param emitDelete the new value of the flag
     */
    @VisibleForTesting
    public void isEmitDelete(boolean emitDelete) {
        this.emitDelete = emitDelete;
    }

    /**
     * Fluent builder for {@link MergeOnReadInputFormat}.
     *
     * <p>Defaults: {@code limit = -1}, {@code emitDelete = false} and
     * {@code internalSchemaManager = InternalSchemaManager.DISABLED}; every other value is
     * {@code null} until set. {@link #build()} performs no validation.
     */
    public static class Builder {
        protected Configuration conf;
        protected MergeOnReadTableState tableState;
        protected List<DataType> fieldTypes;
        protected String defaultPartName;
        protected List<ExpressionPredicates.Predicate> predicates;
        // A non-positive limit is not enforced by the input format.
        protected long limit = -1L;
        protected boolean emitDelete = false;
        protected InternalSchemaManager internalSchemaManager = InternalSchemaManager.DISABLED;

        /** Sets the Flink configuration and returns this builder. */
        public Builder config(Configuration conf) {
            this.conf = conf;
            return this;
        }

        /** Sets the table state and returns this builder. */
        public Builder tableState(MergeOnReadTableState tableState) {
            this.tableState = tableState;
            return this;
        }

        /** Sets the field data types and returns this builder. */
        public Builder fieldTypes(List<DataType> fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
        }

        /** Sets the default partition name and returns this builder. */
        public Builder defaultPartName(String defaultPartName) {
            this.defaultPartName = defaultPartName;
            return this;
        }

        /** Sets the predicates forwarded to the base file reader and returns this builder. */
        public Builder predicates(List<ExpressionPredicates.Predicate> predicates) {
            this.predicates = predicates;
            return this;
        }

        /** Sets the record limit and returns this builder. */
        public Builder limit(long limit) {
            this.limit = limit;
            return this;
        }

        /** Sets whether DELETE rows are emitted and returns this builder. */
        public Builder emitDelete(boolean emitDelete) {
            this.emitDelete = emitDelete;
            return this;
        }

        /** Sets the internal schema manager and returns this builder. */
        public Builder internalSchemaManager(InternalSchemaManager internalSchemaManager) {
            this.internalSchemaManager = internalSchemaManager;
            return this;
        }

        /**
         * Creates the input format from the values collected so far.
         *
         * @return a new {@link MergeOnReadInputFormat}
         */
        public MergeOnReadInputFormat build() {
            return new MergeOnReadInputFormat(this.conf, this.tableState, this.fieldTypes, this.defaultPartName, this.predicates, this.limit, this.emitDelete, this.internalSchemaManager);
        }
    }

    /**
     * Iterator that merges the rows of a base file with the records of the split's log files.
     *
     * <p>It works in two phases. First the base file rows are read: a row whose key also appears in
     * the log records is merged with that log record through the configured record merger, and any
     * other row is passed through (projected when a projection is present). Once the base file is
     * exhausted, the log records whose keys were not seen in the base file are emitted.
     *
     * <p>{@link #hasNext()} advances the iterator and stores the row, and {@link #next()} only
     * returns the stored row, so the two must be called strictly alternately.
     */
    protected static class MergeIterator
    implements ClosableIterator<RowData> {
        /** Position in a base file row from which the commit time is read for instant range filtering. */
        private static final int COMMIT_TIME_POS = 0;
        /** Position in a base file row from which the key used to look up log records is read. */
        private static final int RECORD_KEY_POS = 2;

        private final ClosableIterator<RowData> nested;
        private final Iterator<String> logKeysIterator;
        private final HoodieMergedLogRecordScanner scanner;
        private final Schema tableSchema;
        private final boolean emitDelete;
        private final int operationPos;
        private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter;
        private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
        private final Option<RowDataProjection> projection;
        private final Option<Function<IndexedRecord, GenericRecord>> avroProjection;
        /** Instant range of the split, or {@code null} when the split has none. */
        private final InstantRange instantRange;
        private final HoodieRecordMerger recordMerger;
        /** Set once the base file is exhausted; from then on only log records are emitted. */
        private boolean readLogs = false;
        /** Keys already handled while reading the base file; skipped in the log phase. */
        private final Set<String> keyToSkip = new HashSet<String>();
        private final TypedProperties payloadProps;
        /** Row prepared by the last successful {@link #hasNext()} call. */
        private RowData currentRecord;

        /**
         * Creates a merge iterator that projects both base file rows and merged Avro records to the
         * required schema, using {@code requiredPos}.
         *
         * @param flinkConf       Flink configuration
         * @param hadoopConf      Hadoop configuration used by the log scanner
         * @param split           the split to read
         * @param tableRowType    full row type of the table
         * @param requiredRowType row type of the emitted rows
         * @param tableSchema     full Avro schema of the table
         * @param requiredSchema  Avro schema of the emitted rows
         * @param querySchema     internal schema handed to the log scanner
         * @param requiredPos     required field positions
         * @param emitDelete      whether DELETE rows are emitted
         * @param operationPos    position handed to {@code FormatUtils} to derive the row kind
         * @param nested          iterator over the base file rows
         */
        public MergeIterator(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf, MergeOnReadInputSplit split, RowType tableRowType, RowType requiredRowType, Schema tableSchema, Schema requiredSchema, InternalSchema querySchema, int[] requiredPos, boolean emitDelete, int operationPos, ClosableIterator<RowData> nested) {
            this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, tableSchema, querySchema, (Option<RowDataProjection>)Option.of((Object)RowDataProjection.instance(requiredRowType, requiredPos)), (Option<Function<IndexedRecord, GenericRecord>>)Option.of(record -> FormatUtils.buildAvroRecordBySchema(record, requiredSchema, requiredPos, new GenericRecordBuilder(requiredSchema))), emitDelete, operationPos, nested);
        }

        /**
         * Creates a merge iterator with explicit, optional projections.
         *
         * @param flinkConf       Flink configuration; also the source of the payload properties and
         *                        the record merger settings
         * @param hadoopConf      Hadoop configuration used by the log scanner
         * @param split           the split to read
         * @param tableRowType    full row type of the table
         * @param requiredRowType row type of the emitted rows
         * @param tableSchema     full Avro schema of the table
         * @param querySchema     internal schema handed to the log scanner
         * @param projection      optional projection applied to base file rows that need no merge
         * @param avroProjection  optional projection applied to Avro records before conversion
         * @param emitDelete      whether DELETE rows are emitted
         * @param operationPos    position handed to {@code FormatUtils} to derive the row kind
         * @param nested          iterator over the base file rows
         */
        public MergeIterator(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf, MergeOnReadInputSplit split, RowType tableRowType, RowType requiredRowType, Schema tableSchema, InternalSchema querySchema, Option<RowDataProjection> projection, Option<Function<IndexedRecord, GenericRecord>> avroProjection, boolean emitDelete, int operationPos, ClosableIterator<RowData> nested) {
            this.tableSchema = tableSchema;
            this.nested = nested;
            this.scanner = FormatUtils.logScanner(split, tableSchema, querySchema, flinkConf, hadoopConf);
            this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
            this.logKeysIterator = this.scanner.getRecords().keySet().iterator();
            this.emitDelete = emitDelete;
            this.operationPos = operationPos;
            this.avroProjection = avroProjection;
            this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter((LogicalType)tableRowType);
            this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
            this.projection = projection;
            this.instantRange = (InstantRange)split.getInstantRange().orElse(null);
            // The merger implementations are configured as a comma separated list.
            List mergers = Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(",")).map(String::trim).distinct().collect(Collectors.toList());
            this.recordMerger = HoodieRecordUtils.createRecordMerger((String)split.getTablePath(), (EngineType)EngineType.FLINK, mergers, (String)flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY));
        }

        /**
         * Advances to the next row to emit and stores it for {@link #next()}.
         *
         * @return {@code true} if a row was prepared, {@code false} once base file and log records
         *         are both exhausted
         */
        public boolean hasNext() {
            // Phase 1: base file rows, merged with their log record when one exists.
            while (!this.readLogs && this.nested.hasNext()) {
                this.currentRecord = (RowData)this.nested.next();
                // Rows outside the split's instant range are dropped.
                if (this.instantRange != null && !this.instantRange.isInRange(this.currentRecord.getString(COMMIT_TIME_POS).toString())) continue;
                String curKey = this.currentRecord.getString(RECORD_KEY_POS).toString();
                if (this.scanner.getRecords().containsKey(curKey)) {
                    // Remember the key so the log phase does not emit the same record again.
                    this.keyToSkip.add(curKey);
                    Option<HoodieRecord<IndexedRecord>> mergedAvroRecord = this.mergeRowWithLog(this.currentRecord, curKey);
                    // An empty merge result means the row is dropped.
                    if (!mergedAvroRecord.isPresent()) continue;
                    RowKind rowKind = FormatUtils.getRowKindSafely((IndexedRecord)((HoodieRecord)mergedAvroRecord.get()).getData(), this.operationPos);
                    if (!this.emitDelete && rowKind == RowKind.DELETE) continue;
                    IndexedRecord avroRecord = this.avroProjection.isPresent() ? (IndexedRecord)((Function)this.avroProjection.get()).apply(((HoodieRecord)mergedAvroRecord.get()).getData()) : (IndexedRecord)((HoodieRecord)mergedAvroRecord.get()).getData();
                    this.currentRecord = (RowData)this.avroToRowDataConverter.convert(avroRecord);
                    this.currentRecord.setRowKind(rowKind);
                    return true;
                }
                // No log record for this key: emit the base file row, projected if requested.
                if (this.projection.isPresent()) {
                    this.currentRecord = ((RowDataProjection)this.projection.get()).project(this.currentRecord);
                }
                return true;
            }
            // Phase 2: log records whose keys were not seen in the base file.
            this.readLogs = true;
            while (this.logKeysIterator.hasNext()) {
                Option<IndexedRecord> insertAvroRecord;
                String curKey = this.logKeysIterator.next();
                if (this.keyToSkip.contains(curKey) || !(insertAvroRecord = this.getInsertValue(curKey)).isPresent()) continue;
                IndexedRecord avroRecord = this.avroProjection.isPresent() ? (IndexedRecord)((Function)this.avroProjection.get()).apply(insertAvroRecord.get()) : (IndexedRecord)insertAvroRecord.get();
                this.currentRecord = (RowData)this.avroToRowDataConverter.convert(avroRecord);
                // The row kind is derived from the full record, not from the projection.
                FormatUtils.setRowKind(this.currentRecord, (IndexedRecord)insertAvroRecord.get(), this.operationPos);
                return true;
            }
            return false;
        }

        /**
         * Returns the insert value of the log record with the given key.
         *
         * @param curKey key of the log record
         * @return the insert value; empty when the payload yields none, or when the record's
         *         operation is a delete and deletes are not emitted
         * @throws HoodieIOException if the payload cannot produce its insert value
         */
        private Option<IndexedRecord> getInsertValue(String curKey) {
            HoodieAvroRecord record = (HoodieAvroRecord)this.scanner.getRecords().get(curKey);
            if (!this.emitDelete && HoodieOperation.isDelete((HoodieOperation)record.getOperation())) {
                return Option.empty();
            }
            try {
                return record.getData().getInsertValue(this.tableSchema);
            }
            catch (IOException e) {
                throw new HoodieIOException("Get insert value from payload exception", e);
            }
        }

        /**
         * Returns the row prepared by the last successful {@link #hasNext()} call.
         *
         * @return the current row
         */
        public RowData next() {
            return this.currentRecord;
        }

        /**
         * Closes the base file iterator and the log scanner.
         *
         * <p>The scanner is closed in a {@code finally} block so that it is released even when
         * closing the base file iterator fails.
         */
        public void close() {
            try {
                if (this.nested != null) {
                    this.nested.close();
                }
            } finally {
                if (this.scanner != null) {
                    this.scanner.close();
                }
            }
        }

        /**
         * Merges a base file row with the log record of the same key using the record merger.
         *
         * @param curRow the base file row
         * @param curKey the record key shared by the row and the log record
         * @return the merged record, or empty when the merger produces no result
         * @throws HoodieIOException if merging fails with an {@link IOException}
         */
        private Option<HoodieRecord<IndexedRecord>> mergeRowWithLog(RowData curRow, String curKey) {
            HoodieRecord record = (HoodieRecord)this.scanner.getRecords().get(curKey);
            GenericRecord historyAvroRecord = (GenericRecord)this.rowDataToAvroConverter.convert(this.tableSchema, curRow);
            HoodieAvroIndexedRecord hoodieAvroIndexedRecord = new HoodieAvroIndexedRecord((IndexedRecord)historyAvroRecord);
            try {
                return this.recordMerger.merge((HoodieRecord)hoodieAvroIndexedRecord, this.tableSchema, record, this.tableSchema, this.payloadProps).map(Pair::getLeft);
            }
            catch (IOException e) {
                throw new HoodieIOException("Merge base and delta payloads exception", e);
            }
        }
    }

    /**
     * Iterator that emits all rows of the base file followed by all rows of the log files, without
     * merging them.
     *
     * <p>{@link #hasNext()} advances the iterator and stores the row, and {@link #next()} only
     * returns the stored row, so the two must be called strictly alternately.
     */
    static class SkipMergeIterator
    implements ClosableIterator<RowData> {
        /** Iterator over the base file rows; read first. */
        private final ClosableIterator<RowData> nested;
        /** Iterator over the log file rows; read once the base file is exhausted. */
        private final ClosableIterator<RowData> iterator;
        /** Set once the base file is exhausted. */
        private boolean readLogs = false;
        /** Row prepared by the last successful {@link #hasNext()} call. */
        private RowData currentRecord;

        /**
         * @param nested   iterator over the base file rows
         * @param iterator iterator over the log file rows
         */
        SkipMergeIterator(ClosableIterator<RowData> nested, ClosableIterator<RowData> iterator) {
            this.nested = nested;
            this.iterator = iterator;
        }

        /**
         * Advances to the next row, taking it from the base file first and from the logs afterwards.
         *
         * @return {@code true} if a row was prepared, {@code false} once both sources are exhausted
         */
        public boolean hasNext() {
            if (!this.readLogs && this.nested.hasNext()) {
                this.currentRecord = (RowData)this.nested.next();
                return true;
            }
            this.readLogs = true;
            if (this.iterator.hasNext()) {
                this.currentRecord = (RowData)this.iterator.next();
                return true;
            }
            return false;
        }

        /**
         * Returns the row prepared by the last successful {@link #hasNext()} call.
         *
         * @return the current row
         */
        public RowData next() {
            return this.currentRecord;
        }

        /**
         * Closes both underlying iterators.
         *
         * <p>The log iterator is closed in a {@code finally} block so that it is released even when
         * closing the base file iterator fails.
         */
        public void close() {
            try {
                if (this.nested != null) {
                    this.nested.close();
                }
            } finally {
                if (this.iterator != null) {
                    this.iterator.close();
                }
            }
        }
    }

    /**
     * Thin wrapper around a log file iterator; every call is forwarded to the wrapped iterator.
     */
    protected static class LogFileOnlyIterator
    implements ClosableIterator<RowData> {
        /** The wrapped log file iterator. */
        private final ClosableIterator<RowData> iterator;

        /**
         * @param iterator the log file iterator to wrap
         */
        public LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
            this.iterator = iterator;
        }

        /** Forwards to the wrapped iterator. */
        public boolean hasNext() {
            return iterator.hasNext();
        }

        /** Forwards to the wrapped iterator. */
        public RowData next() {
            RowData row = (RowData)iterator.next();
            return row;
        }

        /** Closes the wrapped iterator when there is one. */
        public void close() {
            if (iterator == null) {
                return;
            }
            iterator.close();
        }
    }

    /**
     * Iterator over base file rows that keeps only the rows whose commit time lies in an instant
     * range, and projects the surviving rows to the required fields.
     *
     * <p>When position {@code 0} is not among the required positions, the nested rows are expected
     * to carry the commit time as an extra leading field: it is then read from index {@code 0} and
     * the projection selects indices {@code 1 .. n}. Otherwise the commit time is read from wherever
     * position {@code 0} sits in {@code requiredPos} and the projection selects indices
     * {@code 0 .. n - 1}.
     */
    static class BaseFileOnlyFilteringIterator
    implements ClosableIterator<RowData> {
        private final ClosableIterator<RowData> nested;
        private final InstantRange instantRange;
        private final RowDataProjection projection;
        /** Unprojected row accepted by the last successful {@link #hasNext()} call. */
        private RowData currentRecord;
        /** Index in the nested rows from which the commit time is read. */
        private int commitTimePos;

        /**
         * @param instantRange    range the commit time of a row must fall into
         * @param requiredRowType row type of the emitted rows
         * @param requiredPos     required field positions
         * @param nested          iterator over the base file rows
         */
        BaseFileOnlyFilteringIterator(InstantRange instantRange, RowType requiredRowType, int[] requiredPos, ClosableIterator<RowData> nested) {
            this.nested = nested;
            this.instantRange = instantRange;
            int found = MergeOnReadInputFormat.getCommitTimePos(requiredPos);
            boolean commitTimeRequired = found >= 0;
            this.commitTimePos = commitTimeRequired ? found : 0;
            // Without a required commit time the nested rows have one extra leading field to skip.
            int firstField = commitTimeRequired ? 0 : 1;
            int[] positions = IntStream.range(firstField, firstField + requiredPos.length).toArray();
            this.projection = RowDataProjection.instance(requiredRowType, positions);
        }

        /**
         * Advances to the next row whose commit time is within the instant range.
         *
         * @return {@code true} if such a row was found, {@code false} once the base file is exhausted
         */
        public boolean hasNext() {
            while (nested.hasNext()) {
                currentRecord = (RowData)nested.next();
                String commitTime = currentRecord.getString(commitTimePos).toString();
                if (instantRange.isInRange(commitTime)) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Returns the current row projected to the required fields.
         *
         * @return the projected row
         */
        public RowData next() {
            return projection.project(currentRecord);
        }

        /** Closes the nested base file iterator when there is one. */
        public void close() {
            if (nested == null) {
                return;
            }
            nested.close();
        }
    }
}

