package org.apache.paimon.flink.sink.cdc;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.DynamicBucketRow;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.class */
public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<CdcRecord, Integer>> {
    private static final long serialVersionUID = 1;
    private final long retrySleepMillis;

    public CdcDynamicBucketWriteOperator(FileStoreTable fileStoreTable, StoreSinkWrite.Provider provider, String str) {
        super(fileStoreTable, provider, str);
        this.retrySleepMillis = ((Duration) fileStoreTable.coreOptions().toConfiguration().get(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME)).toMillis();
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        this.table = this.table.copyWithLatestSchema();
        super.initializeState(stateInitializationContext);
    }

    protected boolean containLogSystem() {
        return false;
    }

    public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> streamRecord) throws Exception {
        Tuple2 tuple2 = (Tuple2) streamRecord.getValue();
        Optional<GenericRow> genericRow = CdcRecordUtils.toGenericRow((CdcRecord) tuple2.f0, this.table.schema().fields());
        if (!genericRow.isPresent()) {
            while (true) {
                this.table = this.table.copyWithLatestSchema();
                genericRow = CdcRecordUtils.toGenericRow((CdcRecord) tuple2.f0, this.table.schema().fields());
                if (genericRow.isPresent()) {
                    break;
                } else {
                    Thread.sleep(this.retrySleepMillis);
                }
            }
            this.write.replace(this.table);
        }
        try {
            this.write.write(new DynamicBucketRow(genericRow.get(), ((Integer) tuple2.f1).intValue()));
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
