package org.apache.hudi.utilities.sources;

import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.GCSEventsSourceConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
import org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity;
import org.apache.hudi.utilities.sources.helpers.gcs.MetadataMessage;
import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/sources/GcsEventsSource.class */
/**
 * Hudi row source that reads GCS event notification messages from a Google Cloud Pub/Sub
 * subscription and exposes their payloads as JSON-parsed rows.
 *
 * <p>Each {@link #fetchNextBatch(Option, long)} call pulls messages through a
 * {@link PubsubMessagesFetcher}. It keeps the payloads whose {@link MetadataMessage#shouldBeProcessed()}
 * decision is not {@code DO_SKIP} and parses them as JSON. When the schema provider yields a source
 * schema, that schema is applied; otherwise Spark infers one.
 *
 * <p>The ack ID of every pulled message is buffered, whether the message was kept or skipped. The
 * buffer is acknowledged in {@link #onCommit(String)} only when {@code CloudSourceConfig.ACK_MESSAGES}
 * is enabled.
 *
 * <p>The incoming checkpoint is only logged, and the returned checkpoint is always {@code "0"}. This
 * source keeps no read position of its own.
 *
 * <p>NOTE(review): the ack buffer is an unsynchronized {@link ArrayList}. This assumes fetch and
 * commit are invoked from the same thread; confirm against the caller.
 */
public class GcsEventsSource extends RowSource {

    /** Checkpoint value returned with every batch, empty or not. */
    private static final String CHECKPOINT_VALUE_ZERO = "0";

    private static final Logger LOG = LoggerFactory.getLogger(GcsEventsSource.class);

    private final PubsubMessagesFetcher pubsubMessagesFetcher;
    private final SchemaProvider schemaProvider;
    /** Whether {@link #onCommit(String)} acknowledges buffered messages ({@code CloudSourceConfig.ACK_MESSAGES}). */
    private final boolean ackMessages;
    /** Ack IDs of messages pulled since the last successful acknowledgement. */
    private final List<String> messagesToAck = new ArrayList<>();

