/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iotdb.kafka.Constant;
import org.apache.iotdb.kafka.ConsumerThread;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer {
    private List<KafkaConsumer<String, String>> consumerList;
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    private static SessionPool pool;

    private Consumer(List<KafkaConsumer<String, String>> consumerList) {
        this.consumerList = consumerList;
        pool = new SessionPool.Builder().host("localhost").port(6667).user("root").password("root").maxSize(3).build();
    }

    public static void main(String[] args) {
        ArrayList<KafkaConsumer<String, String>> consumerList = new ArrayList<KafkaConsumer<String, String>>();
        for (int i = 0; i < 5; ++i) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("key.deserializer", StringDeserializer.class);
            props.put("value.deserializer", StringDeserializer.class);
            props.put("auto.offset.reset", "earliest");
            props.put("group.id", "Kafka-Test");
            KafkaConsumer consumer = new KafkaConsumer(props);
            consumerList.add((KafkaConsumer<String, String>)consumer);
            consumer.subscribe(Collections.singleton("Kafka-Test"));
        }
        Consumer consumer = new Consumer(consumerList);
        Consumer.initIoTDB();
        consumer.consumeInParallel();
    }

    private static void initIoTDB() {
        try {
            for (String string : Constant.STORAGE_GROUP) {
                Consumer.addStorageGroup(string);
            }
            for (String string : Constant.CREATE_TIMESERIES) {
                Consumer.createTimeseries((String[])string);
            }
        }
        catch (IoTDBConnectionException | StatementExecutionException e) {
            logger.error(e.getMessage());
        }
    }

    private static void addStorageGroup(String storageGroup) throws IoTDBConnectionException, StatementExecutionException {
        pool.setStorageGroup(storageGroup);
    }

    private static void createTimeseries(String[] sql) throws StatementExecutionException, IoTDBConnectionException {
        String timeseries = sql[0];
        TSDataType dataType = TSDataType.valueOf((String)sql[1]);
        TSEncoding encoding = TSEncoding.valueOf((String)sql[2]);
        CompressionType compressionType = CompressionType.valueOf((String)sql[3]);
        pool.createTimeseries(timeseries, dataType, encoding, compressionType);
    }

    private void consumeInParallel() {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < this.consumerList.size(); ++i) {
            ConsumerThread consumerThread = new ConsumerThread(this.consumerList.get(i), pool);
            executor.submit(consumerThread);
        }
    }
}

