package org.apache.hudi.utilities.sources;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

/* loaded from: input_file:org/apache/hudi/utilities/sources/S3EventsSource.class */
public class S3EventsSource extends RowSource implements Closeable {
    private final S3EventsMetaSelector pathSelector;
    private final SchemaProvider schemaProvider;
    private final List<Message> processedMessages;
    SqsClient sqs;

    public S3EventsSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
        this.processedMessages = new ArrayList();
        this.pathSelector = S3EventsMetaSelector.createSourceSelector(typedProperties);
        this.sqs = this.pathSelector.createAmazonSqsClient();
        this.schemaProvider = schemaProvider;
    }

    @Override // org.apache.hudi.utilities.sources.RowSource
    public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> option, long j) {
        Pair<List<String>, String> nextEventsFromQueue = this.pathSelector.getNextEventsFromQueue(this.sqs, option, this.processedMessages);
        if (((List) nextEventsFromQueue.getLeft()).isEmpty()) {
            return Pair.of(Option.empty(), nextEventsFromQueue.getRight());
        }
        Dataset createDataset = this.sparkSession.createDataset((List) nextEventsFromQueue.getLeft(), Encoders.STRING());
        StructType sourceSchema = UtilHelpers.getSourceSchema(this.schemaProvider);
        return sourceSchema != null ? Pair.of(Option.of(this.sparkSession.read().schema(sourceSchema).json(createDataset)), nextEventsFromQueue.getRight()) : Pair.of(Option.of(this.sparkSession.read().json(createDataset)), nextEventsFromQueue.getRight());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.sqs.close();
    }

    @Override // org.apache.hudi.utilities.callback.SourceCommitCallback
    public void onCommit(String str) {
        this.pathSelector.deleteProcessedMessages(this.sqs, this.pathSelector.queueUrl, this.processedMessages);
        this.processedMessages.clear();
    }
}
