package org.apache.paimon.flink.sink.cdc;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.types.DataField;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.class */
public class CdcParsingProcessFunction<T> extends ProcessFunction<T, CdcRecord> {
    public static final OutputTag<List<DataField>> NEW_DATA_FIELD_LIST_OUTPUT_TAG = new OutputTag<>("new-data-field-list", new ListTypeInfo(DataField.class));
    private final EventParser.Factory<T> parserFactory;
    private transient EventParser<T> parser;

    public CdcParsingProcessFunction(EventParser.Factory<T> factory) {
        this.parserFactory = factory;
    }

    public void open(Configuration configuration) throws Exception {
        this.parser = this.parserFactory.create();
    }

    public void processElement(T t, ProcessFunction<T, CdcRecord>.Context context, Collector<CdcRecord> collector) throws Exception {
        this.parser.setRawEvent(t);
        if (this.parser.isUpdatedDataFields()) {
            this.parser.getUpdatedDataFields().ifPresent(list -> {
                context.output(NEW_DATA_FIELD_LIST_OUTPUT_TAG, list);
            });
            return;
        }
        Iterator<CdcRecord> it = this.parser.getRecords().iterator();
        while (it.hasNext()) {
            collector.collect(it.next());
        }
    }
}
