package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.class */
public class AmazonDynamoDBSourceReader implements SourceReader<SeaTunnelRow, AmazonDynamoDBSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(AmazonDynamoDBSourceReader.class);
    protected DynamoDbClient dynamoDbClient;
    protected SourceReader.Context context;
    protected AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
    protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
    Queue<AmazonDynamoDBSourceSplit> pendingSplits = new ConcurrentLinkedDeque();
    private volatile boolean noMoreSplit;

    public AmazonDynamoDBSourceReader(SourceReader.Context context, AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions, SeaTunnelRowType seaTunnelRowType) {
        this.context = context;
        this.amazondynamodbSourceOptions = amazonDynamoDBSourceOptions;
        this.seaTunnelRowDeserializer = new DefaultSeaTunnelRowDeserializer(seaTunnelRowType);
    }

    public void open() {
        this.dynamoDbClient = ((DynamoDbClientBuilder) ((DynamoDbClientBuilder) ((DynamoDbClientBuilder) DynamoDbClient.builder().endpointOverride(URI.create(this.amazondynamodbSourceOptions.getUrl()))).region(Region.of(this.amazondynamodbSourceOptions.getRegion()))).credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(this.amazondynamodbSourceOptions.getAccessKeyId(), this.amazondynamodbSourceOptions.getSecretAccessKey())))).mo902build();
    }

    public void close() {
        this.dynamoDbClient.close();
    }

    public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException {
        synchronized (collector.getCheckpointLock()) {
            AmazonDynamoDBSourceSplit poll = this.pendingSplits.poll();
            if (poll == null) {
                log.info("AmazonDynamoDB Source Reader [{}] waiting for splits", Integer.valueOf(this.context.getIndexOfSubtask()));
                if (this.noMoreSplit) {
                    log.info("Closed the bounded amazonDynamodb source");
                    this.context.signalNoMoreElement();
                    Thread.sleep(2000L);
                }
            }
            if (Objects.nonNull(poll)) {
                read(poll, collector);
            }
        }
    }

    public List<AmazonDynamoDBSourceSplit> snapshotState(long j) {
        return new ArrayList(this.pendingSplits);
    }

    public void addSplits(List<AmazonDynamoDBSourceSplit> list) {
        this.pendingSplits.addAll(list);
    }

    public void handleNoMoreSplits() {
        log.info("Reader [{}] received noMoreSplit event.", Integer.valueOf(this.context.getIndexOfSubtask()));
        this.noMoreSplit = true;
    }

    private void read(AmazonDynamoDBSourceSplit amazonDynamoDBSourceSplit, Collector<SeaTunnelRow> collector) {
        ScanIterable scanPaginator = this.dynamoDbClient.scanPaginator((ScanRequest) ScanRequest.builder().tableName(this.amazondynamodbSourceOptions.getTable()).limit(amazonDynamoDBSourceSplit.getItemCount()).segment(amazonDynamoDBSourceSplit.getSplitId()).totalSegments(amazonDynamoDBSourceSplit.getTotalSegments()).mo902build());
        do {
            scanPaginator.items().forEach(map -> {
                collector.collect(this.seaTunnelRowDeserializer.deserialize(map));
            });
            if (!scanPaginator.iterator().hasNext()) {
                return;
            }
        } while (!this.noMoreSplit);
    }

    public void notifyCheckpointComplete(long j) {
    }
}
