package org.apache.hudi.utilities.sources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.class */
/**
 * Functional tests for {@link GcsEventsHoodieIncrSource}.
 *
 * <p>Each test writes GCS object-metadata records into a Hudi table under {@link #basePath()},
 * stubs the {@link QueryRunner} with a synthetic dataset of object events, and then checks the
 * checkpoint string returned by {@code fetchNextBatch}. The expected checkpoints used below are
 * either a bare commit time (e.g. {@code "1"}) or {@code "<commit>#<object name>"}.
 */
public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarness {
    private static final Schema GCS_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource(TestGcsEventsHoodieIncrSource.class, "/streamer-config/gcs-metadata.avsc", true);
    private static final Logger LOG = LoggerFactory.getLogger(TestGcsEventsHoodieIncrSource.class);

    /** Serializes the synthetic object-event maps to JSON; never reassigned. */
    private final ObjectMapper mapper = new ObjectMapper();

    @TempDir
    protected Path tempDir;

    @Mock
    CloudDataFetcher gcsObjectDataFetcher;

    @Mock
    QueryRunner queryRunner;
    protected Option<SchemaProvider> schemaProvider;
    private HoodieTableMetaClient metaClient;
    private JavaSparkContext jsc;

    /**
     * Creates the meta client and Spark context, builds a {@link FilebasedSchemaProvider} pointing at
     * {@code schema/sample_gcs_data.avsc}, and initializes the {@code @Mock} fields.
     */
    @BeforeEach
    public void setUp() throws IOException {
        this.metaClient = getHoodieMetaClient(hadoopConf(), basePath());
        this.jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
        String schemaFilePath = TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
        TypedProperties schemaProviderProps = new TypedProperties();
        schemaProviderProps.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
        schemaProviderProps.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
        this.schemaProvider = Option.of(new FilebasedSchemaProvider(schemaProviderProps, this.jsc));
        MockitoAnnotations.initMocks(this);
    }

    /** Returns the JUnit-managed temp directory as an absolute URI string. */
    public String basePath() {
        return this.tempDir.toAbsolutePath().toUri().toString();
    }

    /**
     * Writes at commit "1" and reads from checkpoint "1": the returned checkpoint must still be the
     * write commit time and the data fetcher must never be invoked.
     */
    @Test
    public void shouldNotFindNewDataIfCommitTimeOfWriteAndReadAreEqual() throws IOException {
        Pair<String, List<HoodieRecord>> inserts = writeGcsMetadataRecords("1");
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, inserts.getKey());
        Mockito.verify(this.gcsObjectDataFetcher, Mockito.times(0)).getCloudObjectDataDF(Mockito.<SparkSession>any(), Mockito.any(), Mockito.<TypedProperties>any(), ArgumentMatchers.eq(this.schemaProvider));
    }

    /** Writes at commit "2", reads from checkpoint "1" with limit 100 and expects the first file's checkpoint. */
    @Test
    public void shouldFetchDataIfCommitTimeForReadsLessThanForWrites() throws IOException {
        writeGcsMetadataRecords("2");
        List<Triple<String, Long, String>> objectEvents = new ArrayList<>();
        objectEvents.add(Triple.of("path/to/file1.json", 100L, "1"));
        objectEvents.add(Triple.of("path/to/file2.json", 150L, "1"));
        objectEvents.add(Triple.of("path/to/file3.json", 200L, "1"));
        stubQueryRunner(objectEvents);
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1.json");
    }

    /** With limit 250 the first read stops at file2; resuming from that checkpoint reaches file3. */
    @Test
    public void testTwoFilesAndContinueInSameCommit() throws IOException {
        writeGcsMetadataRecords("2");
        List<Triple<String, Long, String>> objectEvents = new ArrayList<>();
        objectEvents.add(Triple.of("path/to/file1.json", 100L, "1"));
        objectEvents.add(Triple.of("path/to/file2.json", 150L, "1"));
        objectEvents.add(Triple.of("path/to/file3.json", 200L, "1"));
        stubQueryRunner(objectEvents);
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("1"), 250L, "1#path/to/file2.json");
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json");
    }

    /**
     * Mixes 10001 {@code .parquet} entries with three {@code .json} entries; the expected checkpoints
     * reference only the {@code .json} objects.
     */
    @Test
    public void largeBootstrapWithFilters() throws IOException {
        writeGcsMetadataRecords("2");
        List<Triple<String, Long, String>> objectEvents = new ArrayList<>();
        for (int i = 0; i <= 10000; i++) {
            objectEvents.add(Triple.of("path/to/file" + i + ".parquet", 100L, "1"));
        }
        objectEvents.add(Triple.of("path/to/file10005.json", 100L, "1"));
        objectEvents.add(Triple.of("path/to/file10006.json", 150L, "1"));
        objectEvents.add(Triple.of("path/to/file10007.json", 200L, "1"));
        stubQueryRunner(objectEvents);
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("1"), 250L, "1#path/to/file10006.json");
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
    }

    /**
     * Events span commits "1" and "2" and are supplied out of name order (file3 before file2).
     * Successive reads are expected to advance file1, then file2, then to the last file of commit "2";
     * re-reading from the bare checkpoint "1" yields file1 again.
     */
    @Test
    public void testTwoFilesAndContinueAcrossCommits() throws IOException {
        writeGcsMetadataRecords("2");
        List<Triple<String, Long, String>> objectEvents = new ArrayList<>();
        objectEvents.add(Triple.of("path/to/file1.json", 100L, "1"));
        objectEvents.add(Triple.of("path/to/file3.json", 200L, "1"));
        objectEvents.add(Triple.of("path/to/file2.json", 150L, "1"));
        objectEvents.add(Triple.of("path/to/file4.json", 50L, "2"));
        objectEvents.add(Triple.of("path/to/file5.json", 150L, "2"));
        stubQueryRunner(objectEvents);
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1.json");
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 100L, "1#path/to/file2.json");
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 1000L, "2#path/to/file5.json");
        readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1.json");
    }

    /** Makes the mocked {@link QueryRunner} return a dataset built from the given object events. */
    private void stubQueryRunner(List<Triple<String, Long, String>> objectEvents) {
        Mockito.when(this.queryRunner.run(Mockito.<QueryInfo>any())).thenReturn(generateDataset(objectEvents));
    }

    /**
     * Builds a {@link GcsEventsHoodieIncrSource} over the test table, fetches one batch and asserts
     * the returned checkpoint.
     *
     * @param missingCheckpointStrategy strategy written into the source properties
     * @param checkpointToPull checkpoint handed to {@code fetchNextBatch}
     * @param sourceLimit source limit handed to {@code fetchNextBatch}
     * @param expectedCheckpoint checkpoint the batch is expected to return (must be non-null)
     */
    private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
        TypedProperties props = setProps(missingCheckpointStrategy);
        props.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
        GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(props, jsc(), spark(), this.schemaProvider.orElse(null), new GcsObjectMetadataFetcher(props, "json"), this.gcsObjectDataFetcher, this.queryRunner);
        // Only the checkpoint half of the pair is inspected, so the pair's type arguments are left open.
        Pair<?, ?> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
        String actualCheckpoint = (String) dataAndCheckpoint.getRight();
        Assertions.assertNotNull(actualCheckpoint);
        Assertions.assertEquals(expectedCheckpoint, actualCheckpoint);
    }

    /**
     * Builds one GCS object-metadata record conforming to {@link #GCS_METADATA_SCHEMA}.
     *
     * @param commitTime parsed as a long and stored in the {@code timestamp} field
     * @param objectName stored as {@code name} and embedded in the id and link fields
     * @param bucketName stored as {@code bucket} and {@code partition_path}; also the record's partition
     * @param generation stored as {@code generation} and embedded in the id and media link
     * @return a record keyed by {@code "id:<bucket>/<name>/<generation>"}
     */
    private HoodieRecord getGcsMetadataRecord(String commitTime, String objectName, String bucketName, String generation) {
        String recordId = "id:" + bucketName + "/" + objectName + "/" + generation;
        String mediaLink = String.format("https://storage.googleapis.com/download/storage/v1/b/%s/o/%s?generation=%s&alt=media", bucketName, objectName, generation);
        String selfLink = String.format("https://www.googleapis.com/storage/v1/b/%s/o/%s", bucketName, objectName);
        GenericData.Record record = new GenericData.Record(GCS_METADATA_SCHEMA);
        record.put("_row_key", recordId);
        record.put("partition_path", bucketName);
        record.put("timestamp", Long.parseLong(commitTime));
        record.put("bucket", bucketName);
        record.put("contentLanguage", "en");
        record.put("contentType", "application/octet-stream");
        record.put("crc32c", "oRB3Aw==");
        record.put("etag", "CP7EwYCu6/kCEAE=");
        record.put("generation", generation);
        record.put("id", recordId);
        record.put("kind", "storage#object");
        record.put("md5Hash", "McsS8FkcDSrB3cGfb18ysA==");
        record.put("mediaLink", mediaLink);
        record.put("metageneration", "1");
        record.put("name", objectName);
        record.put("selfLink", selfLink);
        record.put("size", "370");
        record.put("storageClass", "STANDARD");
        record.put("timeCreated", "2022-08-29T05:52:55.869Z");
        record.put("timeStorageClassUpdated", "2022-08-29T05:52:55.869Z");
        record.put("updated", "2022-08-29T05:52:55.869Z");
        return new HoodieAvroRecord<>(new HoodieKey(recordId, bucketName), new HoodieAvroPayload(Option.of(record)));
    }

    /** Write config for the test table: archival (2, 3), clean retaining 1 commit, metadata compaction after 1 delta commit. */
    private HoodieWriteConfig getWriteConfig() {
        return getConfigBuilder(basePath(), this.metaClient)
                .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
                .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
                .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
                .build();
    }

    /**
     * Upserts four metadata records for {@code bucket-1} at the given commit time and asserts that the
     * write produced no errors.
     *
     * @param commitTime instant time used for both the commit and the records' timestamp
     * @return the commit time paired with the records that were written
     * @throws IOException if the write client cannot be created
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the harness hands back a raw SparkRDDWriteClient
    private Pair<String, List<HoodieRecord>> writeGcsMetadataRecords(String commitTime) throws IOException {
        // try-with-resources closes the client exactly once, whether or not the write succeeds.
        try (SparkRDDWriteClient writeClient = getHoodieWriteClient(getWriteConfig())) {
            writeClient.startCommitWithTime(commitTime);
            List<HoodieRecord> records = Arrays.asList(
                    getGcsMetadataRecord(commitTime, "data-file-1.json", "bucket-1", "1"),
                    getGcsMetadataRecord(commitTime, "data-file-2.json", "bucket-1", "1"),
                    getGcsMetadataRecord(commitTime, "data-file-3.json", "bucket-1", "1"),
                    getGcsMetadataRecord(commitTime, "data-file-4.json", "bucket-1", "1"));
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeClient.upsert(jsc().parallelize(records, 1), commitTime).collect());
            return Pair.of(commitTime, records);
        }
    }

    /**
     * Builds the base source properties: schema provider class, incremental source path
     * ({@link #basePath()}), the given missing-checkpoint strategy and the {@code json} data file format.
     */
    private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) {
        Properties properties = new Properties();
        properties.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
        properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
        properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
        properties.setProperty("hoodie.deltastreamer.source.gcsincr.datafile.format", "json");
        return new TypedProperties(properties);
    }

    /** Starts a write-config builder for the given path using the GCS metadata schema and parallelism 2 throughout. */
    private HoodieWriteConfig.Builder getConfigBuilder(String tablePath, HoodieTableMetaClient tableMetaClient) {
        return HoodieWriteConfig.newBuilder()
                .withPath(tablePath)
                .withSchema(GCS_METADATA_SCHEMA.toString())
                .withParallelism(2, 2)
                .withBulkInsertParallelism(2)
                .withFinalizeWriteParallelism(2)
                .withDeleteParallelism(2)
                .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue())
                .forTable(tableMetaClient.getTableConfig().getTableName());
    }

    /**
     * Serializes one synthetic object event to JSON with the keys {@code bucket}, {@code name},
     * {@code size} and {@code _hoodie_commit_time}.
     */
    private String generateGCSEventMetadata(Long objectSize, String bucketName, String objectName, String commitTime) throws JsonProcessingException {
        HashMap<String, Object> objectMetadata = new HashMap<>();
        objectMetadata.put("bucket", bucketName);
        objectMetadata.put("name", objectName);
        objectMetadata.put("size", objectSize);
        objectMetadata.put("_hoodie_commit_time", commitTime);
        return this.mapper.writeValueAsString(objectMetadata);
    }

    /**
     * Converts (object name, size, commit time) triples into JSON event strings for {@code bucket-1}.
     *
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    private List<String> getSampleGCSObjectKeys(List<Triple<String, Long, String>> objectEvents) {
        return objectEvents.stream().map(event -> {
            try {
                return generateGCSEventMetadata(event.getMiddle(), "bucket-1", event.getLeft(), event.getRight());
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }

    /** Reads the JSON event strings for the given triples into a Spark dataset (2 partitions). */
    private Dataset<Row> generateDataset(List<Triple<String, Long, String>> objectEvents) {
        return spark().read().json(this.jsc.parallelize(getSampleGCSObjectKeys(objectEvents), 2));
    }
}
