package org.apache.druid.firehose.kafka;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import javax.annotation.Nullable;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.InvalidMessageException;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.logger.Logger;

/* loaded from: input_file:org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.class */
public class KafkaEightFirehoseFactory implements FirehoseFactory<InputRowParser<ByteBuffer>> {
    private static final Logger log = new Logger(KafkaEightFirehoseFactory.class);

    @JsonProperty
    private final Properties consumerProps;

    @JsonProperty
    private final String feed;

    @JsonCreator
    public KafkaEightFirehoseFactory(@JsonProperty("consumerProps") Properties properties, @JsonProperty("feed") String str) {
        this.consumerProps = properties;
        this.feed = str;
    }

    public Firehose connect(InputRowParser<ByteBuffer> inputRowParser, File file) {
        final InputRowParser withParseSpec = inputRowParser.withParseSpec(inputRowParser.getParseSpec().withDimensionsSpec(inputRowParser.getParseSpec().getDimensionsSpec().withDimensionExclusions(Sets.union(inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), Sets.newHashSet(new String[]{"feed"})))));
        final ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.consumerProps));
        List list = (List) createJavaConsumerConnector.createMessageStreams(ImmutableMap.of(this.feed, 1)).get(this.feed);
        if (list == null || list.size() != 1) {
            return null;
        }
        final ConsumerIterator it = ((KafkaStream) list.get(0)).iterator();
        return new Firehose() { // from class: org.apache.druid.firehose.kafka.KafkaEightFirehoseFactory.1
            Iterator<InputRow> nextIterator = Collections.emptyIterator();

            public boolean hasMore() {
                return this.nextIterator.hasNext() || it.hasNext();
            }

            @Nullable
            public InputRow nextRow() {
                try {
                    if (!this.nextIterator.hasNext()) {
                        byte[] bArr = (byte[]) it.next().message();
                        if (bArr == null) {
                            return null;
                        }
                        this.nextIterator = withParseSpec.parseBatch(ByteBuffer.wrap(bArr)).iterator();
                    }
                    return this.nextIterator.next();
                } catch (InvalidMessageException e) {
                    KafkaEightFirehoseFactory.log.error(e, "Message failed its checksum and it is corrupt, will skip it", new Object[0]);
                    return null;
                }
            }

            public Runnable commit() {
                return new Runnable() { // from class: org.apache.druid.firehose.kafka.KafkaEightFirehoseFactory.1.1
                    @Override // java.lang.Runnable
                    public void run() {
                        KafkaEightFirehoseFactory.log.info("committing offsets", new Object[0]);
                        createJavaConsumerConnector.commitOffsets();
                    }
                };
            }

            public void close() {
                createJavaConsumerConnector.shutdown();
            }
        };
    }
}
