package org.apache.seatunnel.connectors.seatunnel.starrocks.client.source;

import com.starrocks.shade.org.apache.thrift.TException;
import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol;
import com.starrocks.shade.org.apache.thrift.protocol.TMultiplexedProtocol;
import com.starrocks.shade.org.apache.thrift.transport.TSocket;
import com.starrocks.shade.org.apache.thrift.transport.TTransportException;
import com.starrocks.thrift.TScanBatchResult;
import com.starrocks.thrift.TScanCloseParams;
import com.starrocks.thrift.TScanNextBatchParams;
import com.starrocks.thrift.TScanOpenParams;
import com.starrocks.thrift.TScanOpenResult;
import com.starrocks.thrift.TStarrocksExternalService;
import com.starrocks.thrift.TStatusCode;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.class */
/**
 * Thrift client that reads the rows of one {@link QueryPartition} from a single StarRocks BE node.
 *
 * <p>Usage as shown by the methods of this class: construct (opens the socket), call
 * {@link #openScanner()}, then loop on {@link #hasNext()} / {@link #getNext()}, and finally call
 * {@link #close()}.
 */
public class StarRocksBeReadClient implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(StarRocksBeReadClient.class);
    private static final String DEFAULT_CLUSTER_NAME = "default_cluster";
    /** Separator between host and port in a BE address of the form {@code host:port}. */
    private static final String BE_ADDRESS_SEPARATOR = ":";
    private TStarrocksExternalService.Client client;
    private final String ip;
    private final int port;
    private String contextId;
    private final SourceConfig sourceConfig;
    private final SeaTunnelRowType seaTunnelRowType;
    private StarRocksRowBatchReader rowBatch;
    private final List<Long> tabletIds;
    private final String queryPlan;
    /** Number of rows consumed from already finished batches; sent as the offset of the next fetch. */
    private int readerOffset = 0;
    /** Set from the end-of-stream flag of the most recent batch result. */
    protected AtomicBoolean eos = new AtomicBoolean(false);

    /**
     * Parses the BE address of the partition and opens a Thrift socket to it.
     *
     * @param queryPartition partition providing the BE address, the query plan and the tablet ids
     * @param sourceConfig source configuration (timeouts, credentials, scan options)
     * @param seaTunnelRowType row type handed to the batch reader when decoding results
     * @throws StarRocksConnectorException with {@code CREATE_BE_READER_FAILED} if the address is
     *     not of the form {@code host:port} or the socket cannot be opened
     */
    public StarRocksBeReadClient(QueryPartition queryPartition, SourceConfig sourceConfig, SeaTunnelRowType seaTunnelRowType) {
        this.sourceConfig = sourceConfig;
        this.seaTunnelRowType = seaTunnelRowType;
        String beAddress = queryPartition.getBeAddress();
        log.debug("Parse StarRocks BE address: '{}'.", beAddress);
        String[] hostAndPort = beAddress.split(BE_ADDRESS_SEPARATOR);
        if (hostAndPort.length != 2) {
            throw new StarRocksConnectorException(
                    StarRocksConnectorErrorCode.CREATE_BE_READER_FAILED,
                    String.format("Format of StarRocks BE address[%s] is illegal", beAddress));
        }
        this.ip = hostAndPort[0].trim();
        this.port = Integer.parseInt(hostAndPort[1].trim());
        this.queryPlan = queryPartition.getQueryPlan();
        // Defensive copy so later changes to the partition's list do not affect this client.
        this.tabletIds = new ArrayList<>(queryPartition.getTabletIds());
        TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
        TSocket tSocket = new TSocket(this.ip, this.port, sourceConfig.getConnectTimeoutMs(), sourceConfig.getConnectTimeoutMs());
        try {
            tSocket.open();
            this.client = new TStarrocksExternalService.Client(factory.getProtocol(tSocket));
        } catch (TTransportException e) {
            tSocket.close();
            throw new StarRocksConnectorException(StarRocksConnectorErrorCode.CREATE_BE_READER_FAILED, "Failed to open socket", e);
        }
    }

    /**
     * Opens a scanner on the BE for the configured tablets and stores the returned context id.
     *
     * @throws StarRocksConnectorException with {@code SCAN_BE_DATA_FAILED} if the BE answers with
     *     a non-OK status or the Thrift call fails
     */
    public void openScanner() {
        TScanOpenParams params = new TScanOpenParams();
        params.setTablet_ids(this.tabletIds);
        params.setOpaqued_query_plan(this.queryPlan);
        params.setCluster(DEFAULT_CLUSTER_NAME);
        params.setDatabase(this.sourceConfig.getDatabase());
        params.setTable(this.sourceConfig.getTable());
        params.setUser(this.sourceConfig.getUsername());
        params.setPasswd(this.sourceConfig.getPassword());
        params.setBatch_size(this.sourceConfig.getBatchRows());
        if (this.sourceConfig.getSourceOptionProps() != null) {
            params.setProperties(this.sourceConfig.getSourceOptionProps());
        }
        // The Thrift field is a short, so clamp the configured value before narrowing.
        params.setKeep_alive_min((short) Math.min(Short.MAX_VALUE, this.sourceConfig.getKeepAliveMin()));
        params.setQuery_timeout(this.sourceConfig.getQueryTimeoutSec());
        params.setMem_limit(this.sourceConfig.getMemLimit());
        log.info("open Scan params.mem_limit {} B", params.getMem_limit());
        log.info("open Scan params.keep-alive-min {} min", params.getKeep_alive_min());
        log.info("open Scan params.batch_size {}", params.getBatch_size());
        try {
            TScanOpenResult result = this.client.open_scanner(params);
            if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
                throw new StarRocksConnectorException(
                        StarRocksConnectorErrorCode.SCAN_BE_DATA_FAILED,
                        "Failed to open scanner." + result.getStatus().getStatus_code() + result.getStatus().getError_msgs());
            }
            this.contextId = result.getContext_id();
            log.info(
                    "Open scanner for {}:{} with context id {}, and there are {} tablets {}",
                    this.ip, this.port, this.contextId, this.tabletIds.size(), this.tabletIds);
        } catch (TException e) {
            // Keep the original exception as the cause so the Thrift stack trace is not lost.
            throw new StarRocksConnectorException(StarRocksConnectorErrorCode.SCAN_BE_DATA_FAILED, e.getMessage(), e);
        }
    }

    /**
     * Reports whether more rows are available, fetching the next batch from the BE when the
     * current one is exhausted (or none has been fetched yet).
     *
     * @return {@code false} once the BE has signalled end of stream, {@code true} otherwise
     * @throws StarRocksConnectorException with {@code SCAN_BE_DATA_FAILED} if the BE answers with
     *     a non-OK status or the Thrift call fails
     */
    public boolean hasNext() {
        boolean batchExhausted = this.rowBatch == null || !this.rowBatch.hasNext();
        if (!this.eos.get() && batchExhausted) {
            if (this.rowBatch != null) {
                // Advance the offset by what the finished batch delivered before releasing it.
                this.readerOffset += this.rowBatch.getReadRowCount();
                this.rowBatch.close();
            }
            TScanNextBatchParams params = new TScanNextBatchParams();
            params.setContext_id(this.contextId);
            params.setOffset(this.readerOffset);
            try {
                TScanBatchResult result = this.client.get_next(params);
                if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
                    throw new StarRocksConnectorException(
                            StarRocksConnectorErrorCode.SCAN_BE_DATA_FAILED,
                            "Failed to get next from be -> ip:[" + this.ip + "] " + result.getStatus().getStatus_code() + " msg:" + result.getStatus().getError_msgs());
                }
                this.eos.set(result.isEos());
                if (!this.eos.get()) {
                    this.rowBatch = new StarRocksRowBatchReader(result, this.seaTunnelRowType).readArrow();
                }
            } catch (TException e) {
                // Keep the original exception as the cause so the Thrift stack trace is not lost.
                throw new StarRocksConnectorException(StarRocksConnectorErrorCode.SCAN_BE_DATA_FAILED, e.getMessage(), e);
            }
        }
        return !this.eos.get();
    }

    /**
     * Returns the next row of the current batch. Call only after {@link #hasNext()} returned
     * {@code true}; no batch exists before the first successful fetch.
     *
     * @return the next row read from the current batch
     */
    public SeaTunnelRow getNext() {
        return this.rowBatch.next();
    }

    /**
     * Closes the scanner on the BE and then the underlying Thrift transport.
     *
     * <p>The transport is closed even when closing the scanner fails, so the socket opened in the
     * constructor is not leaked.
     *
     * @throws StarRocksConnectorException with {@code CLOSE_BE_READER_FAILED} if the close call to
     *     the BE fails
     */
    public void close() {
        log.info("Close reader for {}:{} with context id {}", this.ip, this.port, this.contextId);
        TScanCloseParams params = new TScanCloseParams();
        params.setContext_id(this.contextId);
        try {
            this.client.close_scanner(params);
        } catch (TException e) {
            // SLF4J treats a trailing Throwable argument as the exception to log.
            log.error("Failed to close reader {}:{} with context id {}", this.ip, this.port, this.contextId, e);
            throw new StarRocksConnectorException(StarRocksConnectorErrorCode.CLOSE_BE_READER_FAILED, e);
        } finally {
            // Release the socket that the constructor opened and wrapped in the client protocol.
            this.client.getInputProtocol().getTransport().close();
        }
    }
}
