/*
 * Decompiled with CFR 0.152.
 */
package org.apache.james.mailbox.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.james.mailbox.store.publisher.MessageConsumer;
import org.apache.james.mailbox.store.publisher.MessageReceiver;
import org.apache.james.mailbox.store.publisher.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MessageConsumer} that reads serialized events from a Kafka topic through the
 * ZooKeeper-based high-level consumer API and hands every message payload to the
 * configured {@link MessageReceiver}.
 *
 * <p>Lifecycle: construct, call {@link #setMessageReceiver(MessageReceiver)}, then
 * {@link #init(Topic)} to start consuming, and finally {@link #destroy()} to stop.
 *
 * <p>The lifecycle methods perform no synchronization of their own.
 */
public class KafkaMessageConsumer implements MessageConsumer {
    /** Value used for the {@code zookeeper.session.timeout.ms} consumer property. */
    private static final String ZK_SESSION_TIMEOUT = "400";
    /** Value used for the {@code zookeeper.sync.time.ms} consumer property. */
    private static final String ZK_SYNC_TIME = "200";
    /** Value used for the {@code auto.commit.interval.ms} consumer property. */
    private static final String AUTO_COMMIT_INTERVAL_MS = "1000";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageConsumer.class);

    private final ConsumerConnector consumer;
    /** Number of Kafka streams requested for the topic, and size of the worker pool. */
    private final int numberOfThread;
    private MessageReceiver messageReceiver;
    private ExecutorService executor;
    private boolean isInitialized;

    /**
     * Creates the Kafka consumer connector. Consumption only starts when
     * {@link #init(Topic)} is called.
     *
     * @param zookeeperConnectionString value for the {@code zookeeper.connect} property
     * @param groupId value for the {@code group.id} property
     * @param numberOfThread number of streams to request and of worker threads to start
     */
    public KafkaMessageConsumer(String zookeeperConnectionString, String groupId, int numberOfThread) {
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            this.createConsumerConfig(zookeeperConnectionString, groupId));
        this.numberOfThread = numberOfThread;
        this.isInitialized = false;
    }

    /**
     * Sets the receiver that will be handed every consumed message payload.
     *
     * @param messageReceiver the receiver to deliver payloads to
     * @throws IllegalStateException if this consumer has already been started
     */
    public void setMessageReceiver(MessageReceiver messageReceiver) {
        if (this.isInitialized) {
            throw new IllegalStateException("Can not change the MessageReceiver of a running KafkaMessageConsumer");
        }
        this.messageReceiver = messageReceiver;
    }

    /**
     * Shuts down the Kafka connector and the worker pool, then marks this consumer as
     * not initialized.
     */
    @PreDestroy
    public void destroy() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
        this.isInitialized = false;
    }

    /**
     * Starts consuming the given topic. Calling this on an already started consumer
     * only logs a warning.
     *
     * <p>NOTE(review): {@code @PostConstruct} methods are normally expected to take no
     * parameters; confirm the hosting container tolerates this signature.
     *
     * @param topic the topic whose value names the Kafka topic to consume
     * @throws IllegalStateException if no {@link MessageReceiver} has been set
     */
    @PostConstruct
    public void init(Topic topic) {
        if (this.isInitialized) {
            LOG.warn("This Kafka MailboxMessage Receiver was already launched.");
            return;
        }
        // Fail fast: without a receiver every worker would die on its first message with
        // a NullPointerException that nobody observes.
        if (this.messageReceiver == null) {
            throw new IllegalStateException("A MessageReceiver must be set before starting a KafkaMessageConsumer");
        }
        List<KafkaStream<byte[], byte[]>> streams = this.getKafkaStreams(topic.getValue());
        this.executor = Executors.newFixedThreadPool(this.numberOfThread);
        // Only flag the consumer as running once its resources exist, so a failed start
        // does not leave it stuck in the "initialized" state. The flag is raised before
        // the workers start so the receiver can no longer be swapped underneath them.
        this.isInitialized = true;
        this.startConsuming(streams);
    }

    /** Requests {@code numberOfThread} streams for the topic from the Kafka connector. */
    private List<KafkaStream<byte[], byte[]>> getKafkaStreams(String topic) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, this.numberOfThread);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            this.consumer.createMessageStreams(topicCountMap);
        return consumerMap.get(topic);
    }

    /** Submits one worker per stream to the executor. */
    private void startConsuming(List<KafkaStream<byte[], byte[]>> streams) {
        for (KafkaStream<byte[], byte[]> stream : streams) {
            this.executor.submit(new StreamConsumer(stream));
        }
    }

    /** Builds the consumer configuration from the connection settings and fixed timeouts. */
    private ConsumerConfig createConsumerConfig(String zookeeperConnectionString, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnectionString);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", ZK_SESSION_TIMEOUT);
        props.put("zookeeper.sync.time.ms", ZK_SYNC_TIME);
        props.put("auto.commit.interval.ms", AUTO_COMMIT_INTERVAL_MS);
        return new ConsumerConfig(props);
    }

    /**
     * Worker that iterates over one Kafka stream and forwards each message payload to
     * the enclosing consumer's {@link MessageReceiver}.
     */
    private class StreamConsumer implements Runnable {
        private final KafkaStream<byte[], byte[]> stream;

        StreamConsumer(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }

        @Override
        public void run() {
            try {
                for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : this.stream) {
                    this.deliver(messageAndMetadata.message());
                }
            } catch (RuntimeException e) {
                // The task's Future is never inspected, so log here or the failure is lost.
                LOG.error("Kafka stream consumption stopped unexpectedly", e);
                throw e;
            }
        }

        /** Hands one payload to the receiver; a failing message must not stop the worker. */
        private void deliver(byte[] payload) {
            try {
                KafkaMessageConsumer.this.messageReceiver.receiveSerializedEvent(payload);
            } catch (RuntimeException e) {
                LOG.error("Error while processing a message received from Kafka", e);
            }
        }
    }
}

