package org.apache.seatunnel.connectors.doris.source.reader;

import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.sdk.thrift.TScanCloseParams;
import org.apache.doris.sdk.thrift.TScanNextBatchParams;
import org.apache.doris.sdk.thrift.TScanOpenParams;
import org.apache.doris.sdk.thrift.TScanOpenResult;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.backend.BackendClient;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition;
import org.apache.seatunnel.connectors.doris.rest.models.Schema;
import org.apache.seatunnel.connectors.doris.source.serialization.Routing;
import org.apache.seatunnel.connectors.doris.source.serialization.RowBatch;
import org.apache.seatunnel.connectors.doris.util.ErrorMessages;
import org.apache.seatunnel.connectors.doris.util.SchemaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.class */
/**
 * Iterator-style reader over the rows of a single {@link PartitionDefinition}.
 *
 * <p>On construction the reader creates a {@link BackendClient} for the partition's backend
 * address and opens a scanner. Rows are then pulled batch by batch through
 * {@code client.getNext(...)} and decoded with {@link RowBatch#readArrow()}.
 *
 * <p>Two modes are supported, selected by {@code DorisConfig#getDeserializeArrowAsync()}:
 * <ul>
 *   <li><b>synchronous</b> - {@link #hasNext()} fetches and decodes the next batch itself;</li>
 *   <li><b>asynchronous</b> - {@link #asyncThread} fetches and decodes batches and hands them
 *       over through {@link #rowBatchBlockingQueue}; {@link #hasNext()} only drains the
 *       queue.</li>
 * </ul>
 *
 * <p>All calls on {@link #client} are made while holding {@link #clientLock}.
 * NOTE(review): in asynchronous mode the fetch thread holds {@code clientLock} for its whole
 * lifetime, so {@link #close()} blocks until that thread has finished (end of stream or
 * failure). A consumer that stops reading while the queue is full can therefore stall
 * {@code close()} - confirm whether callers always drain the reader first.
 */
public class DorisValueReader {
    private static final Logger log = LoggerFactory.getLogger(DorisValueReader.class);

    /** Pause, in milliseconds, between two polls of the hand-off queue in asynchronous mode. */
    private static final long ASYNC_POLL_INTERVAL_MS = 5L;

    private final PartitionDefinition partition;
    private final DorisConfig config;
    /** Batch currently being iterated by {@link #next()}; {@code null} until the first fetch. */
    protected RowBatch rowBatch;
    protected boolean deserializeArrowToRowBatchAsync;
    /** Hand-off queue between the fetch thread and the consumer; only created in async mode. */
    protected BlockingQueue<RowBatch> rowBatchBlockingQueue;
    private TScanOpenParams openParams;
    protected String contextId;
    protected Schema schema;
    protected SeaTunnelRowType seaTunnelRowType;
    protected boolean asyncThreadStarted;
    protected Lock clientLock = new ReentrantLock();
    /** Row offset sent with the next {@code getNext} request. */
    protected int offset = 0;
    /** Set once the backend reports end of stream (or the async fetch has failed). */
    protected AtomicBoolean eos = new AtomicBoolean(false);
    /**
     * Failure raised inside the fetch thread, if any. Written before {@link #eos} is set so
     * that a consumer observing {@code eos == true} also observes the failure.
     */
    private volatile Exception asyncFailure;
    protected Thread asyncThread = new Thread(new Runnable() {
        @Override
        public void run() {
            fetchBatchesAsync();
        }
    });
    protected BackendClient client;

    /**
     * Creates the reader, connects to the partition's backend and opens the scanner. In
     * asynchronous mode the fetch thread is started as part of this call.
     *
     * @param partitionDefinition partition (database, table, tablets, query plan, backend
     *     address) to read
     * @param dorisConfig connector configuration (batch size, timeouts, credentials, async
     *     settings)
     * @param seaTunnelRowType row type handed to every {@link RowBatch}
     * @throws DorisConnectorException if the backend client cannot be created
     */
    public DorisValueReader(PartitionDefinition partitionDefinition, DorisConfig dorisConfig, SeaTunnelRowType seaTunnelRowType) {
        this.partition = partitionDefinition;
        this.config = dorisConfig;
        this.seaTunnelRowType = seaTunnelRowType;
        this.deserializeArrowToRowBatchAsync = dorisConfig.getDeserializeArrowAsync();
        if (this.deserializeArrowToRowBatchAsync) {
            this.rowBatchBlockingQueue = new ArrayBlockingQueue<>(dorisConfig.getDeserializeQueueSize());
        }
        // The client must be created here rather than in a field initializer: field
        // initializers run before this constructor body, i.e. while partition/config are
        // still null, which made backendClient() fail with a NullPointerException.
        this.client = backendClient();
        init();
    }

