package org.apache.flink.table.store.file.operation;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.compact.CompactUnit;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactTask;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.class */
public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
    private final DataFileReader.Factory dataFileReaderFactory;
    private final DataFileWriter.Factory dataFileWriterFactory;
    private final Supplier<Comparator<RowData>> keyComparatorSupplier;
    private final MergeFunction mergeFunction;
    private final CoreOptions options;

    public KeyValueFileStoreWrite(SchemaManager schemaManager, long j, RowType rowType, RowType rowType2, Supplier<Comparator<RowData>> supplier, MergeFunction mergeFunction, FileStorePathFactory fileStorePathFactory, SnapshotManager snapshotManager, FileStoreScan fileStoreScan, CoreOptions coreOptions) {
        super(snapshotManager, fileStoreScan);
        this.dataFileReaderFactory = new DataFileReader.Factory(schemaManager, j, rowType, rowType2, coreOptions.fileFormat(), fileStorePathFactory);
        this.dataFileWriterFactory = new DataFileWriter.Factory(j, rowType, rowType2, coreOptions.fileFormat(), fileStorePathFactory, coreOptions.targetFileSize());
        this.keyComparatorSupplier = supplier;
        this.mergeFunction = mergeFunction;
        this.options = coreOptions;
    }

    @Override // org.apache.flink.table.store.file.operation.FileStoreWrite
    public RecordWriter<KeyValue> createWriter(BinaryRowData binaryRowData, int i, ExecutorService executorService) {
        return createMergeTreeWriter(binaryRowData, i, scanExistingFileMetas(binaryRowData, i), executorService);
    }

    @Override // org.apache.flink.table.store.file.operation.FileStoreWrite
    public RecordWriter<KeyValue> createEmptyWriter(BinaryRowData binaryRowData, int i, ExecutorService executorService) {
        return createMergeTreeWriter(binaryRowData, i, Collections.emptyList(), executorService);
    }

    @Override // org.apache.flink.table.store.file.operation.FileStoreWrite
    public Callable<CompactResult> createCompactWriter(BinaryRowData binaryRowData, int i, @Nullable List<DataFileMeta> list) {
        if (list == null) {
            list = scanExistingFileMetas(binaryRowData, i);
        }
        Comparator<RowData> comparator = this.keyComparatorSupplier.get();
        CompactRewriter compactRewriter = compactRewriter(binaryRowData, i, comparator);
        Levels levels = new Levels(comparator, list, this.options.numLevels());
        return new MergeTreeCompactTask(comparator, this.options.targetFileSize(), compactRewriter, CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns()), true);
    }

    private MergeTreeWriter createMergeTreeWriter(BinaryRowData binaryRowData, int i, List<DataFileMeta> list, ExecutorService executorService) {
        DataFileWriter create = this.dataFileWriterFactory.create(binaryRowData, i);
        Comparator<RowData> comparator = this.keyComparatorSupplier.get();
        Levels levels = new Levels(comparator, list, this.options.numLevels());
        return new MergeTreeWriter(create.keyType(), create.valueType(), createCompactManager(binaryRowData, i, new UniversalCompaction(this.options.maxSizeAmplificationPercent(), this.options.sortedRunSizeRatio(), this.options.numSortedRunCompactionTrigger(), this.options.maxSortedRunNum()), executorService, levels), levels, getMaxSequenceNumber(list), comparator, this.mergeFunction.copy(), create, this.options.commitForceCompact(), this.options.numSortedRunStopTrigger(), this.options.changelogProducer());
    }

    private CompactManager createCompactManager(BinaryRowData binaryRowData, int i, CompactStrategy compactStrategy, ExecutorService executorService, Levels levels) {
        Comparator<RowData> comparator = this.keyComparatorSupplier.get();
        return new MergeTreeCompactManager(executorService, levels, compactStrategy, comparator, this.options.targetFileSize(), compactRewriter(binaryRowData, i, comparator));
    }

    private CompactRewriter compactRewriter(BinaryRowData binaryRowData, int i, Comparator<RowData> comparator) {
        DataFileWriter create = this.dataFileWriterFactory.create(binaryRowData, i);
        return (i2, z, list) -> {
            return create.write(new RecordReaderIterator(new MergeTreeReader(list, z, this.dataFileReaderFactory.create(binaryRowData, i), comparator, this.mergeFunction.copy())), i2);
        };
    }
}
