/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerThread
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerThread.class);
    private KafkaConsumer<String, String> consumer;
    private SessionPool pool;

    public ConsumerThread(KafkaConsumer<String, String> consumer, SessionPool pool) {
        this.consumer = consumer;
        this.pool = pool;
    }

    private void insert(String data) throws IoTDBConnectionException, StatementExecutionException {
        String[] dataArray = data.split(",");
        String device = dataArray[0];
        long time = Long.parseLong(dataArray[1]);
        List<String> measurements = Arrays.asList(dataArray[2].split(":"));
        ArrayList<TSDataType> types = new ArrayList<TSDataType>();
        for (String type : dataArray[3].split(":")) {
            types.add(TSDataType.valueOf((String)type));
        }
        ArrayList<Object> values = new ArrayList<Object>();
        String[] valuesStr = dataArray[4].split(":");
        block9: for (int i = 0; i < valuesStr.length; ++i) {
            switch ((TSDataType)types.get(i)) {
                case INT64: {
                    values.add(Long.parseLong(valuesStr[i]));
                    continue block9;
                }
                case DOUBLE: {
                    values.add(Double.parseDouble(valuesStr[i]));
                    continue block9;
                }
                case INT32: {
                    values.add(Integer.parseInt(valuesStr[i]));
                    continue block9;
                }
                case TEXT: {
                    values.add(valuesStr[i]);
                    continue block9;
                }
                case FLOAT: {
                    values.add(Float.valueOf(Float.parseFloat(valuesStr[i])));
                    continue block9;
                }
                case BOOLEAN: {
                    values.add(Boolean.parseBoolean(valuesStr[i]));
                }
            }
        }
        this.pool.insertRecord(device, time, measurements, types, values);
    }

    private void insertDatas(List<String> datas) throws IoTDBConnectionException, StatementExecutionException {
        int size = datas.size();
        ArrayList<String> deviceIds = new ArrayList<String>(size);
        ArrayList<Long> times = new ArrayList<Long>(size);
        ArrayList<List<String>> measurementsList = new ArrayList<List<String>>(size);
        ArrayList typesList = new ArrayList(size);
        ArrayList valuesList = new ArrayList(size);
        for (String data : datas) {
            String[] dataArray = data.split(",");
            String device = dataArray[0];
            long time = Long.parseLong(dataArray[1]);
            List<String> measurements = Arrays.asList(dataArray[2].split(":"));
            ArrayList<TSDataType> types = new ArrayList<TSDataType>();
            for (String type : dataArray[3].split(":")) {
                types.add(TSDataType.valueOf((String)type));
            }
            ArrayList<Object> values = new ArrayList<Object>();
            String[] valuesStr = dataArray[4].split(":");
            block10: for (int i = 0; i < valuesStr.length; ++i) {
                switch ((TSDataType)types.get(i)) {
                    case INT64: {
                        values.add(Long.parseLong(valuesStr[i]));
                        continue block10;
                    }
                    case DOUBLE: {
                        values.add(Double.parseDouble(valuesStr[i]));
                        continue block10;
                    }
                    case INT32: {
                        values.add(Integer.parseInt(valuesStr[i]));
                        continue block10;
                    }
                    case TEXT: {
                        values.add(valuesStr[i]);
                        continue block10;
                    }
                    case FLOAT: {
                        values.add(Float.valueOf(Float.parseFloat(valuesStr[i])));
                        continue block10;
                    }
                    case BOOLEAN: {
                        values.add(Boolean.parseBoolean(valuesStr[i]));
                    }
                }
            }
            deviceIds.add(device);
            times.add(time);
            measurementsList.add(measurements);
            typesList.add(types);
            valuesList.add(values);
        }
        this.pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
    }

    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords records = this.consumer.poll(Duration.ofSeconds(1L));
                ArrayList<String> datas = new ArrayList<String>(records.count());
                for (ConsumerRecord record : records) {
                    datas.add((String)record.value());
                }
                this.insertDatas(datas);
            }
        }
        catch (Exception e) {
            logger.error(e.getMessage());
            return;
        }
    }
}

