package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.List;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializationUtils;

/* loaded from: input_file:org/apache/paimon/flink/sink/StoreCompactOperator.class */
public class StoreCompactOperator extends PrepareCommitOperator<RowData> {
    private final FileStoreTable table;
    private final StoreSinkWrite.Provider storeSinkWriteProvider;
    private final boolean isStreaming;
    private final String initialCommitUser;
    private transient StoreSinkWriteState state;
    private transient StoreSinkWrite write;
    private transient DataFileMetaSerializer dataFileMetaSerializer;

    public StoreCompactOperator(FileStoreTable fileStoreTable, StoreSinkWrite.Provider provider, boolean z, String str) {
        Preconditions.checkArgument(!fileStoreTable.coreOptions().writeOnly(), CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
        this.table = fileStoreTable;
        this.storeSinkWriteProvider = provider;
        this.isStreaming = z;
        this.initialCommitUser = str;
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        String str = (String) StateUtils.getSingleValueFromState(stateInitializationContext, "commit_user_state", String.class, this.initialCommitUser);
        this.state = new StoreSinkWriteState(stateInitializationContext, (str2, binaryRow, i) -> {
            return ChannelComputer.select(binaryRow, i, getRuntimeContext().getNumberOfParallelSubtasks()) == getRuntimeContext().getIndexOfThisSubtask();
        });
        this.write = this.storeSinkWriteProvider.provide(this.table, str, this.state, getContainingTask().getEnvironment().getIOManager());
    }

    public void open() throws Exception {
        super.open();
        this.dataFileMetaSerializer = new DataFileMetaSerializer();
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        long j = rowData.getLong(0);
        BinaryRow deserializeBinaryRow = SerializationUtils.deserializeBinaryRow(rowData.getBinary(1));
        int i = rowData.getInt(2);
        List<DataFileMeta> deserializeList = this.dataFileMetaSerializer.deserializeList(rowData.getBinary(3));
        if (this.isStreaming) {
            this.write.notifyNewFiles(j, deserializeBinaryRow, i, deserializeList);
            this.write.compact(deserializeBinaryRow, i, false);
        } else {
            Preconditions.checkArgument(deserializeList.isEmpty(), "Batch compact job does not concern what files are compacted. They only need to know what buckets are compacted.");
            this.write.compact(deserializeBinaryRow, i, true);
        }
    }

    @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
    protected List<Committable> prepareCommit(boolean z, long j) throws IOException {
        return this.write.prepareCommit(z, j);
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.write.snapshotState();
        this.state.snapshotState();
    }

    public void close() throws Exception {
        super.close();
        this.write.close();
    }
}
