package org.apache.tika.pipes.emitter.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.pure.SnappyRawCompressor;

/* loaded from: input_file:org/apache/tika/pipes/emitter/kafka/KafkaEmitter.class */
public class KafkaEmitter extends AbstractEmitter implements Initializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEmitter.class);
    private static final ObjectMapper OM = new ObjectMapper();
    String topic;
    String bootstrapServers;
    String interceptorClasses;
    String transactionalId;
    String clientId;
    String keySerializer;
    String valueSerializer;
    private Producer<String, String> producer;
    String acks = "all";
    int lingerMs = AbstractCoordinator.JOIN_GROUP_TIMEOUT_LAPSE;
    int batchSize = SnappyRawCompressor.MAX_HASH_TABLE_SIZE;
    int bufferMemory = 33554432;
    String compressionType = "none";
    int connectionsMaxIdleMs = 540000;
    int deliveryTimeoutMs = 120000;
    boolean enableIdempotence = false;
    int maxBlockMs = 60000;
    int maxInFlightRequestsPerConnection = 5;
    int maxRequestSize = ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES;
    int metadataMaxAgeMs = 300000;
    int requestTimeoutMs = 30000;
    int retries = Integer.MAX_VALUE;
    int retryBackoffMs = 100;
    int transactionTimeoutMs = 60000;

    @Field
    public void setBootstrapServers(String str) {
        this.bootstrapServers = str;
    }

    @Field
    public void setAcks(String str) {
        this.acks = str;
    }

    @Field
    public void setLingerMs(int i) {
        this.lingerMs = i;
    }

    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    @Field
    public void setBufferMemory(int i) {
        this.bufferMemory = i;
    }

    @Field
    public void setClientId(String str) {
        this.clientId = str;
    }

    @Field
    public void setCompressionType(String str) {
        this.compressionType = str;
    }

    @Field
    public void setConnectionsMaxIdleMs(int i) {
        this.connectionsMaxIdleMs = i;
    }

    @Field
    public void setDeliveryTimeoutMs(int i) {
        this.deliveryTimeoutMs = i;
    }

    @Field
    public void setEnableIdempotence(boolean z) {
        this.enableIdempotence = z;
    }

    @Field
    public void setInterceptorClasses(String str) {
        this.interceptorClasses = str;
    }

    @Field
    public void setMaxBlockMs(int i) {
        this.maxBlockMs = i;
    }

    @Field
    public void setMaxInFlightRequestsPerConnection(int i) {
        this.maxInFlightRequestsPerConnection = i;
    }

    @Field
    public void setMaxRequestSize(int i) {
        this.maxRequestSize = i;
    }

    @Field
    public void setMetadataMaxAgeMs(int i) {
        this.metadataMaxAgeMs = i;
    }

    @Field
    public void setRequestTimeoutMs(int i) {
        this.requestTimeoutMs = i;
    }

    @Field
    public void setRetries(int i) {
        this.retries = i;
    }

    @Field
    public void setRetryBackoffMs(int i) {
        this.retryBackoffMs = i;
    }

    @Field
    public void setTransactionTimeoutMs(int i) {
        this.transactionTimeoutMs = i;
    }

    @Field
    public void setTransactionalId(String str) {
        this.transactionalId = str;
    }

    @Field
    public void setKeySerializer(String str) {
        this.keySerializer = str;
    }

    @Field
    public void setValueSerializer(String str) {
        this.valueSerializer = str;
    }

    @Field
    public void setTopic(String str) {
        this.topic = str;
    }

    public void emit(String str, List<Metadata> list) throws IOException, TikaEmitterException {
        if (list == null || list.isEmpty()) {
            throw new TikaEmitterException("metadata list must not be null or of size 0");
        }
        for (Metadata metadata : list) {
            LOGGER.debug("about to emit to target topic: ({}) path:({})", this.topic, str);
            HashMap hashMap = new HashMap();
            for (String str2 : metadata.names()) {
                String[] values = metadata.getValues(str2);
                if (values.length > 1) {
                    LOGGER.warn("Can only write the first value for key {}. I see {} values.", str2, Integer.valueOf(values.length));
                }
                hashMap.put(str2, values[0]);
            }
            this.producer.send(new ProducerRecord<>(this.topic, str, OM.writeValueAsString(hashMap)));
        }
    }

    private void safePut(Properties properties, String str, Object obj) {
        if (obj != null) {
            properties.put(str, obj);
        }
    }

    public void initialize(Map<String, Param> map) throws TikaConfigException {
        Properties properties = new Properties();
        safePut(properties, "bootstrap.servers", this.bootstrapServers);
        safePut(properties, ProducerConfig.ACKS_CONFIG, this.acks);
        safePut(properties, "retries", Integer.valueOf(this.retries));
        safePut(properties, ProducerConfig.BATCH_SIZE_CONFIG, Integer.valueOf(this.batchSize));
        safePut(properties, ProducerConfig.LINGER_MS_CONFIG, Integer.valueOf(this.lingerMs));
        safePut(properties, ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.valueOf(this.bufferMemory));
        safePut(properties, "client.id", this.clientId);
        safePut(properties, "compression.type", this.compressionType);
        safePut(properties, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.valueOf(this.deliveryTimeoutMs));
        safePut(properties, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, Boolean.valueOf(this.enableIdempotence));
        safePut(properties, "interceptor.classes", this.interceptorClasses);
        safePut(properties, ProducerConfig.MAX_BLOCK_MS_CONFIG, Integer.valueOf(this.maxBlockMs));
        safePut(properties, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, Integer.valueOf(this.maxInFlightRequestsPerConnection));
        safePut(properties, ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Integer.valueOf(this.maxRequestSize));
        safePut(properties, "metadata.max.age.ms", Integer.valueOf(this.metadataMaxAgeMs));
        safePut(properties, "request.timeout.ms", Integer.valueOf(this.requestTimeoutMs));
        safePut(properties, "retry.backoff.ms", Integer.valueOf(this.retryBackoffMs));
        safePut(properties, ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, Integer.valueOf(this.transactionTimeoutMs));
        safePut(properties, ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionalId);
        safePut(properties, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerClass(this.keySerializer, StringSerializer.class));
        safePut(properties, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerClass(this.valueSerializer, StringSerializer.class));
        this.producer = new KafkaProducer(properties);
    }

    private Object serializerClass(String str, Class cls) {
        if (str == null) {
            return cls;
        }
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            LOGGER.error("Could not find key serializer class: {}", str);
            return null;
        }
    }

    public void checkInitialization(InitializableProblemHandler initializableProblemHandler) throws TikaConfigException {
        TikaConfig.mustNotBeEmpty("topic", this.topic);
        TikaConfig.mustNotBeEmpty("server", this.bootstrapServers);
    }
}
