package org.apache.seatunnel.connectors.seatunnel.tablestore.source;

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.DeleteTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.class */
/**
 * SeaTunnel source reader for Tablestore (OTS) that consumes table data through tunnels.
 *
 * <p>For every split handed to this reader it (re)creates a tunnel named
 * {@code <table>_migration2aws_tunnel4<splitId>} and starts a {@link TunnelWorker} whose
 * {@code TableStoreProcessor} is given the {@link Collector} of the current poll.
 *
 * <p>Workers that were started successfully are remembered so that {@link #close()} can shut
 * them down together with the two Tablestore clients.
 */
public class TableStoreDBSourceReader implements SourceReader<SeaTunnelRow, TableStoreDBSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(TableStoreDBSourceReader.class);

    /** Text placed between the table name and the split id when building a tunnel name. */
    private static final String TUNNEL_NAME_INFIX = "_migration2aws_tunnel4";

    /** Pause after a poll while more splits may still arrive or are still queued. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    /** Pause after the end of input has been signalled. */
    private static final long FINISHED_SLEEP_MILLIS = 2000L;

    protected SourceReader.Context context;
    protected TablestoreOptions tablestoreOptions;
    protected SeaTunnelRowType seaTunnelRowType;

    /** Splits assigned to this reader that have not been started yet. */
    Queue<TableStoreDBSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();

    /** Workers started by {@link #read}; drained and shut down in {@link #close()}. */
    private final Queue<TunnelWorker> activeWorkers = new ConcurrentLinkedDeque<>();

    private SyncClient client;
    private volatile boolean noMoreSplit;
    private TunnelClient tunnelClient;

    /**
     * Stores the collaborators; no connection is opened until {@link #open()} is called.
     *
     * @param context reader context used to signal end of input and to read the subtask index
     * @param tablestoreOptions endpoint, credentials and instance name for the Tablestore clients
     * @param seaTunnelRowType row type of the produced records (only stored by this class)
     */
    public TableStoreDBSourceReader(SourceReader.Context context, TablestoreOptions tablestoreOptions, SeaTunnelRowType seaTunnelRowType) {
        this.context = context;
        this.tablestoreOptions = tablestoreOptions;
        this.seaTunnelRowType = seaTunnelRowType;
    }

    /**
     * Creates the {@link SyncClient} and the {@link TunnelClient} from the configured endpoint,
     * access key id, access key secret and instance name.
     *
     * @throws Exception if a client cannot be created
     */
    public void open() throws Exception {
        String endpoint = this.tablestoreOptions.getEndpoint();
        String accessKeyId = this.tablestoreOptions.getAccessKeyId();
        String accessKeySecret = this.tablestoreOptions.getAccessKeySecret();
        String instanceName = this.tablestoreOptions.getInstanceName();
        this.client = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
        this.tunnelClient = new TunnelClient(endpoint, accessKeyId, accessKeySecret, instanceName);
    }

    /**
     * Shuts down every started tunnel worker, then the tunnel client, then the sync client.
     *
     * <p>Safe to call when {@link #open()} never ran. The sync client is shut down even if
     * shutting down the tunnel client throws.
     *
     * @throws IOException declared for interface compatibility
     */
    public void close() throws IOException {
        try {
            TunnelWorker worker;
            while ((worker = this.activeWorkers.poll()) != null) {
                try {
                    worker.shutdown();
                } catch (RuntimeException e) {
                    // Keep going: one failing worker must not prevent releasing the others.
                    log.warn("Failed to shut down OTS tunnel worker.", e);
                }
            }
            if (this.tunnelClient != null) {
                this.tunnelClient.shutdown();
            }
        } finally {
            if (this.client != null) {
                this.client.shutdown();
            }
        }
    }

    /**
     * Starts at most one pending split and, once no more splits will arrive and none are left
     * in the queue, signals the end of input.
     *
     * <p>Only the split hand-off and the end-of-input signal run under the checkpoint lock; the
     * pause between polls happens after the lock is released.
     *
     * @param collector collector handed to the processor of the started split
     * @throws Exception if the thread is interrupted while pausing
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        boolean finished;
        synchronized (collector.getCheckpointLock()) {
            TableStoreDBSourceSplit split = this.pendingSplits.poll();
            if (split != null) {
                read(split, collector);
            }
            // Read the volatile flag first: splits are added before the flag is raised, so an
            // empty queue observed afterwards means every assigned split has been started.
            finished = this.noMoreSplit && this.pendingSplits.isEmpty();
            if (finished) {
                log.info("Closed the bounded tablestore source");
                this.context.signalNoMoreElement();
            }
        }
        Thread.sleep(finished ? FINISHED_SLEEP_MILLIS : IDLE_SLEEP_MILLIS);
    }

    /**
     * Resolves the tunnel for the split and starts a worker on it.
     *
     * <p>A failure to start is logged and the worker is shut down; the exception is not
     * propagated. NOTE(review): the split has already been removed from the pending queue at
     * this point, so a failed start means the split is not retried - confirm this is intended.
     */
    private void read(TableStoreDBSourceSplit split, Collector<SeaTunnelRow> collector) {
        String tunnelId = getTunel(split);
        TunnelWorkerConfig workerConfig =
                new TunnelWorkerConfig(new TableStoreProcessor(split.getTableName(), split.getPrimaryKey(), collector));
        TunnelWorker tunnelWorker = new TunnelWorker(tunnelId, this.tunnelClient, workerConfig);
        try {
            tunnelWorker.connectAndWorking();
            // Track only workers that started, so close() can release them.
            this.activeWorkers.add(tunnelWorker);
        } catch (Exception e) {
            log.error("Start OTS tunnel failed.", e);
            tunnelWorker.shutdown();
        }
    }

    /**
     * Returns the id of the tunnel used for the given split.
     *
     * <p>The tunnel is first deleted (best effort), then looked up on the split's table; if the
     * lookup fails for any reason a new {@code BaseAndStream} tunnel is created.
     *
     * @param tableStoreDBSourceSplit split whose table name and split id determine the tunnel
     * @return id of the existing or newly created tunnel
     */
    public String getTunel(TableStoreDBSourceSplit tableStoreDBSourceSplit) {
        deleteTunel(tableStoreDBSourceSplit);
        String tableName = tableStoreDBSourceSplit.getTableName();
        String tunnelName = tunnelNameOf(tableStoreDBSourceSplit);
        String tunnelId;
        try {
            // Look the tunnel up on the split's own table (previously a hard-coded "test").
            tunnelId = this.tunnelClient.describeTunnel(new DescribeTunnelRequest(tableName, tunnelName)).getTunnelInfo().getTunnelId();
        } catch (Exception e) {
            log.debug("Tunnel {} could not be described, creating it.", tunnelName, e);
            tunnelId = this.tunnelClient.createTunnel(new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream)).getTunnelId();
        }
        log.info("Tunnel found, Id: {}", tunnelId);
        return tunnelId;
    }

    /**
     * Deletes the tunnel belonging to the given split, ignoring any failure.
     *
     * @param tableStoreDBSourceSplit split whose table name and split id determine the tunnel
     */
    public void deleteTunel(TableStoreDBSourceSplit tableStoreDBSourceSplit) {
        String tunnelName = tunnelNameOf(tableStoreDBSourceSplit);
        try {
            log.info("Tunnel has been deleted: {}", this.tunnelClient.deleteTunnel(new DeleteTunnelRequest(tableStoreDBSourceSplit.getTableName(), tunnelName)));
        } catch (Exception e) {
            // Best effort: the tunnel usually just does not exist yet, but report the real
            // cause so that other failures (e.g. credentials) are not mislabelled.
            log.warn("Tunnel deletion failed (tunnel may not exist): {}, cause: {}", tunnelName, e.toString());
        }
    }

    /** Builds the tunnel name {@code <table>_migration2aws_tunnel4<splitId>} for a split. */
    private static String tunnelNameOf(TableStoreDBSourceSplit split) {
        return split.getTableName() + TUNNEL_NAME_INFIX + split.getSplitId();
    }

    /**
     * Returns a copy of the splits that are still waiting to be started.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @return new list containing the currently pending splits
     */
    public List<TableStoreDBSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new ArrayList<>(this.pendingSplits);
    }

    /**
     * Queues the given splits for later processing by {@link #pollNext}.
     *
     * @param list splits assigned to this reader
     */
    public void addSplits(List<TableStoreDBSourceSplit> list) {
        this.pendingSplits.addAll(list);
    }

    /** Records that no further splits will be assigned to this reader. */
    public void handleNoMoreSplits() {
        log.info("Reader [{}] received noMoreSplit event.", this.context.getIndexOfSubtask());
        this.noMoreSplit = true;
    }

    /**
     * No-op: this reader does nothing when a checkpoint completes.
     *
     * @param checkpointId id of the completed checkpoint (unused)
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}
