package org.apache.seatunnel.connectors.seatunnel.amazonsqs.source;

import java.io.IOException;
import java.net.URI;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.amazonsqs.deserialize.AmazonSqsDeserializer;
import org.apache.seatunnel.connectors.seatunnel.amazonsqs.deserialize.SeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/amazonsqs/source/AmazonSqsSourceReader.class */
public class AmazonSqsSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(AmazonSqsSourceReader.class);
    protected SqsClient sqsClient;
    protected SingleSplitReaderContext context;
    protected AmazonSqsSourceOptions amazonSqsSourceOptions;
    private final SeaTunnelRowDeserializer seaTunnelRowDeserializer;

    public AmazonSqsSourceReader(SingleSplitReaderContext singleSplitReaderContext, AmazonSqsSourceOptions amazonSqsSourceOptions, DeserializationSchema<SeaTunnelRow> deserializationSchema, SeaTunnelRowType seaTunnelRowType) {
        this.context = singleSplitReaderContext;
        this.amazonSqsSourceOptions = amazonSqsSourceOptions;
        this.seaTunnelRowDeserializer = new AmazonSqsDeserializer(deserializationSchema);
    }

    public void open() throws Exception {
        if ((this.amazonSqsSourceOptions.getAccessKeyId() != null) && (this.amazonSqsSourceOptions.getSecretAccessKey() != null)) {
            this.sqsClient = ((SqsClientBuilder) ((SqsClientBuilder) ((SqsClientBuilder) SqsClient.builder().endpointOverride(URI.create(this.amazonSqsSourceOptions.getUrl()))).region(Region.of(this.amazonSqsSourceOptions.getRegion()))).credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(this.amazonSqsSourceOptions.getAccessKeyId(), this.amazonSqsSourceOptions.getSecretAccessKey())))).mo928build();
        } else {
            this.sqsClient = ((SqsClientBuilder) ((SqsClientBuilder) ((SqsClientBuilder) SqsClient.builder().endpointOverride(URI.create(this.amazonSqsSourceOptions.getUrl()))).region(Region.of(this.amazonSqsSourceOptions.getRegion()))).credentialsProvider(DefaultCredentialsProvider.create())).mo928build();
        }
    }

    public void close() throws IOException {
        this.sqsClient.close();
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        for (Message message : this.sqsClient.receiveMessage((ReceiveMessageRequest) ReceiveMessageRequest.builder().queueUrl(this.amazonSqsSourceOptions.getUrl()).maxNumberOfMessages(10).waitTimeSeconds(10).mo928build()).messages()) {
            collector.collect(this.seaTunnelRowDeserializer.deserializeRow(message.body()));
            if (this.amazonSqsSourceOptions.isDeleteMessage()) {
                this.sqsClient.deleteMessage((DeleteMessageRequest) DeleteMessageRequest.builder().queueUrl(this.amazonSqsSourceOptions.getUrl()).receiptHandle(message.receiptHandle()).mo928build());
            }
        }
        this.context.signalNoMoreElement();
    }
}
