/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tika.pipes.emitter.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaEmitter
extends AbstractEmitter
implements Initializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEmitter.class);
    private static final ObjectMapper OM = new ObjectMapper();
    String topic;
    String bootstrapServers;
    String acks = "all";
    int lingerMs = 5000;
    int batchSize = 16384;
    int bufferMemory = 0x2000000;
    String compressionType = "none";
    int connectionsMaxIdleMs = 540000;
    int deliveryTimeoutMs = 120000;
    boolean enableIdempotence = false;
    String interceptorClasses;
    int maxBlockMs = 60000;
    int maxInFlightRequestsPerConnection = 5;
    int maxRequestSize = 0x100000;
    int metadataMaxAgeMs = 300000;
    int requestTimeoutMs = 30000;
    int retries = Integer.MAX_VALUE;
    int retryBackoffMs = 100;
    int transactionTimeoutMs = 60000;
    String transactionalId;
    String clientId;
    String keySerializer;
    String valueSerializer;
    private Producer<String, String> producer;

    @Field
    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    @Field
    public void setAcks(String acks) {
        this.acks = acks;
    }

    @Field
    public void setLingerMs(int lingerMs) {
        this.lingerMs = lingerMs;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    @Field
    public void setBufferMemory(int bufferMemory) {
        this.bufferMemory = bufferMemory;
    }

    @Field
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    @Field
    public void setCompressionType(String compressionType) {
        this.compressionType = compressionType;
    }

    @Field
    public void setConnectionsMaxIdleMs(int connectionsMaxIdleMs) {
        this.connectionsMaxIdleMs = connectionsMaxIdleMs;
    }

    @Field
    public void setDeliveryTimeoutMs(int deliveryTimeoutMs) {
        this.deliveryTimeoutMs = deliveryTimeoutMs;
    }

    @Field
    public void setEnableIdempotence(boolean enableIdempotence) {
        this.enableIdempotence = enableIdempotence;
    }

    @Field
    public void setInterceptorClasses(String interceptorClasses) {
        this.interceptorClasses = interceptorClasses;
    }

    @Field
    public void setMaxBlockMs(int maxBlockMs) {
        this.maxBlockMs = maxBlockMs;
    }

    @Field
    public void setMaxInFlightRequestsPerConnection(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    @Field
    public void setMaxRequestSize(int maxRequestSize) {
        this.maxRequestSize = maxRequestSize;
    }

    @Field
    public void setMetadataMaxAgeMs(int metadataMaxAgeMs) {
        this.metadataMaxAgeMs = metadataMaxAgeMs;
    }

    @Field
    public void setRequestTimeoutMs(int requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    @Field
    public void setRetries(int retries) {
        this.retries = retries;
    }

    @Field
    public void setRetryBackoffMs(int retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    @Field
    public void setTransactionTimeoutMs(int transactionTimeoutMs) {
        this.transactionTimeoutMs = transactionTimeoutMs;
    }

    @Field
    public void setTransactionalId(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    @Field
    public void setKeySerializer(String keySerializer) {
        this.keySerializer = keySerializer;
    }

    @Field
    public void setValueSerializer(String valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    @Field
    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException, TikaEmitterException {
        if (metadataList == null || metadataList.isEmpty()) {
            throw new TikaEmitterException("metadata list must not be null or of size 0");
        }
        for (Metadata metadata : metadataList) {
            LOGGER.debug("about to emit to target topic: ({}) path:({})", (Object)this.topic, (Object)emitKey);
            HashMap<String, String> fields = new HashMap<String, String>();
            for (String n : metadata.names()) {
                String[] vals = metadata.getValues(n);
                if (vals.length > 1) {
                    LOGGER.warn("Can only write the first value for key {}. I see {} values.", (Object)n, (Object)vals.length);
                }
                fields.put(n, vals[0]);
            }
            this.producer.send(new ProducerRecord<String, String>(this.topic, emitKey, OM.writeValueAsString(fields)));
        }
    }

    private void safePut(Properties props, String key, Object val) {
        if (val != null) {
            props.put(key, val);
        }
    }

    public void initialize(Map<String, Param> params) throws TikaConfigException {
        Properties props = new Properties();
        this.safePut(props, "bootstrap.servers", this.bootstrapServers);
        this.safePut(props, "acks", this.acks);
        this.safePut(props, "retries", this.retries);
        this.safePut(props, "batch.size", this.batchSize);
        this.safePut(props, "linger.ms", this.lingerMs);
        this.safePut(props, "buffer.memory", this.bufferMemory);
        this.safePut(props, "client.id", this.clientId);
        this.safePut(props, "compression.type", this.compressionType);
        this.safePut(props, "delivery.timeout.ms", this.deliveryTimeoutMs);
        this.safePut(props, "enable.idempotence", this.enableIdempotence);
        this.safePut(props, "interceptor.classes", this.interceptorClasses);
        this.safePut(props, "max.block.ms", this.maxBlockMs);
        this.safePut(props, "max.in.flight.requests.per.connection", this.maxInFlightRequestsPerConnection);
        this.safePut(props, "max.request.size", this.maxRequestSize);
        this.safePut(props, "metadata.max.age.ms", this.metadataMaxAgeMs);
        this.safePut(props, "request.timeout.ms", this.requestTimeoutMs);
        this.safePut(props, "retry.backoff.ms", this.retryBackoffMs);
        this.safePut(props, "transaction.timeout.ms", this.transactionTimeoutMs);
        this.safePut(props, "transactional.id", this.transactionalId);
        this.safePut(props, "key.serializer", this.serializerClass(this.keySerializer, StringSerializer.class));
        this.safePut(props, "value.serializer", this.serializerClass(this.valueSerializer, StringSerializer.class));
        this.producer = new KafkaProducer<String, String>(props);
    }

    private Object serializerClass(String className, Class defaultClass) {
        try {
            return className == null ? defaultClass : Class.forName(className);
        }
        catch (ClassNotFoundException e) {
            LOGGER.error("Could not find key serializer class: {}", (Object)className);
            return null;
        }
    }

    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
        TikaConfig.mustNotBeEmpty((String)"topic", (String)this.topic);
        TikaConfig.mustNotBeEmpty((String)"server", (String)this.bootstrapServers);
    }
}

