package org.apache.paimon.flink.sink;

import java.util.List;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.SortBufferWriteBuffer;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.UserDefinedSeqComparator;

/* loaded from: input_file:org/apache/paimon/flink/sink/LocalMergeOperator.class */
public class LocalMergeOperator extends AbstractStreamOperator<InternalRow> implements OneInputStreamOperator<InternalRow, InternalRow>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    private final TableSchema schema;
    private final boolean ignoreDelete;
    private transient Projection keyProjection;
    private transient RecordComparator keyComparator;
    private transient long recordCount;
    private transient RowKindGenerator rowKindGenerator;
    private transient MergeFunction<KeyValue> mergeFunction;
    private transient SortBufferWriteBuffer buffer;
    private transient long currentWatermark;
    private transient boolean endOfInput;

    public LocalMergeOperator(TableSchema tableSchema) {
        Preconditions.checkArgument(tableSchema.primaryKeys().size() > 0, "LocalMergeOperator currently only support tables with primary keys");
        this.schema = tableSchema;
        this.ignoreDelete = CoreOptions.fromMap(tableSchema.options()).ignoreDelete();
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void open() throws Exception {
        super.open();
        RowType addKeyNamePrefix = PrimaryKeyTableUtils.addKeyNamePrefix(this.schema.logicalPrimaryKeysType());
        RowType logicalRowType = this.schema.logicalRowType();
        CoreOptions coreOptions = new CoreOptions(this.schema.options());
        this.keyProjection = CodeGenUtils.newProjection(logicalRowType, this.schema.projection(this.schema.primaryKeys()));
        this.keyComparator = new KeyComparatorSupplier(addKeyNamePrefix).get();
        this.recordCount = 0L;
        this.rowKindGenerator = RowKindGenerator.create(this.schema, coreOptions);
        this.mergeFunction = PrimaryKeyTableUtils.createMergeFunctionFactory(this.schema, new KeyValueFieldsExtractor() { // from class: org.apache.paimon.flink.sink.LocalMergeOperator.1
            private static final long serialVersionUID = 1;

            @Override // org.apache.paimon.schema.KeyValueFieldsExtractor
            public List<DataField> keyFields(TableSchema tableSchema) {
                return tableSchema.primaryKeysFields();
            }

            @Override // org.apache.paimon.schema.KeyValueFieldsExtractor
            public List<DataField> valueFields(TableSchema tableSchema) {
                return tableSchema.fields();
            }
        }).create();
        this.buffer = new SortBufferWriteBuffer(addKeyNamePrefix, logicalRowType, UserDefinedSeqComparator.create(logicalRowType, coreOptions), new HeapMemorySegmentPool(coreOptions.localMergeBufferSize(), coreOptions.pageSize()), false, MemorySize.MAX_VALUE, coreOptions.localSortMaxNumFileHandles(), coreOptions.spillCompression(), null);
        this.currentWatermark = Long.MIN_VALUE;
        this.endOfInput = false;
    }

    public void processElement(StreamRecord<InternalRow> streamRecord) throws Exception {
        this.recordCount++;
        InternalRow internalRow = (InternalRow) streamRecord.getValue();
        RowKind rowKind = RowKindGenerator.getRowKind(this.rowKindGenerator, internalRow);
        if (this.ignoreDelete && rowKind.isRetract()) {
            return;
        }
        internalRow.setRowKind(RowKind.INSERT);
        BinaryRow apply = this.keyProjection.apply(internalRow);
        if (this.buffer.put(this.recordCount, rowKind, apply, internalRow)) {
            return;
        }
        flushBuffer();
        if (this.buffer.put(this.recordCount, rowKind, apply, internalRow)) {
            return;
        }
        internalRow.setRowKind(rowKind);
        this.output.collect(streamRecord);
    }

    public void processWatermark(Watermark watermark) {
        this.currentWatermark = watermark.getTimestamp();
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        if (this.endOfInput) {
            return;
        }
        flushBuffer();
    }

    public void endInput() throws Exception {
        this.endOfInput = true;
        flushBuffer();
    }

    public void close() throws Exception {
        if (this.buffer != null) {
            this.buffer.clear();
        }
        super.close();
    }

    private void flushBuffer() throws Exception {
        if (this.buffer.size() == 0) {
            return;
        }
        this.buffer.forEach(this.keyComparator, this.mergeFunction, null, keyValue -> {
            InternalRow value = keyValue.value();
            value.setRowKind(keyValue.valueKind());
            this.output.collect(new StreamRecord(value));
        });
        this.buffer.clear();
        if (this.currentWatermark != Long.MIN_VALUE) {
            super.processWatermark(new Watermark(this.currentWatermark));
            this.currentWatermark = Long.MIN_VALUE;
        }
    }
}
