/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.kafka;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.iotdb.kafka.Constant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that drains one Kafka stream and writes every message into IoTDB over JDBC.
 *
 * <p>Each message is split on {@code ','} and its first three fields are used, in order, as
 * the column name, the timestamp and the text value of an
 * {@code INSERT INTO root.vehicle.device} statement.
 *
 * <p>The JDBC connection is opened in the constructor and closed when {@link #run()} returns.
 * The first instance that connects successfully also attempts to create the storage group
 * and the time series listed in {@code Constant.ALL_TIMESERIES}; the static flags that record
 * this are not synchronized.
 * NOTE(review): this presumes instances are constructed from a single thread - confirm.
 */
public class KafkaConsumerThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerThread.class);

    private static final String CREATE_STORAGE_GROUP_SQL_TEMPLATE = "SET STORAGE GROUP TO %s";
    private static final String CREATE_TIMESERIES_SQL_TEMPLATE =
            "CREATE TIMESERIES %s WITH DATATYPE=TEXT, ENCODING=PLAIN";
    private static final String INSERT_DATA_SQL_TEMPLATE =
            "INSERT INTO root.vehicle.device(timestamp,%s) VALUES (%s,'%s')";

    /** Number of comma-separated fields consumed from each message. */
    private static final int REQUIRED_FIELD_COUNT = 3;

    /** True until one instance has attempted to create the storage group. */
    private static boolean setStorageGroup = true;
    /** True until one instance has successfully created all time series. */
    private static boolean createTimeSeries = true;

    private final KafkaStream<String, String> stream;
    /** Stays {@code null} when the connection could not be established. */
    private Connection connection = null;
    /** Stays {@code null} when the connection could not be established. */
    private Statement statement = null;

    /**
     * Creates a consumer for the given stream and opens the IoTDB connection.
     *
     * <p>A connection failure is logged rather than thrown; in that case every message
     * later read by {@link #run()} is dropped with an error log entry.
     *
     * @param stream the Kafka stream to consume
     */
    public KafkaConsumerThread(KafkaStream<String, String> stream) {
        this.stream = stream;
        this.initIoTDB();
    }

    /** Opens the JDBC connection and, once per process, performs best-effort schema setup. */
    private void initIoTDB() {
        try {
            Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
            this.connection = DriverManager.getConnection("jdbc:iotdb://localhost:6667/", "root", "root");
            this.statement = this.connection.createStatement();
        } catch (ClassNotFoundException | SQLException e) {
            logger.error("Failed to open the IoTDB connection; messages from this stream will be dropped", e);
            // Do not leak a connection that was opened before createStatement() failed.
            this.closeResources();
            return;
        }

        try {
            if (setStorageGroup) {
                this.createStorageGroup();
                setStorageGroup = false;
            }
            if (createTimeSeries) {
                this.createAllTimeseries();
                createTimeSeries = false;
            }
        } catch (SQLException e) {
            // Schema setup is best-effort: the statement is kept so inserts can still be attempted.
            logger.error("Failed to create the IoTDB time series", e);
        }
    }

    /** Issues the storage group creation statement, tolerating failure. */
    private void createStorageGroup() {
        try {
            this.statement.execute(String.format(CREATE_STORAGE_GROUP_SQL_TEMPLATE, "root.vehicle"));
        } catch (SQLException ignored) {
            // Deliberately non-fatal: initialisation continues whether or not this statement succeeds.
            logger.debug("Storage group root.vehicle was not created: {}", ignored.getMessage());
        }
    }

    /** Creates every time series in {@code Constant.ALL_TIMESERIES} with a single batch. */
    private void createAllTimeseries() throws SQLException {
        for (String timeseries : Constant.ALL_TIMESERIES) {
            this.statement.addBatch(String.format(CREATE_TIMESERIES_SQL_TEMPLATE, timeseries));
        }
        this.statement.executeBatch();
        this.statement.clearBatch();
    }

    /**
     * Inserts one message into IoTDB.
     *
     * <p>Null messages, messages with fewer than three fields, and messages received while
     * no connection is available are logged and skipped so that one bad record cannot
     * terminate the consuming thread.
     *
     * @param message the raw message, expected to hold at least three comma-separated fields
     */
    private void writeData(String message) {
        if (this.statement == null) {
            logger.error("No IoTDB connection available; dropping message: {}", message);
            return;
        }
        if (message == null) {
            logger.warn("Skipping null message");
            return;
        }
        String[] items = message.split(",");
        if (items.length < REQUIRED_FIELD_COUNT) {
            logger.warn("Skipping malformed message, expected at least {} comma-separated fields: {}",
                    REQUIRED_FIELD_COUNT, message);
            return;
        }
        try {
            // NOTE(review): the SQL text is assembled from message content. The column name
            // cannot be bound as a parameter, so confirm that producers are trusted.
            String sql = String.format(INSERT_DATA_SQL_TEMPLATE, items[0], items[1], items[2]);
            this.statement.execute(sql);
        } catch (SQLException e) {
            logger.error("Failed to insert message into IoTDB: {}", message, e);
        }
    }

    /** Closes the statement and the connection, logging any failure instead of throwing. */
    private void closeResources() {
        if (this.statement != null) {
            try {
                this.statement.close();
            } catch (SQLException e) {
                logger.warn("Failed to close the IoTDB statement", e);
            }
            this.statement = null;
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (SQLException e) {
                logger.warn("Failed to close the IoTDB connection", e);
            }
            this.connection = null;
        }
    }

    /**
     * Logs and stores every message produced by the stream, then releases the JDBC
     * resources once iteration ends, whether normally or because of an exception.
     */
    @Override
    public void run() {
        try {
            for (MessageAndMetadata<String, String> record : this.stream) {
                String uploadMessage = record.message();
                logger.info("{} from partiton[{}]: {}",
                        Thread.currentThread().getName(), record.partition(), uploadMessage);
                this.writeData(uploadMessage);
            }
        } finally {
            this.closeResources();
        }
    }
}

