package org.apache.drill.exec.store.kafka.decoders;

import java.io.ByteArrayInputStream;
import java.util.Properties;
import java.util.StringJoiner;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
import org.apache.drill.exec.store.kafka.MetaDataField;
import org.apache.drill.exec.store.kafka.ReadOptions;
import org.apache.drill.exec.store.kafka.decoders.KafkaJsonLoader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.class */
public class JsonMessageReader implements MessageReader {
    private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
    private final ClosingStreamIterator stream = new ClosingStreamIterator();
    private KafkaJsonLoader kafkaJsonLoader;
    private ResultSetLoader resultSetLoader;
    private SchemaNegotiator negotiator;
    private ReadOptions readOptions;
    private Properties kafkaConsumerProps;

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public void init(SchemaNegotiator schemaNegotiator, ReadOptions readOptions, KafkaStoragePlugin kafkaStoragePlugin) {
        this.negotiator = schemaNegotiator;
        this.resultSetLoader = schemaNegotiator.build();
        this.readOptions = readOptions;
        this.kafkaConsumerProps = kafkaStoragePlugin.m8getConfig().getKafkaConsumerProps();
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public void readMessage(ConsumerRecord<?, ?> consumerRecord) {
        try {
            parseAndWrite(consumerRecord, (byte[]) consumerRecord.value());
        } catch (TokenIterator.RecoverableJsonException e) {
            if (!this.readOptions.isSkipInvalidRecords()) {
                throw UserException.dataReadError(e).message(String.format("Error happened when parsing invalid record. Please set `%s` option to 'true' to skip invalid records.", "store.kafka.reader.skip_invalid_records"), new Object[0]).addContext(this.resultSetLoader.errorContext()).build(logger);
            }
        }
    }

    private void parseAndWrite(ConsumerRecord<?, ?> consumerRecord, byte[] bArr) {
        this.stream.setValue(new ByteArrayInputStream(bArr));
        if (this.kafkaJsonLoader == null) {
            JsonLoaderOptions jsonLoaderOptions = new JsonLoaderOptions();
            jsonLoaderOptions.allTextMode = this.readOptions.isAllTextMode();
            jsonLoaderOptions.readNumbersAsDouble = this.readOptions.isReadNumbersAsDouble();
            jsonLoaderOptions.skipMalformedRecords = this.readOptions.isSkipInvalidRecords();
            jsonLoaderOptions.allowNanInf = this.readOptions.isAllowNanInf();
            jsonLoaderOptions.enableEscapeAnyChar = this.readOptions.isAllowEscapeAnyChar();
            jsonLoaderOptions.skipMalformedDocument = this.readOptions.isSkipInvalidRecords();
            this.kafkaJsonLoader = new KafkaJsonLoader.KafkaJsonLoaderBuilder().resultSetLoader(this.resultSetLoader).standardOptions(this.negotiator.queryOptions()).options(jsonLoaderOptions).errorContext(this.negotiator.parentErrorContext()).fromStream(() -> {
                return this.stream;
            }).build();
        }
        RowSetLoader writer = this.resultSetLoader.writer();
        writer.start();
        if (this.kafkaJsonLoader.parser().next()) {
            writeValue(writer, MetaDataField.KAFKA_TOPIC, consumerRecord.topic());
            writeValue(writer, MetaDataField.KAFKA_PARTITION_ID, Integer.valueOf(consumerRecord.partition()));
            writeValue(writer, MetaDataField.KAFKA_OFFSET, Long.valueOf(consumerRecord.offset()));
            writeValue(writer, MetaDataField.KAFKA_TIMESTAMP, Long.valueOf(consumerRecord.timestamp()));
            writeValue(writer, MetaDataField.KAFKA_MSG_KEY, consumerRecord.key() != null ? consumerRecord.key().toString() : null);
            writer.save();
        }
    }

    private <T> void writeValue(RowSetLoader rowSetLoader, MetaDataField metaDataField, T t) {
        if (rowSetLoader.tupleSchema().column(metaDataField.getFieldName()) == null) {
            rowSetLoader.addColumn(MetadataUtils.newScalar(metaDataField.getFieldName(), metaDataField.getFieldType(), TypeProtos.DataMode.OPTIONAL));
        }
        rowSetLoader.column(metaDataField.getFieldName()).setObject(t);
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public ResultSetLoader getResultSetLoader() {
        return this.resultSetLoader;
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin kafkaStoragePlugin) {
        return new KafkaConsumer<>(kafkaStoragePlugin.m8getConfig().getKafkaConsumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public boolean endBatch() {
        this.kafkaJsonLoader.endBatch();
        return this.resultSetLoader.hasRows();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.kafkaJsonLoader.close();
            this.resultSetLoader.close();
        } catch (Exception e) {
            logger.warn("Error while closing JsonMessageReader: {}", e.getMessage());
        }
    }

    public String toString() {
        return new StringJoiner(", ", JsonMessageReader.class.getSimpleName() + "[", "]").add("kafkaJsonLoader=" + this.kafkaJsonLoader).add("resultSetLoader=" + this.resultSetLoader).toString();
    }
}
