package org.apache.iotdb.flink.sql.function;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.iotdb.flink.sql.client.IoTDBWebSocketClient;
import org.apache.iotdb.flink.sql.common.Options;
import org.apache.iotdb.flink.sql.common.Utils;
import org.apache.iotdb.flink.sql.exception.IllegalOptionException;
import org.apache.iotdb.flink.sql.exception.IllegalSchemaException;
import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
import org.apache.iotdb.flink.sql.wrapper.TabletWrapper;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.java_websocket.enums.ReadyState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.class */
public class IoTDBCDCSourceFunction extends RichSourceFunction<RowData> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) IoTDBCDCSourceFunction.class);
    private final List<IoTDBWebSocketClient> socketClients = new ArrayList();
    private final int cdcPort;
    private final List<String> nodeUrls;
    private final String taskName;
    private final String pattern;
    private final String user;
    private final String password;
    private final List<String> timeseriesList;
    private final BlockingQueue<TabletWrapper> tabletWrappers;
    private final List<Tuple2<String, DataType>> tableSchema;
    private transient ExecutorService consumeExecutor;

    /* loaded from: input_file:org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction$ConsumeRunnable.class */
    private class ConsumeRunnable implements Runnable {
        SourceFunction.SourceContext<RowData> context;

        public ConsumeRunnable(SourceFunction.SourceContext<RowData> sourceContext) {
            this.context = sourceContext;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    TabletWrapper tabletWrapper = (TabletWrapper) IoTDBCDCSourceFunction.this.tabletWrappers.take();
                    IoTDBCDCSourceFunction.this.collectTablet(tabletWrapper.getTablet(), this.context);
                    tabletWrapper.getWebSocketClient().send(String.format("ACK:%d", Long.valueOf(tabletWrapper.getCommitId())));
                } catch (InterruptedException e) {
                    IoTDBCDCSourceFunction.LOGGER.warn("The tablet can't be taken from queue!");
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public IoTDBCDCSourceFunction(ReadableConfig readableConfig, SchemaWrapper schemaWrapper) {
        this.tableSchema = schemaWrapper.getSchema();
        this.pattern = (String) readableConfig.get(Options.PATTERN);
        this.cdcPort = ((Integer) readableConfig.get(Options.CDC_PORT)).intValue();
        this.nodeUrls = Arrays.asList(((String) readableConfig.get(Options.NODE_URLS)).split(","));
        this.taskName = (String) readableConfig.get(Options.CDC_TASK_NAME);
        this.user = (String) readableConfig.get(Options.USER);
        this.password = (String) readableConfig.get(Options.PASSWORD);
        this.timeseriesList = (List) this.tableSchema.stream().map(tuple2 -> {
            return String.valueOf(tuple2.f0);
        }).collect(Collectors.toList());
        this.tabletWrappers = new ArrayBlockingQueue(this.nodeUrls.size());
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        Session build = new Session.Builder().username(this.user).password(this.password).nodeUrls(this.nodeUrls).build();
        build.open(false);
        String format = String.format("`flink_cdc_%s_%s_%d`", this.taskName, this.pattern.replace(TsFileConstant.BACK_QUOTE_STRING, TsFileConstant.DOUBLE_BACK_QUOTE_STRING), Integer.valueOf(this.cdcPort));
        boolean z = false;
        SessionDataSet executeQueryStatement = build.executeQueryStatement("show pipes");
        while (true) {
            try {
                if (!executeQueryStatement.hasNext()) {
                    break;
                } else if (format.equals(executeQueryStatement.next().getFields().get(0).getStringValue())) {
                    z = true;
                    break;
                }
            } finally {
            }
        }
        if (executeQueryStatement != null) {
            executeQueryStatement.close();
        }
        if (!z) {
            Iterator<String> it = this.nodeUrls.iterator();
            while (it.hasNext()) {
                if (Utils.isURIAvailable(new URI(String.format("ws://%s:%d", it.next().split(TMultiplexedProtocol.SEPARATOR)[0], Integer.valueOf(this.cdcPort))))) {
                    throw new IllegalOptionException(String.format("The port `%d` has been bound. Please use another one by option `cdc.port`.", Integer.valueOf(this.cdcPort)));
                }
            }
            build.executeNonQueryStatement(String.format("CREATE PIPE %s\nWITH EXTRACTOR (\n'extractor' = 'iotdb-extractor',\n'extractor.pattern' = '%s',\n) WITH CONNECTOR (\n'connector' = 'websocket-connector',\n'connector.websocket.port' = '%d')", format, this.pattern, Integer.valueOf(this.cdcPort)));
        }
        executeQueryStatement = build.executeQueryStatement("show pipes");
        String str = null;
        while (true) {
            try {
                if (!executeQueryStatement.hasNext()) {
                    break;
                }
                RowRecord next = executeQueryStatement.next();
                if (format.equals(next.getFields().get(0).getStringValue())) {
                    str = next.getFields().get(2).getStringValue();
                    break;
                }
            } finally {
            }
        }
        if ("STOPPED".equals(str)) {
            build.executeNonQueryStatement(String.format("start pipe %s", format));
        }
        if (executeQueryStatement != null) {
            executeQueryStatement.close();
        }
        build.close();
        this.consumeExecutor = Executors.newFixedThreadPool(1);
        Iterator<String> it2 = this.nodeUrls.iterator();
        while (it2.hasNext()) {
            this.socketClients.add(initAndGet(new URI(String.format("ws://%s:%s", it2.next().split(TMultiplexedProtocol.SEPARATOR)[0], Integer.valueOf(this.cdcPort)))));
        }
    }

    public void run(SourceFunction.SourceContext<RowData> sourceContext) throws InterruptedException {
        this.consumeExecutor.execute(new ConsumeRunnable(sourceContext));
        this.consumeExecutor.shutdown();
        while (true) {
            if (this.consumeExecutor.isTerminated()) {
                System.exit(1);
            }
            for (IoTDBWebSocketClient ioTDBWebSocketClient : this.socketClients) {
                if (ioTDBWebSocketClient.getReadyState().equals(ReadyState.CLOSED)) {
                    while (!Utils.isURIAvailable(ioTDBWebSocketClient.getURI())) {
                        LOGGER.warn(String.format("The URI %s:%d is not available now, sleep 5 seconds.", ioTDBWebSocketClient.getURI().getHost(), Integer.valueOf(ioTDBWebSocketClient.getURI().getPort())));
                        Thread.sleep(5000L);
                    }
                    ioTDBWebSocketClient.reconnect();
                    while (!ioTDBWebSocketClient.getReadyState().equals(ReadyState.OPEN)) {
                        Thread.sleep(1000L);
                    }
                    ioTDBWebSocketClient.send("START");
                } else {
                    Thread.sleep(1000L);
                }
            }
        }
    }

    public void cancel() {
        this.socketClients.forEach((v0) -> {
            v0.close();
        });
    }

    public void addTabletWrapper(TabletWrapper tabletWrapper) {
        try {
            this.tabletWrappers.put(tabletWrapper);
        } catch (InterruptedException e) {
            LOGGER.warn(String.format("The tablet from %s:%d can't be put into queue, because: %s", tabletWrapper.getWebSocketClient().getRemoteSocketAddress().getHostName(), Integer.valueOf(tabletWrapper.getWebSocketClient().getRemoteSocketAddress().getPort()), e.getMessage()));
            Thread.currentThread().interrupt();
        }
    }

    private IoTDBWebSocketClient initAndGet(URI uri) throws InterruptedException {
        while (!Utils.isURIAvailable(uri)) {
            LOGGER.warn(String.format("The URI %s:%d is not available now, sleep 5 seconds.", uri.getHost(), Integer.valueOf(uri.getPort())));
            Thread.sleep(5000L);
        }
        IoTDBWebSocketClient ioTDBWebSocketClient = new IoTDBWebSocketClient(uri, this);
        ioTDBWebSocketClient.connect();
        while (!ioTDBWebSocketClient.getReadyState().equals(ReadyState.OPEN)) {
            Thread.sleep(1000L);
        }
        ioTDBWebSocketClient.send("START");
        return ioTDBWebSocketClient;
    }

    public void collectTablet(Tablet tablet, SourceFunction.SourceContext<RowData> sourceContext) {
        List<MeasurementSchema> schemas = tablet.getSchemas();
        int i = tablet.rowSize;
        HashMap hashMap = new HashMap();
        for (MeasurementSchema measurementSchema : schemas) {
            String format = String.format("%s.%s", tablet.deviceId, measurementSchema.getMeasurementId());
            TSDataType type = measurementSchema.getType();
            int indexOf = this.timeseriesList.indexOf(format);
            if (indexOf == -1) {
                return;
            }
            if (!Utils.isTypeEqual(type, (DataType) this.tableSchema.get(indexOf).f1)) {
                throw new IllegalSchemaException(String.format("The data type of column `%s` is different in IoTDB and Flink", format));
            }
            hashMap.put(format, new Pair(tablet.bitMaps[schemas.indexOf(measurementSchema)], Utils.object2List(tablet.values[schemas.indexOf(measurementSchema)], type)));
        }
        for (int i2 = 0; i2 < i; i2++) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(Long.valueOf(tablet.timestamps[i2]));
            for (String str : this.timeseriesList) {
                if (!hashMap.containsKey(str) || (((Pair) hashMap.get(str)).getLeft() != null && ((BitMap) ((Pair) hashMap.get(str)).getLeft()).isMarked(i2))) {
                    arrayList.add(null);
                } else {
                    arrayList.add(((List) ((Pair) hashMap.get(str)).getRight()).get(i2));
                }
            }
            sourceContext.collect(GenericRowData.of(arrayList.toArray()));
        }
    }
}
