package org.apache.tika.pipes.pipesiterator.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.HandlerConfig;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.class */
/**
 * A {@link PipesIterator} that reads record keys from a Kafka topic and enqueues one
 * {@link FetchEmitTuple} per record. The record key is used as the tuple id, the fetch key
 * and the emit key.
 *
 * <p>The Kafka consumer is created and subscribed in {@link #initialize(Map)}; records are
 * pulled in {@link #enqueue()}.
 *
 * <p>NOTE(review): the consumer created here is never closed by this class — confirm whether
 * the owner is expected to release it.
 */
public class KafkaPipesIterator extends PipesIterator implements Initializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPipesIterator.class);

    /** Topic to subscribe to; must not be empty (see {@link #checkInitialization}). */
    String topic;
    /** Value for {@code bootstrap.servers}; must not be empty (see {@link #checkInitialization}). */
    String bootstrapServers;
    /** Optional class name used for the key deserializer; {@link StringDeserializer} if null. */
    String keySerializer;
    /** Optional class name used for the value deserializer; {@link StringDeserializer} if null. */
    String valueSerializer;
    /** Value for {@code group.id}; omitted from the consumer properties when null. */
    String groupId;
    /** Value for the consumer's auto-offset-reset setting. */
    String autoOffsetReset = "earliest";
    /** Timeout, in milliseconds, passed to each {@code poll} call. */
    int pollDelayMs = 100;
    /** Soft cap on the number of enqueued records; values {@code <= 0} disable the cap. */
    int emitMax = -1;
    /** Value passed as the initial rebalance delay property, in milliseconds. */
    int groupInitialRebalanceDelayMs = 3000;

    private Properties props;
    private KafkaConsumer<String, String> consumer;

    @Field
    public void setTopic(String str) {
        this.topic = str;
    }

    @Field
    public void setGroupId(String str) {
        this.groupId = str;
    }

    @Field
    public void setBootstrapServers(String str) {
        this.bootstrapServers = str;
    }

    @Field
    public void setKeySerializer(String str) {
        this.keySerializer = str;
    }

    @Field
    public void setAutoOffsetReset(String str) {
        this.autoOffsetReset = str;
    }

    @Field
    public void setValueSerializer(String str) {
        this.valueSerializer = str;
    }

    @Field
    public void setPollDelayMs(int i) {
        this.pollDelayMs = i;
    }

    @Field
    public void setGroupInitialRebalanceDelayMs(int i) {
        this.groupInitialRebalanceDelayMs = i;
    }

    @Field
    public void setEmitMax(int i) {
        this.emitMax = i;
    }

    /**
     * Puts {@code obj} under {@code str} only when it is non-null
     * ({@link Properties} rejects null values).
     */
    private void safePut(Properties properties, String str, Object obj) {
        if (obj != null) {
            properties.put(str, obj);
        }
    }

    /**
     * Builds the consumer properties from the configured fields, creates the
     * {@link KafkaConsumer} and subscribes it to {@link #topic}.
     *
     * @param map initialization parameters; not read by this implementation
     */
    public void initialize(Map<String, Param> map) {
        this.props = new Properties();
        safePut(this.props, "bootstrap.servers", this.bootstrapServers);
        safePut(this.props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                serializerClass(this.keySerializer, StringDeserializer.class));
        safePut(this.props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                serializerClass(this.valueSerializer, StringDeserializer.class));
        safePut(this.props, "group.id", this.groupId);
        safePut(this.props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
        // Key spelling corrected from "group.inital...".
        // NOTE(review): Kafka documents this key as a broker-side setting; confirm that
        // passing it to a consumer has any effect.
        safePut(this.props, "group.initial.rebalance.delay.ms", this.groupInitialRebalanceDelayMs);
        this.consumer = new KafkaConsumer<>(this.props);
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    /**
     * Resolves the class to use for a (de)serializer setting.
     *
     * @param str fully qualified class name, or null to use the default
     * @param cls default class returned when {@code str} is null
     * @return the resolved class, or null if {@code str} names a class that cannot be loaded
     *         (the caller then leaves the property unset)
     */
    private Object serializerClass(String str, Class<?> cls) {
        if (str == null) {
            return cls;
        }
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            // Used for both key and value classes, so keep the message generic and keep the cause.
            LOGGER.error("Could not find deserializer class: {}", str, e);
            return null;
        }
    }

    /**
     * Runs the superclass checks and then requires {@code bootstrapServers} and {@code topic}
     * to be set.
     *
     * @throws TikaConfigException if a required setting is empty
     */
    public void checkInitialization(InitializableProblemHandler initializableProblemHandler) throws TikaConfigException {
        super.checkInitialization(initializableProblemHandler);
        TikaConfig.mustNotBeEmpty("bootstrapServers", this.bootstrapServers);
        TikaConfig.mustNotBeEmpty("topic", this.topic);
    }

    /**
     * Polls the consumer repeatedly and enqueues a {@link FetchEmitTuple} for every record
     * received. Polling stops when a poll returns no records, or, when {@code emitMax} is
     * positive, once at least {@code emitMax} records have been enqueued.
     *
     * <p>The cap is checked after each polled batch has been fully enqueued, so the final
     * count may exceed {@code emitMax} by up to one batch; records already returned by
     * {@code poll} are not dropped.
     */
    protected void enqueue() throws InterruptedException, TimeoutException {
        String fetcherName = getFetcherName();
        String emitterName = getEmitterName();
        long start = System.currentTimeMillis();
        int count = 0;
        HandlerConfig handlerConfig = getHandlerConfig();
        ConsumerRecords<String, String> records;
        do {
            records = this.consumer.poll(Duration.ofMillis(this.pollDelayMs));
            for (ConsumerRecord<String, String> record : records) {
                if (LOGGER.isDebugEnabled()) {
                    long elapsed = System.currentTimeMillis() - start;
                    LOGGER.debug("adding ({}) {} in {} ms", count, record.key(), elapsed);
                }
                tryToAdd(new FetchEmitTuple(record.key(),
                        new FetchKey(fetcherName, record.key()),
                        new EmitKey(emitterName, record.key()),
                        new Metadata(), handlerConfig, getOnParseException()));
                count++;
            }
            // Only a positive emitMax is a cap. The previous test (emitMax <= 0 && i >= emitMax)
            // was inverted: it stopped after the first poll for the default -1 and never
            // enforced a positive limit.
            if (this.emitMax > 0 && count >= this.emitMax) {
                break;
            }
        } while (!records.isEmpty());
        LOGGER.info("Finished enqueuing {} files in {} ms", count, System.currentTimeMillis() - start);
    }
}