    /**
     * Creates the source with a {@link PubsubMessagesFetcher} built from {@code typedProperties}.
     *
     * @param typedProperties  source configuration; supplies the GCS project and subscription IDs and the
     *                         cloud-source batch size, message-count and fetch-time settings
     * @param javaSparkContext Spark context passed to {@link RowSource}
     * @param sparkSession     session used to build and parse the event dataset
     * @param schemaProvider   optional provider of the schema applied when parsing event JSON
     */
    public GcsEventsSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext,
                           SparkSession sparkSession, SchemaProvider schemaProvider) {
        this(typedProperties, javaSparkContext, sparkSession, schemaProvider,
                createPubsubMessagesFetcher(typedProperties));
    }

    /**
     * Creates the source with an explicitly supplied fetcher (e.g. a test double).
     *
     * @param typedProperties       source configuration; {@code CloudSourceConfig.ACK_MESSAGES} is read here
     * @param javaSparkContext      Spark context passed to {@link RowSource}
     * @param sparkSession          session used to build and parse the event dataset
     * @param schemaProvider        optional provider of the schema applied when parsing event JSON
     * @param pubsubMessagesFetcher fetcher used to pull and acknowledge Pub/Sub messages
     */
    public GcsEventsSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext,
                           SparkSession sparkSession, SchemaProvider schemaProvider,
                           PubsubMessagesFetcher pubsubMessagesFetcher) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
        this.pubsubMessagesFetcher = pubsubMessagesFetcher;
        this.ackMessages = ConfigUtils.getBooleanWithAltKeys(typedProperties, CloudSourceConfig.ACK_MESSAGES);
        this.schemaProvider = schemaProvider;
        LOG.info("Created GcsEventsSource");
    }

    /** Builds the default Pub/Sub fetcher from the source configuration. */
    private static PubsubMessagesFetcher createPubsubMessagesFetcher(TypedProperties typedProperties) {
        return new PubsubMessagesFetcher(
                ConfigUtils.getStringWithAltKeys(typedProperties, GCSEventsSourceConfig.GOOGLE_PROJECT_ID),
                ConfigUtils.getStringWithAltKeys(typedProperties, GCSEventsSourceConfig.PUBSUB_SUBSCRIPTION_ID),
                ConfigUtils.getIntWithAltKeys(typedProperties, CloudSourceConfig.BATCH_SIZE_CONF),
                ConfigUtils.getIntWithAltKeys(typedProperties, CloudSourceConfig.MAX_NUM_MESSAGES_PER_SYNC),
                ConfigUtils.getIntWithAltKeys(typedProperties, CloudSourceConfig.MAX_FETCH_TIME_PER_SYNC_SECS));
    }

    /**
     * Pulls the next set of Pub/Sub messages and returns the accepted payloads as JSON-parsed rows.
     *
     * @param option input checkpoint; only logged, not used to position the read
     * @param j      source limit; not used by this implementation
     * @return the parsed rows, or {@link Option#empty()} when the fetched batch is empty; the checkpoint
     *         is always {@code "0"}
     * @throws HoodieReadFromSourceException wrapping any non-Hudi exception raised while fetching or
     *         parsing; Hudi exceptions are rethrown unchanged
     */
    @Override // org.apache.hudi.utilities.sources.RowSource
    protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> option, long j) {
        LOG.info("fetchNextBatch(): Input checkpoint: {}", option);
        try {
            MessageBatch messageBatch = fetchFileMetadata();
            if (messageBatch.isEmpty()) {
                LOG.info("No new data. Returning empty batch with checkpoint value: {}", CHECKPOINT_VALUE_ZERO);
                return Pair.of(Option.empty(), CHECKPOINT_VALUE_ZERO);
            }
            Dataset<String> eventRecords =
                    this.sparkSession.createDataset(messageBatch.getMessages(), Encoders.STRING());
            LOG.info("Returning checkpoint value: {}", CHECKPOINT_VALUE_ZERO);
            StructType sourceSchema = UtilHelpers.getSourceSchema(this.schemaProvider);
            // Apply the provider's schema when available; otherwise let Spark infer it from the JSON.
            Dataset<Row> rows = sourceSchema != null
                    ? this.sparkSession.read().schema(sourceSchema).json(eventRecords)
                    : this.sparkSession.read().json(eventRecords);
            return Pair.of(Option.of(rows), CHECKPOINT_VALUE_ZERO);
        } catch (HoodieException e) {
            // Already a Hudi exception: do not double-wrap.
            throw e;
        } catch (Exception e) {
            throw new HoodieReadFromSourceException("Failed to fetch file metadata from GCS events source", e);
        }
    }

    /**
     * Acknowledges all buffered messages when acking is enabled; otherwise only logs a warning.
     *
     * @param str the committed checkpoint; only logged
     * @throws HoodieReadFromSourceException if sending the acknowledgements fails
     */
    @Override // org.apache.hudi.utilities.callback.SourceCommitCallback
    public void onCommit(String str) {
        LOG.info("onCommit(): Checkpoint: {}", str);
        if (this.ackMessages) {
            ackOutstandingMessages();
        } else {
            LOG.warn("Not acknowledging messages. Can result in repeated redeliveries.");
        }
    }

    /**
     * Pulls one round of messages from Pub/Sub and filters them into a {@link MessageBatch}.
     * Package-private so it can be exercised directly.
     */
    MessageBatch fetchFileMetadata() {
        return processMessages(this.pubsubMessagesFetcher.fetchMessages());
    }

    /**
     * Buffers the ack ID of every received message and collects the UTF-8 payloads that are not
     * marked {@code DO_SKIP}.
     *
     * @param list messages returned by the fetcher
     * @return a batch holding the accepted payloads, in arrival order
     */
    private MessageBatch processMessages(List<ReceivedMessage> list) {
        List<String> acceptedPayloads = new ArrayList<>();
        for (ReceivedMessage receivedMessage : list) {
            MetadataMessage metadataMessage = new MetadataMessage(receivedMessage.getMessage());
            String payload = metadataMessage.toStringUtf8();
            logDetails(metadataMessage, payload);
            // Recorded before the validity check, so skipped messages are acknowledged too.
            this.messagesToAck.add(receivedMessage.getAckId());
            MessageValidity validity = metadataMessage.shouldBeProcessed();
            if (validity.getDecision() == MessageValidity.ProcessingDecision.DO_SKIP) {
                LOG.info("Skipping message: {}", validity.getDescription());
            } else {
                acceptedPayloads.add(payload);
            }
        }
        return new MessageBatch(acceptedPayloads);
    }

    /**
     * Sends acknowledgements for all buffered ack IDs and clears the buffer on success.
     * On failure the buffer is kept, so the IDs are included in the next attempt.
     *
     * @throws HoodieReadFromSourceException if the fetcher reports an {@link IOException}
     */
    private void ackOutstandingMessages() {
        if (this.messagesToAck.isEmpty()) {
            return;
        }
        try {
            this.pubsubMessagesFetcher.sendAcks(this.messagesToAck);
            this.messagesToAck.clear();
        } catch (IOException e) {
            throw new HoodieReadFromSourceException("Error when acknowledging messages from Pubsub", e);
        }
    }

    /**
     * Logs the event type and object ID at INFO, and the full payload at DEBUG.
     *
     * @param metadataMessage parsed message
     * @param str             the message payload as a UTF-8 string
     */
    private void logDetails(MetadataMessage metadataMessage, String str) {
        LOG.info("eventType: {}, objectId: {}", metadataMessage.getEventType(), metadataMessage.getObjectId());
        LOG.debug("msg: {}", str);
    }
}
