package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.class */
/**
 * Deserializes Kafka consumer records into {@link SeaTunnelRow}s and forwards them to the
 * downstream collector, then advances the split's current offset past the record.
 *
 * <p>The deserialization schema is looked up per record from the metadata map, keyed by the
 * table path of the split the record belongs to.
 */
public class KafkaRecordEmitter implements RecordEmitter<ConsumerRecord<byte[], byte[]>, SeaTunnelRow, KafkaSourceSplitState> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRecordEmitter.class);
    private final Map<TablePath, ConsumerMetadata> mapMetadata;
    private final OutputCollector<SeaTunnelRow> outputCollector = new OutputCollector<>();
    private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;

    /**
     * Collector that forwards every call to a delegate which the enclosing emitter re-points
     * at the start of each {@code emitRecord} invocation.
     *
     * @param <T> the element type being collected
     */
    public static class OutputCollector<T> implements Collector<T> {
        // Assigned by KafkaRecordEmitter#emitRecord before any deserializer sees this collector.
        private Collector<T> output;

        private OutputCollector() {
        }

        @Override
        public void collect(T t) {
            this.output.collect(t);
        }

        @Override
        public void collect(SchemaChangeEvent schemaChangeEvent) {
            this.output.collect(schemaChangeEvent);
        }

        @Override
        public void markSchemaChangeBeforeCheckpoint() {
            this.output.markSchemaChangeBeforeCheckpoint();
        }

        @Override
        public void markSchemaChangeAfterCheckpoint() {
            this.output.markSchemaChangeAfterCheckpoint();
        }

        @Override
        public Object getCheckpointLock() {
            return this.output.getCheckpointLock();
        }
    }

    /**
     * Creates an emitter.
     *
     * @param map consumer metadata (including the deserialization schema) keyed by table path
     * @param messageFormatErrorHandleWay how to react when a record cannot be deserialized;
     *     {@link MessageFormatErrorHandleWay#SKIP} logs and drops the record, any other value
     *     rethrows the failure
     */
    public KafkaRecordEmitter(Map<TablePath, ConsumerMetadata> map, MessageFormatErrorHandleWay messageFormatErrorHandleWay) {
        this.mapMetadata = map;
        this.messageFormatErrorHandleWay = messageFormatErrorHandleWay;
    }

    /**
     * Deserializes one Kafka record into the given collector and moves the split's current
     * offset to the position after the record.
     *
     * <p>If deserialization fails and the error handling mode is not
     * {@link MessageFormatErrorHandleWay#SKIP}, the exception is rethrown and the offset is left
     * untouched. In SKIP mode the record is logged and dropped, and the offset still advances.
     *
     * @param consumerRecord the raw Kafka record
     * @param collector the downstream collector receiving deserialized rows
     * @param kafkaSourceSplitState state of the split the record was read from
     * @throws Exception any deserialization failure when the mode is not SKIP
     */
    @Override
    public void emitRecord(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<SeaTunnelRow> collector, KafkaSourceSplitState kafkaSourceSplitState) throws Exception {
        // Re-point the shared forwarding collector at this call's downstream collector.
        this.outputCollector.output = collector;
        DeserializationSchema<SeaTunnelRow> deserializationSchema = this.mapMetadata.get(kafkaSourceSplitState.getTablePath()).getDeserializationSchema();
        try {
            if (deserializationSchema instanceof CompatibleKafkaConnectDeserializationSchema) {
                // This schema is handed the whole record rather than only the value bytes.
                ((CompatibleKafkaConnectDeserializationSchema) deserializationSchema).deserialize(consumerRecord, this.outputCollector);
            } else {
                deserializationSchema.deserialize(consumerRecord.value(), this.outputCollector);
            }
        } catch (Exception e) {
            if (this.messageFormatErrorHandleWay != MessageFormatErrorHandleWay.SKIP) {
                throw e;
            }
            // The value can be null (e.g. a tombstone record); decoding it unguarded would throw
            // from inside the skip path and defeat SKIP mode. Decode with a fixed charset so the
            // logged payload does not depend on the host's default encoding.
            byte[] payload = consumerRecord.value();
            String payloadText = payload == null ? null : new String(payload, StandardCharsets.UTF_8);
            logger.warn("Deserialize message failed, skip this message, message: {}", payloadText);
        }
        kafkaSourceSplitState.setCurrentOffset(consumerRecord.offset() + 1);
    }
}
