package org.apache.druid.data.input.kafkainput;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;

/* loaded from: input_file:org/apache/druid/data/input/kafkainput/KafkaInputReader.class */
/**
 * {@link InputEntityReader} that turns one Kafka record into input rows by blending three sources
 * of fields: the record headers, the record key and the record value.
 *
 * <p>Headers and key are collected into a single "merged header" map first; the value payload is
 * then parsed by the delegate value reader and each resulting row is overlaid on top of that map,
 * so a field present in the payload wins over a header/key field with the same name.
 */
public class KafkaInputReader implements InputEntityReader {
    private static final Logger log = new Logger(KafkaInputReader.class);
    private final InputRowSchema inputRowSchema;
    private final SettableByteEntity<KafkaRecordEntity> source;
    private final Function<KafkaRecordEntity, KafkaHeaderReader> headerParserSupplier;
    private final Function<KafkaRecordEntity, InputEntityReader> keyParserSupplier;
    private final InputEntityReader valueParser;
    private final String keyColumnName;
    private final String timestampColumnName;

    /**
     * @param inputRowSchema       schema providing the timestamp spec and dimensions spec used to build rows
     * @param source               holder of the Kafka record entity that {@link #read()} processes
     * @param headerParserSupplier creates the header reader for a record; {@code null} to ignore headers
     * @param keyParserSupplier    creates the key reader for a record; {@code null} (or a supplier
     *                             returning {@code null}) to ignore the key
     * @param valueParser          reader used for the record value when the value is non-null
     * @param keyColumnName        name of the column under which the parsed key is stored
     * @param timestampColumnName  name of the column under which the Kafka record timestamp is stored
     */
    public KafkaInputReader(
            InputRowSchema inputRowSchema,
            SettableByteEntity<KafkaRecordEntity> source,
            @Nullable Function<KafkaRecordEntity, KafkaHeaderReader> headerParserSupplier,
            @Nullable Function<KafkaRecordEntity, InputEntityReader> keyParserSupplier,
            InputEntityReader valueParser,
            String keyColumnName,
            String timestampColumnName) {
        this.inputRowSchema = inputRowSchema;
        this.source = source;
        this.headerParserSupplier = headerParserSupplier;
        this.keyParserSupplier = keyParserSupplier;
        this.valueParser = valueParser;
        this.keyColumnName = keyColumnName;
        this.timestampColumnName = timestampColumnName;
    }

    /**
     * Chooses the dimension list for an output row.
     *
     * @param discoveredDimensions dimension names found on the row being built
     * @return the dimension names declared in the schema when that list is non-empty; otherwise the
     *         discovered names minus the schema's dimension exclusions
     */
    private List<String> getFinalDimensionList(Set<String> discoveredDimensions) {
        final List<String> schemaDimensions = this.inputRowSchema.getDimensionsSpec().getDimensionNames();
        if (!schemaDimensions.isEmpty()) {
            return schemaDimensions;
        }
        return Lists.newArrayList(
                Sets.difference(discoveredDimensions, this.inputRowSchema.getDimensionsSpec().getDimensionExclusions()));
    }

    /**
     * Parses the value payload and overlays every resulting row on top of the header/key fields.
     *
     * @param valueReader  reader for the record value
     * @param headerKeyMap fields collected from the record headers, timestamp and key
     * @return iterator over the blended rows
     * @throws IOException    if the value reader fails to open
     * @throws ParseException (while iterating) if a {@link ClassCastException} is raised while
     *                        blending a row, e.g. because the value reader produced a row that is
     *                        not a {@link MapBasedInputRow}
     */
    private CloseableIterator<InputRow> buildBlendedRows(InputEntityReader valueReader, Map<String, Object> headerKeyMap)
            throws IOException {
        return valueReader.read().map(row -> {
            try {
                final MapBasedInputRow valueRow = (MapBasedInputRow) row;

                // Start from the header/key fields, then let payload fields overwrite same-named entries.
                final Map<String, Object> event = new HashMap<>(headerKeyMap);
                event.putAll(valueRow.getEvent());

                final Set<String> dimensions = new HashSet<>(valueRow.getDimensions());
                dimensions.addAll(headerKeyMap.keySet());
                dimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);

                return new MapBasedInputRow(
                        this.inputRowSchema.getTimestampSpec().extractTimestamp(event),
                        getFinalDimensionList(dimensions),
                        event);
            } catch (ClassCastException e) {
                throw new ParseException(
                        (String) null,
                        "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows");
            }
        });
    }

    /**
     * Builds the single row emitted for a record whose value is {@code null}; the row consists of the
     * header/key fields only.
     *
     * @param headerKeyMap fields collected from the record headers, timestamp and key
     * @return iterator over exactly one row
     */
    private CloseableIterator<InputRow> buildRowsWithoutValuePayload(Map<String, Object> headerKeyMap) {
        final InputRow row = new MapBasedInputRow(
                this.inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyMap),
                getFinalDimensionList(new HashSet<>(headerKeyMap.keySet())),
                headerKeyMap);
        return CloseableIterators.withEmptyBaggage(Collections.singletonList(row).iterator());
    }

    /**
     * Reads the current Kafka record held by the source entity.
     *
     * <p>Fields are gathered in this order: headers (if a header parser is configured), the Kafka
     * record timestamp (only if no header already uses the timestamp column name), and the first
     * field of the first row produced by the key parser (only if the key column name is not already
     * taken). The value is then parsed and blended in, or a header/key-only row is returned when the
     * record value is {@code null}.
     *
     * @return iterator over the rows produced for the record
     * @throws IOException if a delegate reader fails, or if the key parser yields a row that is not
     *                     a {@link MapBasedInputRow}
     */
    @Override
    public CloseableIterator<InputRow> read() throws IOException {
        final KafkaRecordEntity record = this.source.getEntity();
        final Map<String, Object> mergedHeaderMap = new HashMap<>();

        if (this.headerParserSupplier != null) {
            for (Pair<String, Object> header : this.headerParserSupplier.apply(record).read()) {
                mergedHeaderMap.put(header.lhs, header.rhs);
            }
        }

        // A header with the same name takes precedence over the Kafka record timestamp.
        mergedHeaderMap.putIfAbsent(this.timestampColumnName, record.getRecord().timestamp());

        final InputEntityReader keyParser =
                this.keyParserSupplier == null ? null : this.keyParserSupplier.apply(record);
        if (keyParser != null) {
            // try-with-resources guarantees the key iterator is closed even when parsing throws.
            try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
                // Only the first key row is used; any further rows are ignored.
                if (keyIterator.hasNext()) {
                    final Map<String, Object> keyEvent = ((MapBasedInputRow) keyIterator.next()).getEvent();
                    // A key row without any field carries nothing to store, so skip it instead of failing.
                    if (!keyEvent.isEmpty()) {
                        // A header with the same name takes precedence over the key.
                        mergedHeaderMap.putIfAbsent(
                                this.keyColumnName,
                                keyEvent.entrySet().iterator().next().getValue());
                    }
                }
            } catch (ClassCastException e) {
                throw new IOException(
                        "Unsupported input format in keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows",
                        e);
            }
        }

        if (record.getRecord().value() != null) {
            return buildBlendedRows(this.valueParser, mergedHeaderMap);
        }
        return buildRowsWithoutValuePayload(mergedHeaderMap);
    }

    /**
     * Same as {@link #read()}, but pairs every row with its event map as the raw values.
     *
     * @return iterator over sampled rows
     * @throws IOException if {@link #read()} fails
     */
    @Override
    public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException {
        return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
    }
}
