package org.apache.paimon.flink.sink.cdc;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.table.FileStoreTable;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.class */
public class CdcRecordStoreWriteOperator extends PrepareCommitOperator<CdcRecord> {
    private static final long serialVersionUID = 1;
    static final ConfigOption<Duration> RETRY_SLEEP_TIME = ConfigOptions.key("cdc.retry-sleep-time").durationType().defaultValue(Duration.ofMillis(500));
    private FileStoreTable table;
    private final StoreSinkWrite.Provider storeSinkWriteProvider;
    private final String initialCommitUser;
    private final long retrySleepMillis;
    private transient StoreSinkWriteState state;
    private transient StoreSinkWrite write;

    public CdcRecordStoreWriteOperator(FileStoreTable fileStoreTable, StoreSinkWrite.Provider provider, String str) {
        this.table = fileStoreTable;
        this.storeSinkWriteProvider = provider;
        this.initialCommitUser = str;
        this.retrySleepMillis = ((Duration) fileStoreTable.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME)).toMillis();
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        String str = (String) StateUtils.getSingleValueFromState(stateInitializationContext, "commit_user_state", String.class, this.initialCommitUser);
        CdcRecordChannelComputer cdcRecordChannelComputer = new CdcRecordChannelComputer(this.table.schema());
        cdcRecordChannelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
        this.state = new StoreSinkWriteState(stateInitializationContext, (str2, binaryRow, i) -> {
            return cdcRecordChannelComputer.channel(binaryRow, i) == getRuntimeContext().getIndexOfThisSubtask();
        });
        this.table = this.table.copyWithLatestSchema();
        this.write = this.storeSinkWriteProvider.provide(this.table, str, this.state, getContainingTask().getEnvironment().getIOManager());
    }

    public void processElement(StreamRecord<CdcRecord> streamRecord) throws Exception {
        CdcRecord cdcRecord = (CdcRecord) streamRecord.getValue();
        Optional<GenericRow> genericRow = cdcRecord.toGenericRow(this.table.schema().fields());
        if (!genericRow.isPresent()) {
            while (true) {
                this.table = this.table.copyWithLatestSchema();
                genericRow = cdcRecord.toGenericRow(this.table.schema().fields());
                if (genericRow.isPresent()) {
                    break;
                } else {
                    Thread.sleep(this.retrySleepMillis);
                }
            }
            this.write.replace(this.table);
        }
        try {
            this.write.write(genericRow.get());
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.write.snapshotState();
        this.state.snapshotState();
    }

    public void close() throws Exception {
        super.close();
        this.write.close();
    }

    @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
    protected List<Committable> prepareCommit(boolean z, long j) throws IOException {
        return this.write.prepareCommit(z, j);
    }
}
