package org.apache.seatunnel.connectors.seatunnel.iotdb.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.class */
/**
 * Source reader that runs the query of every assigned {@link IoTDBSourceSplit} against an IoTDB
 * {@link Session} and emits each result row as a {@link SeaTunnelRow}.
 *
 * <p>Lifecycle as visible in this class: {@link #open()} builds and opens the session from the
 * configuration map, {@link #pollNext(Collector)} reads all currently assigned splits, and
 * {@link #close()} closes the session.
 */
public class IoTDBSourceReader implements SourceReader<SeaTunnelRow, IoTDBSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(IoTDBSourceReader.class);

    /** Milliseconds to sleep in {@link #pollNext(Collector)} while no split is assigned. */
    private static final long THREAD_WAIT_TIME = 500;

    /** Connector options; keys are the {@link SourceConfig} constants plus a few literal keys. */
    private final Map<String, Object> conf;

    /** Splits handed to this reader via {@link #addSplits(List)}. */
    private final Set<IoTDBSourceSplit> sourceSplits = new HashSet<>();

    private final SourceReader.Context context;

    /** Row type supplied by the source; stored but not consulted by this class. */
    private final SeaTunnelRowType seaTunnelRowType;

    /** Created in {@link #open()}; {@code null} until then. */
    private Session session;

    /**
     * @param map connector options used to build the IoTDB session
     * @param context reader context used to query boundedness and signal end of input
     * @param seaTunnelRowType row type of the produced rows
     */
    public IoTDBSourceReader(Map<String, Object> map, SourceReader.Context context, SeaTunnelRowType seaTunnelRowType) {
        this.conf = map;
        this.context = context;
        this.seaTunnelRowType = seaTunnelRowType;
    }

    /**
     * Builds the session from the configuration and opens the connection.
     *
     * @throws IoTDBConnectionException if the session cannot be opened
     */
    public void open() throws IoTDBConnectionException {
        this.session = buildSession(this.conf);
        this.session.open();
    }

    /**
     * Closes the session if one was created; a no-op when {@link #open()} never ran or failed
     * before the session was assigned.
     *
     * @throws IOException wrapping the {@link IoTDBConnectionException} raised while closing
     */
    public void close() throws IOException {
        if (this.session == null) {
            return;
        }
        try {
            this.session.close();
        } catch (IoTDBConnectionException e) {
            throw new IOException("close IoTDB session failed", e);
        }
    }

    /**
     * Reads every assigned split and forwards the rows to {@code collector}. When no split is
     * assigned yet, sleeps for {@link #THREAD_WAIT_TIME} ms and returns. For a bounded source,
     * signals the context that no more elements will be produced once the splits were read.
     *
     * <p>NOTE(review): splits are not removed after being read and {@link #handleNoMoreSplits()}
     * is ignored, so an unbounded run re-executes the same queries on every call and a bounded
     * reader that never receives a split keeps sleeping — confirm this is intended.
     *
     * @param collector receiver of the produced rows
     * @throws Exception if the wait is interrupted; read failures surface as
     *     {@link RuntimeException} with the original exception as cause
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        if (this.sourceSplits.isEmpty()) {
            Thread.sleep(THREAD_WAIT_TIME);
            return;
        }
        for (IoTDBSourceSplit split : this.sourceSplits) {
            try {
                read(split, collector);
            } catch (Exception e) {
                throw new RuntimeException("IotDB source read error", e);
            }
        }
        if (Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
            log.info("Closed the bounded IoTDB source");
            this.context.signalNoMoreElement();
        }
    }

    /**
     * Executes the split's query and emits one {@link SeaTunnelRow} per result record. The data
     * set is closed on every exit path, including failures raised while iterating.
     */
    private void read(IoTDBSourceSplit ioTDBSourceSplit, Collector<SeaTunnelRow> collector) throws Exception {
        try (SessionDataSet dataSet = this.session.executeQueryStatement(ioTDBSourceSplit.getQuery())) {
            while (dataSet.hasNext()) {
                RowRecord record = dataSet.next();
                List<Field> fields = record.getFields();
                Object[] values = new Object[fields.size()];
                for (int i = 0; i < values.length; i++) {
                    values[i] = convertToDataType(fields.get(i));
                }
                collector.collect(new SeaTunnelRow(values));
            }
        }
    }

    /**
     * Converts an IoTDB field to the matching boxed Java value.
     *
     * @return the converted value, or {@code null} when the field or its data type is absent
     * @throws IllegalArgumentException for a data type this reader does not handle
     */
    private Object convertToDataType(Field field) {
        // Switching on a null enum throws NullPointerException. IoTDB appears to use a field
        // without a data type for a missing value (TODO confirm against the Field API), so map
        // that case to a null column value instead of failing the whole read.
        if (field == null || field.getDataType() == null) {
            return null;
        }
        switch (field.getDataType()) {
            case INT32:
                return field.getIntV();
            case INT64:
                return field.getLongV();
            case FLOAT:
                return field.getFloatV();
            case DOUBLE:
                return field.getDoubleV();
            case TEXT:
                return field.getStringValue();
            case BOOLEAN:
                return field.getBoolV();
            default:
                throw new IllegalArgumentException("unknown TSData type: " + field.getDataType());
        }
    }

    /**
     * Builds (without opening) a session from the configuration map. Uses host/port when a host
     * is configured, otherwise the comma-separated {@code node_urls}; every other option is
     * applied only when present.
     */
    private Session buildSession(Map<String, Object> map) {
        Session.Builder builder = new Session.Builder();
        if (map.containsKey(SourceConfig.HOST)) {
            // Only configure the builder here; the single build() call is at the end.
            builder.host((String) map.get(SourceConfig.HOST))
                    .port(Integer.parseInt(map.get(SourceConfig.PORT).toString()));
        } else {
            List<String> nodeUrls =
                    Stream.of(((String) map.get("node_urls")).split(",")).collect(Collectors.toList());
            builder.nodeUrls(nodeUrls);
        }
        if (null != map.get(SourceConfig.FETCH_SIZE)) {
            builder.fetchSize(Integer.parseInt(map.get(SourceConfig.FETCH_SIZE).toString()));
        }
        if (null != map.get("username")) {
            builder.username((String) map.get("username"));
        }
        if (null != map.get("password")) {
            builder.password((String) map.get("password"));
        }
        if (null != map.get(SourceConfig.THRIFT_DEFAULT_BUFFER_SIZE)) {
            builder.thriftDefaultBufferSize(Integer.parseInt(map.get(SourceConfig.THRIFT_DEFAULT_BUFFER_SIZE).toString()));
        }
        if (null != map.get(SourceConfig.THRIFT_MAX_FRAME_SIZE)) {
            builder.thriftMaxFrameSize(Integer.parseInt(map.get(SourceConfig.THRIFT_MAX_FRAME_SIZE).toString()));
        }
        if (null != map.get(SourceConfig.ENABLE_CACHE_LEADER)) {
            builder.enableCacheLeader(Boolean.parseBoolean(map.get(SourceConfig.ENABLE_CACHE_LEADER).toString()));
        }
        if (null != map.get(SourceConfig.VERSION)) {
            builder.version(Version.valueOf(map.get(SourceConfig.VERSION).toString()));
        }
        return builder.build();
    }

    /**
     * @param j checkpoint id (unused)
     * @return a copy of the splits currently assigned to this reader
     */
    public List<IoTDBSourceSplit> snapshotState(long j) {
        return new ArrayList<>(this.sourceSplits);
    }

    /** Adds the given splits to the set read by {@link #pollNext(Collector)}. */
    public void addSplits(List<IoTDBSourceSplit> list) {
        this.sourceSplits.addAll(list);
    }

    /** Intentionally empty: this reader does not react to the no-more-splits notification. */
    public void handleNoMoreSplits() {
    }

    /** Intentionally empty: nothing to commit when a checkpoint completes. */
    public void notifyCheckpointComplete(long j) throws Exception {
    }
}