    /** Opens the scanner, records its context id and schema, and starts the async thread if enabled. */
    private void init() {
        this.clientLock.lock();
        try {
            this.openParams = openParams();
            TScanOpenResult openResult = this.client.openScanner(this.openParams);
            this.contextId = openResult.getContextId();
            this.schema = SchemaUtils.convertToSchema(openResult.getSelectedColumns());
            this.asyncThreadStarted = asyncThreadStarted();
            log.debug("Open scan result is, contextId: {}, schema: {}.", this.contextId, this.schema);
        } finally {
            this.clientLock.unlock();
        }
    }

    /**
     * Creates the client for the partition's backend address.
     *
     * @throws DorisConnectorException if client creation is rejected with an
     *     {@link IllegalArgumentException}
     */
    private BackendClient backendClient() {
        try {
            return new BackendClient(new Routing(this.partition.getBeAddress()), this.config);
        } catch (IllegalArgumentException e) {
            log.error("init backend:{} client failed,", this.partition.getBeAddress(), e);
            throw new DorisConnectorException(DorisConnectorErrorCode.BACKEND_CLIENT_FAILED, e);
        }
    }

    /** Builds the open-scanner request from the partition definition and the configuration. */
    private TScanOpenParams openParams() {
        TScanOpenParams params = new TScanOpenParams();
        params.cluster = DorisOptions.DORIS_DEFAULT_CLUSTER;
        params.database = this.partition.getDatabase();
        params.table = this.partition.getTable();
        params.tablet_ids = Arrays.asList((Long[]) this.partition.getTabletIds().toArray(new Long[0]));
        params.opaqued_query_plan = this.partition.getQueryPlan();
        params.setBatchSize(this.config.getBatchSize());
        params.setQueryTimeout(this.config.getRequestQueryTimeoutS());
        params.setMemLimit(this.config.getExecMemLimit());
        params.setUser(this.config.getUsername());
        params.setPasswd(this.config.getPassword());
        // The password is deliberately not part of the debug output.
        log.debug("Open scan params is,cluster:{},database:{},table:{},tabletId:{},batch size:{},query timeout:{},execution memory limit:{},user:{},query plan: {}",
                params.getCluster(),
                params.getDatabase(),
                params.getTable(),
                params.getTabletIds(),
                params.getBatchSize(),
                params.getQueryTimeout(),
                params.getMemLimit(),
                params.getUser(),
                params.getOpaquedQueryPlan());
        return params;
    }

    /**
     * Starts {@link #asyncThread} when asynchronous deserialization is enabled.
     *
     * @return {@code true} if the thread was started, {@code false} in synchronous mode
     */
    protected boolean asyncThreadStarted() {
        if (!this.deserializeArrowToRowBatchAsync) {
            return false;
        }
        this.asyncThread.start();
        return true;
    }

    /**
     * Body of {@link #asyncThread}: fetches and decodes batches until end of stream and puts
     * them on {@link #rowBatchBlockingQueue}. Holds {@link #clientLock} for the whole run.
     *
     * <p>Any failure is logged, recorded in {@link #asyncFailure} and followed by
     * {@code eos = true}, so that a consumer waiting in {@link #hasNext()} is released and can
     * report the failure instead of polling forever.
     */
    private void fetchBatchesAsync() {
        this.clientLock.lock();
        try {
            TScanNextBatchParams nextParams = new TScanNextBatchParams();
            nextParams.setContextId(this.contextId);
            while (!this.eos.get()) {
                nextParams.setOffset(this.offset);
                TScanBatchResult result = this.client.getNext(nextParams);
                this.eos.set(result.isEos());
                if (this.eos.get()) {
                    break;
                }
                RowBatch batch = new RowBatch(result, this.seaTunnelRowType).readArrow();
                this.offset += batch.getReadRowCount();
                // Closed before hand-off, as in the original code.
                // NOTE(review): presumably the decoded rows stay readable after close() - confirm in RowBatch.
                batch.close();
                try {
                    this.rowBatchBlockingQueue.put(batch);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new DorisConnectorException(DorisConnectorErrorCode.ROW_BATCH_GET_FAILED, e);
                }
            }
        } catch (Throwable failure) {
            log.error("Asynchronous batch fetch failed for context id {}", this.contextId, failure);
            // Publish the failure first, then eos: the consumer checks them in reverse order.
            this.asyncFailure = failure instanceof Exception
                    ? (Exception) failure
                    : new IllegalStateException(failure);
            this.eos.set(true);
        } finally {
            this.clientLock.unlock();
        }
    }

