package org.apache.hudi.utilities.sources;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.GCSEventsSourceConfig;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Unit tests for {@link GcsEventsSource}.
 *
 * <p>The Pub/Sub client is replaced by a Mockito mock of {@link PubsubMessagesFetcher}, so every
 * test controls exactly which {@link ReceivedMessage}s the source sees. Each test asserts that the
 * checkpoint handed back by the source is {@value #CHECKPOINT_VALUE_ZERO}.
 */
public class TestGcsEventsSource extends UtilitiesTestBase {

    private static final String CHECKPOINT_VALUE_ZERO = "0";

    /** Source limit passed to every {@code fetchNextBatch} call in this class. */
    private static final long SOURCE_LIMIT = 100L;

    /** Value of the {@code eventType} attribute used for "file created" notifications. */
    private static final String EVENT_TYPE_FINALIZE = "OBJECT_FINALIZE";

    /** Value of the {@code eventType} attribute used for "file deleted" notifications. */
    private static final String EVENT_TYPE_DELETE = "OBJECT_DELETE";

    @Mock
    PubsubMessagesFetcher pubsubMessagesFetcher;
    protected FilebasedSchemaProvider schemaProvider;
    private TypedProperties props;

    @BeforeAll
    public static void beforeAll() throws Exception {
        UtilitiesTestBase.initTestServices();
    }

    /**
     * Stages the {@code gcs-metadata.avsc} schema, (re)initialises the {@code @Mock} fields and
     * builds the minimal source properties (project id and subscription id).
     */
    @BeforeEach
    public void beforeEach() throws Exception {
        schemaProvider = new FilebasedSchemaProvider(
            UtilitiesTestBase.Helpers.setupSchemaOnDFS("streamer-config", "gcs-metadata.avsc"), jsc);
        MockitoAnnotations.initMocks(this);

        props = new TypedProperties();
        props.put(GCSEventsSourceConfig.GOOGLE_PROJECT_ID.key(), "dummy-project");
        props.put(GCSEventsSourceConfig.PUBSUB_SUBSCRIPTION_ID.key(), "dummy-subscription");
    }

    /** With no messages available the source must return an empty batch and the same checkpoint. */
    @Test
    public void shouldReturnEmptyOnNoMessages() {
        Mockito.when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Collections.emptyList());
        GcsEventsSource source = newSource(null);

        Pair<Option<Dataset<Row>>, String> expected = Pair.of(Option.empty(), CHECKPOINT_VALUE_ZERO);
        Pair<Option<Dataset<Row>>, String> actual =
            source.fetchNextBatch(Option.of(CHECKPOINT_VALUE_ZERO), SOURCE_LIMIT);

        Assertions.assertEquals(expected, actual);
    }

    /**
     * With the file-based schema provider, object-metadata payloads must surface their
     * {@code bucket} value as a top-level column, in message order.
     */
    @Test
    public void shouldReturnDataOnValidMessages() {
        Mockito.when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Arrays.asList(
            fileCreateMessage("objectId-1", objectMetadataJson("object-name-1", "bucket-1")),
            fileCreateMessage("objectId-2", objectMetadataJson("object-name-2", "bucket-2"))));
        GcsEventsSource source = newSource(schemaProvider);

        Pair<Option<Dataset<Row>>, String> batch = fetchAndCommit(source);

        Assertions.assertEquals(CHECKPOINT_VALUE_ZERO, batch.getRight());
        List<Row> rows = rowsOf(batch);
        // JUnit's contract is assertEquals(expected, actual); keep that order so a failure
        // message reports the values the right way round.
        Assertions.assertEquals("bucket-1", rows.get(0).<String>getAs("bucket"));
        Assertions.assertEquals("bucket-2", rows.get(1).<String>getAs("bucket"));
        Mockito.verify(pubsubMessagesFetcher).fetchMessages();
    }

    /** Two consecutive fetches must each return the batch the fetcher produced for that call. */
    @Test
    public void shouldFetchMessagesInBatches() {
        Mockito.when(pubsubMessagesFetcher.fetchMessages())
            .thenReturn(Arrays.asList(
                fileCreateMessage("objectId-1", nestedBucketJson("bucket-1")),
                fileCreateMessage("objectId-2", nestedBucketJson("bucket-2"))))
            .thenReturn(Arrays.asList(
                fileCreateMessage("objectId-3", nestedBucketJson("bucket-3")),
                fileCreateMessage("objectId-4", nestedBucketJson("bucket-4"))));
        GcsEventsSource source = newSource(null);

        Pair<Option<Dataset<Row>>, String> firstBatch = fetchAndCommit(source);
        Assertions.assertEquals(CHECKPOINT_VALUE_ZERO, firstBatch.getRight());
        List<Row> firstRows = rowsOf(firstBatch);
        assertBucket(firstRows.get(0), "bucket-1");
        assertBucket(firstRows.get(1), "bucket-2");

        Pair<Option<Dataset<Row>>, String> secondBatch = fetchAndCommit(source);
        List<Row> secondRows = rowsOf(secondBatch);
        assertBucket(secondRows.get(0), "bucket-3");
        assertBucket(secondRows.get(1), "bucket-4");

        Mockito.verify(pubsubMessagesFetcher, Mockito.times(2)).fetchMessages();
    }

    /**
     * Of a delete event, a plain create event and a create event carrying an
     * {@code overwroteGeneration} attribute, only the plain create event is expected in the batch.
     */
    @Test
    public void shouldSkipInvalidMessages1() {
        ReceivedMessage deleted = fileDeleteMessage("objectId-1", nestedBucketJson("bucket-1"));
        ReceivedMessage overwritten =
            fileCreateMessageWithOverwroteGen("objectId-2", nestedBucketJson("bucket-2"));
        ReceivedMessage created = fileCreateMessage("objectId-3", nestedBucketJson("bucket-3"));
        Mockito.when(pubsubMessagesFetcher.fetchMessages())
            .thenReturn(Arrays.asList(deleted, created, overwritten));
        GcsEventsSource source = newSource(null);

        Pair<Option<Dataset<Row>>, String> batch = fetchAndCommit(source);

        Assertions.assertEquals(CHECKPOINT_VALUE_ZERO, batch.getRight());
        List<Row> rows = rowsOf(batch);
        Assertions.assertEquals(1, rows.size());
        assertBucket(rows.get(0), "bucket-3");
        Mockito.verify(pubsubMessagesFetcher).fetchMessages();
    }

    /** Two identical create events in one fetch must both appear in the resulting batch. */
    @Test
    public void shouldGcsEventsSourceDoesNotDedupeInternally() {
        Mockito.when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Arrays.asList(
            fileCreateMessage("objectId-1", nestedBucketJson("bucket-1")),
            fileCreateMessage("objectId-1", nestedBucketJson("bucket-1"))));
        GcsEventsSource source = newSource(null);

        Pair<Option<Dataset<Row>>, String> batch = fetchAndCommit(source);

        Assertions.assertEquals(CHECKPOINT_VALUE_ZERO, batch.getRight());
        List<Row> rows = rowsOf(batch);
        Assertions.assertEquals(2, rows.size());
        assertBucket(rows.get(0), "bucket-1");
        assertBucket(rows.get(1), "bucket-1");
        Mockito.verify(pubsubMessagesFetcher).fetchMessages();
    }

    /**
     * Builds the source under test on top of the mocked fetcher.
     *
     * @param provider schema provider to hand to the source; may be {@code null}. Declaring the
     *                 parameter as {@link SchemaProvider} keeps the constructor overload
     *                 unambiguous even when {@code null} is passed.
     */
    private GcsEventsSource newSource(SchemaProvider provider) {
        return new GcsEventsSource(props, jsc, sparkSession, provider, pubsubMessagesFetcher);
    }

    /**
     * Fetches one batch starting from {@link #CHECKPOINT_VALUE_ZERO} and immediately commits the
     * checkpoint the source returned.
     *
     * @return the batch/checkpoint pair produced by the source
     */
    private Pair<Option<Dataset<Row>>, String> fetchAndCommit(GcsEventsSource source) {
        Pair<Option<Dataset<Row>>, String> batch =
            source.fetchNextBatch(Option.of(CHECKPOINT_VALUE_ZERO), SOURCE_LIMIT);
        source.onCommit(batch.getRight());
        return batch;
    }

    /** Collects the rows of a batch; the batch is required to carry a dataset. */
    private static List<Row> rowsOf(Pair<Option<Dataset<Row>>, String> batch) {
        return batch.getLeft().get().collectAsList();
    }

    /** JSON payload shaped like a GCS object resource, varying only in object and bucket name. */
    private static String objectMetadataJson(String objectName, String bucketName) {
        return "{\n"
            + "  \"kind\": \"storage#object\",\n"
            + "  \"id\": \"bucket-name/object-name/1234567890123456\",\n"
            + "  \"selfLink\": \"https://www.googleapis.com/storage/v1/b/bucket-name/o/object-name\",\n"
            + "  \"name\": \"" + objectName + "\",\n"
            + "  \"bucket\": \"" + bucketName + "\",\n"
            + "  \"generation\": \"1234567890123456\",\n"
            + "  \"metageneration\": \"1\",\n"
            + "  \"contentType\": \"application/octet-stream\",\n"
            + "  \"timeCreated\": \"2023-07-09T10:15:30.000Z\",\n"
            + "  \"updated\": \"2023-07-09T10:15:30.000Z\",\n"
            + "  \"size\": \"1024\",\n"
            + "  \"md5Hash\": \"e4e68fb326b0d21a1bc7a12bb6b1e642\",\n"
            + "  \"crc32c\": \"AAAAAAAAAAA=\",\n"
            + "  \"etag\": \"CO2j+pDxx-ACEAE=\"\n"
            + "}";
    }

    /** Minimal JSON payload whose bucket name is nested under a {@code data} object. */
    private static String nestedBucketJson(String bucketName) {
        return "{\"data\":{\"bucket\":\"" + bucketName + "\"}, \"size\": \"1024\"}";
    }

    /**
     * Builds a create event that additionally carries an {@code overwroteGeneration} attribute.
     *
     * @param objectId value for the {@code objectId} attribute, also used as the ack id
     * @param payload  message body
     */
    private ReceivedMessage fileCreateMessageWithOverwroteGen(String objectId, String payload) {
        Map<String, String> extraAttributes = new HashMap<>();
        extraAttributes.put("overwroteGeneration", "objectId-N");
        return ReceivedMessage.newBuilder()
            .setMessage(objectWithEventTypeAndAttrs(objectId, EVENT_TYPE_FINALIZE, extraAttributes, payload))
            .setAckId(objectId)
            .build();
    }

    /**
     * Builds a received message announcing a created object.
     *
     * @param objectId value for the {@code objectId} attribute, also used as the ack id
     * @param payload  message body
     */
    private ReceivedMessage fileCreateMessage(String objectId, String payload) {
        return ReceivedMessage.newBuilder()
            .setMessage(objectFinalizeMessage(objectId, payload))
            .setAckId(objectId)
            .build();
    }

    /**
     * Builds a received message announcing a deleted object.
     *
     * @param objectId value for the {@code objectId} attribute, also used as the ack id
     * @param payload  message body
     */
    private ReceivedMessage fileDeleteMessage(String objectId, String payload) {
        return ReceivedMessage.newBuilder()
            .setMessage(objectDeleteMessage(objectId, payload))
            .setAckId(objectId)
            .build();
    }

    /** Message builder with {@code eventType} set to {@code OBJECT_FINALIZE}. */
    private PubsubMessage.Builder objectFinalizeMessage(String objectId, String payload) {
        return objectWithEventType(objectId, EVENT_TYPE_FINALIZE, payload);
    }

    /** Message builder with {@code eventType} set to {@code OBJECT_DELETE}. */
    private PubsubMessage.Builder objectDeleteMessage(String objectId, String payload) {
        return objectWithEventType(objectId, EVENT_TYPE_DELETE, payload);
    }

    /** Message builder carrying only the basic {@code objectId}/{@code eventType} attributes. */
    private PubsubMessage.Builder objectWithEventType(String objectId, String eventType, String payload) {
        return messageWithAttrs(createBasicAttrs(objectId, eventType), payload);
    }

    /**
     * Message builder carrying the basic attributes plus {@code extraAttributes}; on a key clash
     * the extra attribute wins because it is applied last.
     */
    private PubsubMessage.Builder objectWithEventTypeAndAttrs(
            String objectId, String eventType, Map<String, String> extraAttributes, String payload) {
        Map<String, String> attributes = createBasicAttrs(objectId, eventType);
        attributes.putAll(extraAttributes);
        return messageWithAttrs(attributes, payload);
    }

    /** Returns a fresh, mutable attribute map holding {@code objectId} and {@code eventType}. */
    private Map<String, String> createBasicAttrs(String objectId, String eventType) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("objectId", objectId);
        attributes.put("eventType", eventType);
        return attributes;
    }

    /** Builds a Pub/Sub message from the given attributes with the payload encoded as UTF-8. */
    private PubsubMessage.Builder messageWithAttrs(Map<String, String> attributes, String payload) {
        return PubsubMessage.newBuilder()
            .putAllAttributes(new HashMap<>(attributes))
            .setData(ByteString.copyFrom(StringUtils.getUTF8Bytes(payload)));
    }

    /** Asserts that the row's nested {@code data.bucket} field equals {@code expectedBucket}. */
    private void assertBucket(Row row, String expectedBucket) {
        Row data = row.<Row>getAs("data");
        Assertions.assertEquals(expectedBucket, data.<String>getAs("bucket"));
    }
}
