package org.apache.streams.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/kafka/KafkaPersistReader.class */
/**
 * Reader that subscribes to the configured Kafka topic and hands every
 * resulting {@link KafkaStream} to a {@code KafkaPersistReaderTask} running on
 * a single-threaded executor.
 *
 * <p>Lifecycle as implemented here: construct, optionally {@link #setConfig},
 * {@link #startStream()}, then {@link #cleanUp()}.
 *
 * <p>NOTE(review): the class is declared {@link Serializable} but holds
 * runtime handles (executor, consumer connector) in non-transient fields;
 * confirm whether instances are ever actually serialized.
 */
public class KafkaPersistReader implements StreamsPersistReader, Serializable {
    public static final String STREAMS_ID = "KafkaPersistReader";
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReader.class);

    /** Queue of datums associated with this reader; supplied by the caller or created by the no-arg constructor. */
    protected volatile Queue<StreamsDatum> persistQueue;
    private ObjectMapper mapper;
    private KafkaConfiguration config;
    private ConsumerConfig consumerConfig;
    private ConsumerConnector consumerConnector;
    /** Streams returned by the consumer connector; {@code null} until {@link #startStream()} has run. */
    public List<KafkaStream<String, String>> inStreams;
    private ExecutorService executor;

    /**
     * Creates a reader backed by a new, empty {@link ConcurrentLinkedQueue}.
     */
    public KafkaPersistReader() {
        this(new ConcurrentLinkedQueue<StreamsDatum>());
    }

    /**
     * Creates a reader that uses the supplied queue. The Kafka configuration is
     * detected from the {@code "kafka"} section of the global streams config.
     *
     * @param queue queue to associate with this reader
     */
    public KafkaPersistReader(Queue<StreamsDatum> queue) {
        this.mapper = new ObjectMapper();
        this.executor = Executors.newSingleThreadExecutor();
        this.config = KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka"));
        this.persistQueue = queue;
    }

    /**
     * Replaces the Kafka configuration detected at construction time.
     *
     * @param kafkaConfiguration configuration to use from now on
     */
    public void setConfig(KafkaConfiguration kafkaConfiguration) {
        this.config = kafkaConfiguration;
    }

    /**
     * @return the constant identifier {@link #STREAMS_ID}
     */
    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Connects to Kafka, opens one message stream filtered by the configured
     * topic, and submits a {@code KafkaPersistReaderTask} per returned stream
     * to the executor.
     */
    public void startStream() {
        // NOTE(review): only "serializer.encoding" is set here and the private
        // createConsumerConfig(...) helper is never called from this class;
        // confirm whether zookeeper.connect / group.id are expected to be
        // supplied to the consumer config some other way.
        Properties consumerProps = new Properties();
        consumerProps.setProperty("serializer.encoding", "UTF8");

        this.consumerConfig = new ConsumerConfig(consumerProps);
        this.consumerConnector = Consumer.createJavaConsumerConnector(this.consumerConfig);

        Whitelist topicFilter = new Whitelist(this.config.getTopic());
        VerifiableProperties decoderProps = new VerifiableProperties(consumerProps);
        this.inStreams = this.consumerConnector.createMessageStreamsByFilter(
                topicFilter, 1, new StringDecoder(decoderProps), new StringDecoder(decoderProps));

        for (KafkaStream<String, String> stream : this.inStreams) {
            this.executor.submit(new KafkaPersistReaderTask(this, stream));
        }
    }

    /**
     * Delegates to {@link #readCurrent()}.
     *
     * @return whatever {@link #readCurrent()} returns (currently {@code null})
     */
    public StreamsResultSet readAll() {
        return readCurrent();
    }

    /**
     * Not implemented.
     *
     * @return always {@code null}
     */
    public StreamsResultSet readCurrent() {
        return null;
    }

    /**
     * Not implemented.
     *
     * @param bigInteger ignored
     * @return always {@code null}
     */
    public StreamsResultSet readNew(BigInteger bigInteger) {
        return null;
    }

    /**
     * Not implemented.
     *
     * @param dateTime ignored
     * @param dateTime2 ignored
     * @return always {@code null}
     */
    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        return null;
    }

    /**
     * @return {@code true} while the executor has been neither shut down nor terminated
     */
    public boolean isRunning() {
        return !(this.executor.isShutdown() || this.executor.isTerminated());
    }

    /**
     * Builds a consumer configuration with fixed ZooKeeper/commit timings.
     * Not referenced anywhere inside this class.
     *
     * @param zookeeperConnect value for {@code zookeeper.connect}
     * @param groupId value for {@code group.id}
     * @return the resulting consumer configuration
     */
    private static ConsumerConfig createConsumerConfig(String zookeeperConnect, String groupId) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", zookeeperConnect);
        properties.put("group.id", groupId);
        properties.put("zookeeper.session.timeout.ms", "400");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(properties);
    }

    /**
     * No-op.
     *
     * @param obj ignored
     */
    public void prepare(Object obj) {
    }

    /**
     * Shuts down the consumer connector (if one was created) and the executor,
     * then blocks until the submitted reader tasks have terminated.
     *
     * <p>If the calling thread is interrupted while waiting, the executor is
     * asked to stop immediately and the interrupt status is restored before
     * returning.
     */
    public void cleanUp() {
        // startStream() may never have been called, in which case there is no connector.
        if (this.consumerConnector != null) {
            this.consumerConnector.shutdown();
        }

        // Without an explicit shutdown the executor can never reach the
        // terminated state, so waiting on it would block forever.
        this.executor.shutdown();
        try {
            while (!this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
                LOGGER.debug("Waiting for reader tasks to terminate");
            }
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for reader tasks to terminate", e);
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
