package org.apache.seatunnel.translation.flink.source;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.types.Row;
import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.common.metrics.Meter;
import org.apache.seatunnel.api.common.metrics.MetricNames;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;

/* loaded from: input_file:org/apache/seatunnel/translation/flink/source/FlinkRowCollector.class */
/**
 * {@link Collector} implementation that forwards SeaTunnel rows to a Flink {@link ReaderOutput}.
 *
 * <p>Every collected row is first handed to the flow-control gate, then converted to a Flink
 * {@link Row} and emitted. The received-count, received-bytes and QPS metrics are updated only
 * after the row has been emitted without an exception.
 *
 * <p>The target output is not known at construction time; it must be supplied through
 * {@link #withReaderOutput(ReaderOutput)} before {@link #collect(SeaTunnelRow)} is called.
 */
public class FlinkRowCollector implements Collector<SeaTunnelRow> {
    /** Output the converted rows are emitted to; unset until {@link #withReaderOutput} is called. */
    private ReaderOutput<Row> readerOutput;
    private final FlinkRowConverter rowSerialization;
    private final FlowControlGate flowControlGate;
    private final Counter sourceReadCount;
    private final Counter sourceReadBytes;
    private final Meter sourceReadQPS;

    /**
     * Creates a collector for rows of the given type.
     *
     * @param seaTunnelRowType row type used to build the SeaTunnel-to-Flink row converter
     * @param config configuration from which the flow-control strategy is derived
     * @param metricsContext context used to register the source received count/bytes/QPS metrics
     */
    public FlinkRowCollector(SeaTunnelRowType seaTunnelRowType, Config config, MetricsContext metricsContext) {
        this.rowSerialization = new FlinkRowConverter(seaTunnelRowType);
        this.flowControlGate = FlowControlGate.create(FlowControlStrategy.fromConfig(config));
        this.sourceReadCount = metricsContext.counter(MetricNames.SOURCE_RECEIVED_COUNT);
        this.sourceReadBytes = metricsContext.counter(MetricNames.SOURCE_RECEIVED_BYTES);
        this.sourceReadQPS = metricsContext.meter(MetricNames.SOURCE_RECEIVED_QPS);
    }

    /**
     * Passes the row through the flow-control gate, converts it and emits it to the reader output,
     * then updates the source metrics.
     *
     * @param seaTunnelRow the row to emit
     * @throws IllegalStateException if no reader output has been set via
     *     {@link #withReaderOutput(ReaderOutput)}
     * @throws RuntimeException if conversion or emission fails; unchecked exceptions propagate
     *     unchanged, checked exceptions are wrapped with the original as the cause
     */
    @Override
    public void collect(SeaTunnelRow seaTunnelRow) {
        final ReaderOutput<Row> output = this.readerOutput;
        // Fail fast with an explicit message instead of a wrapped NullPointerException, and do it
        // before the flow-control gate is touched for a row that can never be emitted.
        if (output == null) {
            throw new IllegalStateException(
                    "readerOutput has not been set; call withReaderOutput(...) before collect(...)");
        }
        this.flowControlGate.audit(seaTunnelRow);
        try {
            output.collect(this.rowSerialization.convert(seaTunnelRow));
            this.sourceReadCount.inc();
            this.sourceReadBytes.inc(seaTunnelRow.getBytesSize());
            this.sourceReadQPS.markEvent();
        } catch (RuntimeException e) {
            // Already unchecked: rethrow as-is so the original type is not hidden behind a wrapper.
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the object used as the checkpoint lock.
     *
     * @return this collector instance
     */
    @Override
    public Object getCheckpointLock() {
        return this;
    }

    /**
     * Sets the Flink output that subsequent {@link #collect(SeaTunnelRow)} calls emit to.
     *
     * @param readerOutput the output to emit converted rows to
     * @return this collector, to allow call chaining
     */
    public FlinkRowCollector withReaderOutput(ReaderOutput<Row> readerOutput) {
        this.readerOutput = readerOutput;
        return this;
    }
}
