package org.apache.drill.exec.store.kafka.decoders;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
import java.util.List;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.store.easy.json.JsonProcessor;
import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
import org.apache.drill.exec.store.kafka.MetaDataField;
import org.apache.drill.exec.store.kafka.ReadOptions;
import org.apache.drill.exec.vector.complex.fn.JsonReader;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.class */
public class JsonMessageReader implements MessageReader {
    private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
    private JsonReader jsonReader;
    private VectorContainerWriter writer;
    private ObjectMapper objectMapper;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.drill.exec.store.kafka.decoders.JsonMessageReader$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/drill/exec/store/kafka/decoders/JsonMessageReader$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$drill$exec$store$easy$json$JsonProcessor$ReadState = new int[JsonProcessor.ReadState.values().length];

        static {
            try {
                $SwitchMap$org$apache$drill$exec$store$easy$json$JsonProcessor$ReadState[JsonProcessor.ReadState.WRITE_SUCCEED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$store$easy$json$JsonProcessor$ReadState[JsonProcessor.ReadState.END_OF_STREAM.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$store$easy$json$JsonProcessor$ReadState[JsonProcessor.ReadState.JSON_RECORD_PARSE_ERROR.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$store$easy$json$JsonProcessor$ReadState[JsonProcessor.ReadState.JSON_RECORD_PARSE_EOF_ERROR.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public void init(DrillBuf drillBuf, List<SchemaPath> list, VectorContainerWriter vectorContainerWriter, ReadOptions readOptions) {
        this.jsonReader = new JsonReader.Builder(drillBuf).schemaPathColumns(list).allTextMode(readOptions.isAllTextMode()).readNumbersAsDouble(readOptions.isReadNumbersAsDouble()).enableNanInf(readOptions.isAllowNanInf()).enableEscapeAnyChar(readOptions.isAllowEscapeAnyChar()).build();
        this.jsonReader.setIgnoreJSONParseErrors(readOptions.isSkipInvalidRecords());
        this.writer = vectorContainerWriter;
        this.objectMapper = BaseJsonProcessor.getDefaultMapper().configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, readOptions.isAllowNanInf()).configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, readOptions.isAllowEscapeAnyChar());
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public boolean readMessage(ConsumerRecord<?, ?> consumerRecord) {
        String str = new String((byte[]) consumerRecord.value(), Charsets.UTF_8);
        try {
            ObjectNode readTree = this.objectMapper.readTree(str);
            if (readTree == null || !readTree.isObject()) {
                throw new IOException("Unsupported node type: " + ((Object) (readTree == null ? "NO CONTENT" : readTree.getNodeType())));
            }
            ObjectNode objectNode = readTree;
            objectNode.put(MetaDataField.KAFKA_TOPIC.getFieldName(), consumerRecord.topic());
            objectNode.put(MetaDataField.KAFKA_PARTITION_ID.getFieldName(), consumerRecord.partition());
            objectNode.put(MetaDataField.KAFKA_OFFSET.getFieldName(), consumerRecord.offset());
            objectNode.put(MetaDataField.KAFKA_TIMESTAMP.getFieldName(), consumerRecord.timestamp());
            objectNode.put(MetaDataField.KAFKA_MSG_KEY.getFieldName(), consumerRecord.key() != null ? consumerRecord.key().toString() : null);
            this.jsonReader.setSource(readTree);
            return convertJsonReadState(this.jsonReader.write(this.writer));
        } catch (IOException | IllegalArgumentException e) {
            String format = String.format("JSON record %s: %s", str, e.getMessage());
            if (!this.jsonReader.ignoreJSONParseError()) {
                throw UserException.dataReadError(e).message("Failed to read " + format, new Object[0]).addContext("MessageReader", JsonMessageReader.class.getName()).build(logger);
            }
            logger.debug("Skipping {}", format, e);
            return false;
        }
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public void ensureAtLeastOneField() {
        this.jsonReader.ensureAtLeastOneField(this.writer);
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin kafkaStoragePlugin) {
        return new KafkaConsumer<>(kafkaStoragePlugin.m9getConfig().getKafkaConsumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.writer.clear();
        try {
            this.writer.close();
        } catch (Exception e) {
            logger.warn("Error while closing JsonMessageReader: {}", e.getMessage());
        }
    }

    public String toString() {
        return "JsonMessageReader[jsonReader=" + this.jsonReader + "]";
    }

    private boolean convertJsonReadState(JsonProcessor.ReadState readState) {
        switch (AnonymousClass1.$SwitchMap$org$apache$drill$exec$store$easy$json$JsonProcessor$ReadState[readState.ordinal()]) {
            case 1:
            case 2:
                return true;
            case 3:
            case 4:
                return false;
            default:
                throw new IllegalArgumentException("Unexpected JSON read state: " + readState);
        }
    }
}
