package org.apache.flink.table.store.connector.source;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.class */
public class TestChangelogDataReadWrite {
    private static final RowType KEY_TYPE = new RowType(Collections.singletonList(new RowType.RowField("k", new BigIntType())));
    private static final RowType VALUE_TYPE = new RowType(Collections.singletonList(new RowType.RowField("v", new BigIntType())));
    private static final Comparator<RowData> COMPARATOR = Comparator.comparingLong(rowData -> {
        return rowData.getLong(0);
    });
    private final FileFormat avro = FileFormat.fromIdentifier("avro", new Configuration());
    private final Path tablePath;
    private final FileStorePathFactory pathFactory;
    private final SnapshotManager snapshotManager;
    private final ExecutorService service;

    public TestChangelogDataReadWrite(String str, ExecutorService executorService) {
        this.tablePath = new Path(str);
        this.pathFactory = new FileStorePathFactory(this.tablePath, RowType.of(new LogicalType[]{new IntType()}), "default", (String) CoreOptions.FILE_FORMAT.defaultValue());
        this.snapshotManager = new SnapshotManager(new Path(str));
        this.service = executorService;
    }

    public TableRead createReadWithKey() {
        return createRead(ValueContentRowDataRecordIterator::new);
    }

    public TableRead createReadWithValueCount() {
        return createRead(ValueCountRowDataRecordIterator::new);
    }

    private TableRead createRead(final Function<RecordReader.RecordIterator<KeyValue>, RecordReader.RecordIterator<RowData>> function) {
        return new KeyValueTableRead(new KeyValueFileStoreRead(new SchemaManager(this.tablePath), 0L, KEY_TYPE, VALUE_TYPE, COMPARATOR, new DeduplicateMergeFunction(), this.avro, this.pathFactory)) { // from class: org.apache.flink.table.store.connector.source.TestChangelogDataReadWrite.1
            public TableRead withFilter(Predicate predicate) {
                throw new UnsupportedOperationException();
            }

            public TableRead withProjection(int[][] iArr) {
                throw new UnsupportedOperationException();
            }

            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(RecordReader.RecordIterator<KeyValue> recordIterator) {
                return (RecordReader.RecordIterator) function.apply(recordIterator);
            }
        };
    }

    public List<DataFileMeta> writeFiles(BinaryRowData binaryRowData, int i, List<Tuple2<Long, Long>> list) throws Exception {
        Preconditions.checkNotNull(this.service, "ExecutorService must be provided if writeFiles is needed");
        RecordWriter<KeyValue> createMergeTreeWriter = createMergeTreeWriter(binaryRowData, i);
        for (Tuple2<Long, Long> tuple2 : list) {
            createMergeTreeWriter.write(new KeyValue().replace(GenericRowData.of(new Object[]{tuple2.f0}), RowKind.INSERT, GenericRowData.of(new Object[]{tuple2.f1})));
        }
        List newFiles = createMergeTreeWriter.prepareCommit(true).newFiles();
        createMergeTreeWriter.close();
        return new ArrayList(newFiles);
    }

    public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRowData binaryRowData, int i) {
        CoreOptions coreOptions = new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro"));
        MemoryOwner createEmptyWriter = new KeyValueFileStoreWrite(new SchemaManager(this.tablePath), 0L, KEY_TYPE, VALUE_TYPE, () -> {
            return COMPARATOR;
        }, new DeduplicateMergeFunction(), this.pathFactory, this.snapshotManager, (FileStoreScan) null, coreOptions).createEmptyWriter(binaryRowData, i, this.service);
        createEmptyWriter.setMemoryPool(new HeapMemorySegmentPool(coreOptions.writeBufferSize(), coreOptions.pageSize()));
        return createEmptyWriter;
    }
}
