package org.apache.flink.table.runtime.operators.source;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkRuntimeException;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/source/InputConversionOperator.class */
public final class InputConversionOperator<E> extends TableStreamOperator<RowData> implements OneInputStreamOperator<E, RowData> {
    private final DynamicTableSource.DataStructureConverter converter;
    private final boolean requiresWrapping;
    private final boolean produceRowtimeMetadata;
    private final boolean propagateWatermark;
    private final boolean isInsertOnly;
    private transient StreamRecord<RowData> outRecord;

    public InputConversionOperator(DynamicTableSource.DataStructureConverter dataStructureConverter, boolean z, boolean z2, boolean z3, boolean z4) {
        this.converter = dataStructureConverter;
        this.requiresWrapping = z;
        this.produceRowtimeMetadata = z2;
        this.propagateWatermark = z3;
        this.isInsertOnly = z4;
    }

    @Override // org.apache.flink.table.runtime.operators.TableStreamOperator
    public void open() throws Exception {
        super.open();
        this.outRecord = new StreamRecord<>((Object) null);
        this.converter.open(RuntimeConverter.Context.create(getUserCodeClassloader()));
    }

    @Override // org.apache.flink.table.runtime.operators.TableStreamOperator
    public void processWatermark(Watermark watermark) throws Exception {
        if (this.propagateWatermark || Watermark.MAX_WATERMARK.equals(watermark)) {
            super.processWatermark(watermark);
        }
    }

    public void processElement(StreamRecord<E> streamRecord) throws Exception {
        RowData rowData;
        Object value = streamRecord.getValue();
        try {
            Object internal = this.converter.toInternal(value);
            if (this.requiresWrapping) {
                GenericRowData genericRowData = new GenericRowData(RowKind.INSERT, 1);
                genericRowData.setField(0, internal);
                rowData = genericRowData;
            } else if (internal == null) {
                return;
            } else {
                rowData = (RowData) internal;
            }
            RowKind rowKind = rowData.getRowKind();
            if (this.isInsertOnly && rowKind != RowKind.INSERT) {
                throw new FlinkRuntimeException(String.format("Error during input conversion. Conversion expects insert-only records but DataStream API record contains: %s", rowKind));
            }
            if (!this.produceRowtimeMetadata) {
                this.output.collect(this.outRecord.replace(rowData));
            } else {
                if (!streamRecord.hasTimestamp()) {
                    throw new FlinkRuntimeException("Could not find timestamp in DataStream API record. Make sure that timestamps have been assigned before and the event-time characteristic is enabled.");
                }
                GenericRowData genericRowData2 = new GenericRowData(1);
                genericRowData2.setField(0, TimestampData.fromEpochMillis(streamRecord.getTimestamp()));
                this.output.collect(this.outRecord.replace(new JoinedRowData(rowKind, rowData, genericRowData2)));
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException(String.format("Error during input conversion from external DataStream API to internal Table API data structures. Make sure that the provided data types that configure the converters are correctly declared in the schema. Affected record:\n%s", value), e);
        }
    }
}