    /**
     * Tells whether another row can be obtained with {@link #next()}, fetching (synchronous
     * mode) or waiting for (asynchronous mode) the next batch when the current one is
     * exhausted.
     *
     * @return {@code true} if a batch is available for {@link #next()}, {@code false} once the
     *     end of the stream has been reached
     * @throws DorisConnectorException in asynchronous mode, if the calling thread is
     *     interrupted while waiting or if the fetch thread has failed
     */
    public boolean hasNext() {
        if (!this.deserializeArrowToRowBatchAsync || !this.asyncThreadStarted) {
            return hasNextSync();
        }
        if (this.rowBatch != null && this.rowBatch.hasNext()) {
            return true;
        }
        return awaitNextAsyncBatch();
    }

    /** Synchronous mode: fetches the next batch under {@link #clientLock} when the current one is used up. */
    private boolean hasNextSync() {
        this.clientLock.lock();
        try {
            boolean currentExhausted = this.rowBatch == null || !this.rowBatch.hasNext();
            if (!this.eos.get() && currentExhausted) {
                if (this.rowBatch != null) {
                    // Account for the rows of the finished batch before requesting the next one.
                    this.offset += this.rowBatch.getReadRowCount();
                    this.rowBatch.close();
                }
                TScanNextBatchParams nextParams = new TScanNextBatchParams();
                nextParams.setContextId(this.contextId);
                nextParams.setOffset(this.offset);
                TScanBatchResult result = this.client.getNext(nextParams);
                this.eos.set(result.isEos());
                if (!this.eos.get()) {
                    this.rowBatch = new RowBatch(result, this.seaTunnelRowType).readArrow();
                }
            }
            return !this.eos.get();
        } finally {
            this.clientLock.unlock();
        }
    }

    /**
     * Asynchronous mode: waits until the fetch thread has delivered a batch or has finished.
     *
     * @return {@code true} if a new batch was installed as {@link #rowBatch}, {@code false} if
     *     the stream ended with an empty queue
     */
    private boolean awaitNextAsyncBatch() {
        while (true) {
            if (this.eos.get() && this.rowBatchBlockingQueue.isEmpty()) {
                Exception failure = this.asyncFailure;
                if (failure != null) {
                    throw new DorisConnectorException(DorisConnectorErrorCode.ROW_BATCH_GET_FAILED, failure);
                }
                return false;
            }
            // Non-blocking poll: this thread is the only consumer of the queue.
            RowBatch ready = this.rowBatchBlockingQueue.poll();
            if (ready != null) {
                this.rowBatch = ready;
                return true;
            }
            try {
                Thread.sleep(ASYNC_POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new DorisConnectorException(DorisConnectorErrorCode.ROW_BATCH_GET_FAILED, e);
            }
        }
    }

    /**
     * Returns the next row of the current batch.
     *
     * @return the next row
     * @throws DorisConnectorException if no further row is available
     */
    public SeaTunnelRow next() {
        if (!hasNext()) {
            log.error(ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
            throw new DorisConnectorException(DorisConnectorErrorCode.SHOULD_NEVER_HAPPEN, "never happen error.");
        }
        return this.rowBatch.next();
    }

    /**
     * Closes the scanner identified by {@link #contextId}.
     *
     * @throws DorisConnectorException if closing the scanner fails
     */
    public void close() {
        this.clientLock.lock();
        try {
            TScanCloseParams closeParams = new TScanCloseParams();
            closeParams.setContextId(this.contextId);
            this.client.closeScanner(closeParams);
        } catch (Exception e) {
            log.error("Failed to close reader with context id {}", this.contextId, e);
            throw new DorisConnectorException(DorisConnectorErrorCode.RESOURCE_CLOSE_FAILED, e);
        } finally {
            this.clientLock.unlock();
        }
    }
}
