package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.memory.FlinkMemorySegmentPool;
import org.apache.paimon.flink.memory.MemorySegmentAllocator;
import org.apache.paimon.flink.utils.ManagedMemoryUtils;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.BatchWriteBuilder;

/* loaded from: input_file:org/apache/paimon/flink/sink/PrepareCommitOperator.class */
public abstract class PrepareCommitOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {

    @Nullable
    protected transient MemorySegmentPool memoryPool;

    @Nullable
    private transient MemorySegmentAllocator memoryAllocator;
    private final Options options;
    private boolean endOfInput = false;

    public PrepareCommitOperator(Options options) {
        this.options = options;
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        if (((Boolean) this.options.get(FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY)).booleanValue()) {
            MemoryManager memoryManager = streamTask.getEnvironment().getMemoryManager();
            this.memoryAllocator = new MemorySegmentAllocator(streamTask, memoryManager);
            this.memoryPool = new FlinkMemorySegmentPool(ManagedMemoryUtils.computeManagedMemory(this), memoryManager.getPageSize(), this.memoryAllocator);
        }
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        if (this.endOfInput) {
            return;
        }
        emitCommittables(false, j);
    }

    public void endInput() throws Exception {
        this.endOfInput = true;
        emitCommittables(true, BatchWriteBuilder.COMMIT_IDENTIFIER);
    }

    public void close() throws Exception {
        super.close();
        if (this.memoryAllocator != null) {
            this.memoryAllocator.release();
        }
    }

    private void emitCommittables(boolean z, long j) throws IOException {
        prepareCommit(z, j).forEach(obj -> {
            this.output.collect(new StreamRecord(obj));
        });
    }

    protected abstract List<OUT> prepareCommit(boolean z, long j) throws IOException;
}
