package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.TestSnapshotQuerySplitterImpl;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/hudi/utilities/sources/TestHoodieIncrSource.class */
public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
    private HoodieTestDataGenerator dataGen;
    private HoodieTableMetaClient metaClient;
    private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/utilities/sources/TestHoodieIncrSource$DummySchemaProvider.class */
    public static class DummySchemaProvider extends SchemaProvider {
        private final Schema schema;

        public DummySchemaProvider(Schema schema) {
            super(new TypedProperties());
            this.schema = schema;
        }

        public Schema getSourceSchema() {
            return this.schema;
        }
    }

    public SparkConf conf() {
        return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
    }

    @BeforeEach
    public void setUp() throws IOException {
        this.dataGen = new HoodieTestDataGenerator();
    }

    public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConfiguration, String str, Properties properties) throws IOException {
        return HoodieTableMetaClient.initTableAndGetMetaClient(storageConfiguration.newInstance(), str, HoodieTableMetaClient.withPropertyBuilder().setTableName("raw_trips").setTableType(this.tableType).setPayloadClass(HoodieAvroPayload.class).fromProperties(properties).build());
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    public void testHoodieIncrSource(HoodieTableType hoodieTableType) throws IOException {
        this.tableType = hoodieTableType;
        this.metaClient = getHoodieMetaClient(storageConf(), basePath());
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(getConfigBuilder(basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 5).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(3).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build());
        Throwable th = null;
        try {
            try {
                writeRecords(hoodieWriteClient, WriteOperationType.INSERT, null, "100");
                writeRecords(hoodieWriteClient, WriteOperationType.INSERT, null, "200");
                writeRecords(hoodieWriteClient, WriteOperationType.INSERT, null, "300");
                writeRecords(hoodieWriteClient, WriteOperationType.INSERT, null, "400");
                Pair<String, List<HoodieRecord>> writeRecords = writeRecords(hoodieWriteClient, WriteOperationType.INSERT, null, "500");
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, (String) writeRecords.getKey());
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("100"), 400, (String) writeRecords.getKey());
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("400"), 100, (String) writeRecords.getKey());
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.empty(), 100, (String) writeRecords.getKey());
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(writeRecords.getKey()), 0, (String) writeRecords.getKey());
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(writeRecords.getKey()), 100, (String) writeRecords(hoodieWriteClient, WriteOperationType.INSERT, null, "600").getKey());
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    public void testHoodieIncrSourceInflightCommitBeforeCompletedCommit(HoodieTableType hoodieTableType) throws IOException {
        this.tableType = hoodieTableType;
        this.metaClient = getHoodieMetaClient(storageConf(), basePath());
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(getConfigBuilder(basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 5).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(3).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build());
        Throwable th = null;
        try {
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < 6; i++) {
                arrayList.add(writeRecords(hoodieWriteClient, WriteOperationType.INSERT, null, hoodieWriteClient.createNewInstantTime()));
            }
            HoodieActiveTimeline activeTimeline = this.metaClient.getActiveTimeline();
            HoodieInstant hoodieInstant = (HoodieInstant) activeTimeline.filter(hoodieInstant2 -> {
                return hoodieInstant2.getTimestamp().equals(((Pair) arrayList.get(4)).getKey());
            }).firstInstant().get();
            Option instantDetails = activeTimeline.getInstantDetails(hoodieInstant);
            activeTimeline.revertToInflight(hoodieInstant);
            this.metaClient.reloadActiveTimeline();
            readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 400, (String) ((Pair) arrayList.get(3)).getKey());
            readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of(((Pair) arrayList.get(0)).getKey()), 300, (String) ((Pair) arrayList.get(3)).getKey());
            readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of(((Pair) arrayList.get(2)).getKey()), 100, (String) ((Pair) arrayList.get(3)).getKey());
            readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.empty(), 100, (String) ((Pair) arrayList.get(3)).getKey());
            readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(((Pair) arrayList.get(3)).getKey()), 0, (String) ((Pair) arrayList.get(3)).getKey());
            activeTimeline.reload().saveAsComplete(new HoodieInstant(HoodieInstant.State.INFLIGHT, hoodieInstant.getAction(), (String) ((Pair) arrayList.get(4)).getKey()), instantDetails);
            readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(((Pair) arrayList.get(3)).getKey()), 200, (String) ((Pair) arrayList.get(5)).getKey());
            if (hoodieWriteClient != null) {
                if (0 == 0) {
                    hoodieWriteClient.close();
                    return;
                }
                try {
                    hoodieWriteClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (hoodieWriteClient != null) {
                if (0 != 0) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th3;
        }
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType hoodieTableType) throws IOException {
        this.tableType = hoodieTableType;
        this.metaClient = getHoodieMetaClient(storageConf(), basePath());
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(getConfigBuilder(basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withScheduleInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build());
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                int i = 0;
                while (i < 6) {
                    arrayList.add(writeRecords(hoodieWriteClient, i < 4 ? WriteOperationType.BULK_INSERT : WriteOperationType.UPSERT, i < 4 ? null : (List) ((Pair) arrayList.get(3)).getRight(), hoodieWriteClient.createNewInstantTime()));
                    if (hoodieTableType == HoodieTableType.COPY_ON_WRITE) {
                        if (i == 2) {
                            hoodieWriteClient.scheduleClustering(Option.empty());
                        }
                    } else if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
                        if (i == 4) {
                            hoodieWriteClient.scheduleCompaction(Option.empty());
                        }
                        if (i == 5) {
                            hoodieWriteClient.scheduleClustering(Option.empty());
                        }
                    }
                    i++;
                }
                arrayList.add(writeRecords(hoodieWriteClient, WriteOperationType.BULK_INSERT, null, hoodieWriteClient.createNewInstantTime()));
                String str = (String) ((Pair) arrayList.get(arrayList.size() - 1)).getKey();
                Option firstInstant = this.metaClient.getActiveTimeline().filterPendingReplaceTimeline().filter(hoodieInstant -> {
                    return ClusteringUtils.getClusteringPlan(this.metaClient, hoodieInstant).isPresent();
                }).firstInstant();
                Assertions.assertTrue(firstInstant.isPresent());
                Assertions.assertTrue(((HoodieInstant) firstInstant.get()).getTimestamp().compareTo(str) < 0);
                if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
                    Option firstInstant2 = this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
                    Assertions.assertTrue(firstInstant2.isPresent());
                    Assertions.assertTrue(((HoodieInstant) firstInstant2.get()).getTimestamp().compareTo(str) < 0);
                }
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 100, (String) ((Pair) arrayList.get(0)).getKey(), Option.of(TestSnapshotQuerySplitterImpl.class.getName()), new TypedProperties());
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, (String) ((Pair) arrayList.get(6)).getKey());
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of(((Pair) arrayList.get(2)).getKey()), 200, (String) ((Pair) arrayList.get(6)).getKey());
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.empty(), 100, (String) ((Pair) arrayList.get(6)).getKey());
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(((Pair) arrayList.get(6)).getKey()), 0, (String) ((Pair) arrayList.get(6)).getKey());
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    public void testHoodieIncrSourceWithDataSourceOptions(HoodieTableType hoodieTableType) throws IOException {
        this.tableType = hoodieTableType;
        this.metaClient = getHoodieMetaClient(storageConf(), basePath());
        HoodieWriteConfig build = getConfigBuilder(basePath(), this.metaClient).withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build()).withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withScheduleInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).withColumnStatsIndexForColumns("_hoodie_commit_time").build()).build();
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.setProperty(HoodieIncrSourceConfig.HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS.key(), "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build);
        Throwable th = null;
        try {
            try {
                Pair<String, List<HoodieRecord>> writeRecords = writeRecords(hoodieWriteClient, WriteOperationType.INSERT, null, "100");
                writeRecords(hoodieWriteClient, WriteOperationType.INSERT, null, "200");
                readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 100, (String) writeRecords.getKey(), Option.of(TestSnapshotQuerySplitterImpl.class.getName()), typedProperties);
                if (hoodieWriteClient != null) {
                    if (0 == 0) {
                        hoodieWriteClient.close();
                        return;
                    }
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (hoodieWriteClient != null) {
                if (th != null) {
                    try {
                        hoodieWriteClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    hoodieWriteClient.close();
                }
            }
            throw th4;
        }
    }

    private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> option, int i, String str, Option<String> option2, TypedProperties typedProperties) {
        Properties properties = new Properties();
        properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath());
        properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
        properties.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
        properties.putAll(typedProperties);
        option2.map(str2 -> {
            return properties.setProperty("hoodie.deltastreamer.snapshotload.query.splitter.class.name", str2);
        });
        Pair fetchNextBatch = new HoodieIncrSource(new TypedProperties(properties), jsc(), spark(), new DefaultStreamContext(new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA), Option.empty())).fetchNextBatch(option, 500L);
        Assertions.assertNotNull(fetchNextBatch.getValue());
        if (i == 0) {
            Assertions.assertFalse(((Option) fetchNextBatch.getKey()).isPresent());
        } else {
            Assertions.assertEquals(i, ((Dataset) ((Option) fetchNextBatch.getKey()).get()).count());
        }
        Assertions.assertEquals(str, fetchNextBatch.getRight());
    }

    private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> option, int i, String str) {
        readAndAssert(missingCheckpointStrategy, option, i, str, Option.empty(), new TypedProperties());
    }

    private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient sparkRDDWriteClient, WriteOperationType writeOperationType, List<HoodieRecord> list, String str) throws IOException {
        sparkRDDWriteClient.startCommitWithTime(str);
        List generateUpdates = writeOperationType == WriteOperationType.UPSERT ? this.dataGen.generateUpdates(str, list) : this.dataGen.generateInserts(str, 100);
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors((writeOperationType == WriteOperationType.BULK_INSERT ? sparkRDDWriteClient.bulkInsert(jsc().parallelize(generateUpdates, 1), str) : sparkRDDWriteClient.upsert(jsc().parallelize(generateUpdates, 1), str)).collect());
        return Pair.of(str, generateUpdates);
    }

    private HoodieWriteConfig.Builder getConfigBuilder(String str, HoodieTableMetaClient hoodieTableMetaClient) {
        return HoodieWriteConfig.newBuilder().withPath(str).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2).withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue()).forTable(hoodieTableMetaClient.getTableConfig().getTableName());
    }
}
