package org.apache.hudi.functional;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy;
import org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("functional")
/* loaded from: input_file:org/apache/hudi/functional/TestSparkConsistentBucketClustering.class */
public class TestSparkConsistentBucketClustering extends HoodieSparkClientTestHarness {
    private HoodieWriteConfig config;
    private HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0);

    /**
     * Prepares the test table and write client without any extra properties.
     *
     * @param i value forwarded as the parquet max file size of the storage config
     * @throws IOException if the two-argument {@code setup} fails
     */
    public void setup(int i) throws IOException {
        Map<String, String> noExtraProps = Collections.emptyMap();
        setup(i, noExtraProps);
    }

    /**
     * Initializes the harness, creates a MERGE_ON_READ table and builds a write client that uses the
     * consistent-hashing bucket index together with the consistent-bucket clustering strategies.
     *
     * @param i   value passed to {@code parquetMaxFileSize} of the storage config
     * @param map extra properties merged into the key-generator properties before the table is created
     * @throws IOException declared for the harness/table initialization calls
     */
    public void setup(int i, Map<String, String> map) throws IOException {
        initPath();
        initSparkContexts();
        initTestDataGenerator();
        initFileSystem();

        Properties props = getPropertiesForKeyGen(true);
        props.putAll(map);
        // Set after the caller's properties, so the record key field is always "_row_key".
        props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
        this.metaClient = HoodieTestUtils.init(this.hadoopConf, this.basePath, HoodieTableType.MERGE_ON_READ, props);

        // Consistent hashing bucket index: 8 initial buckets, allowed to range between 4 and 14.
        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
            .fromProperties(props)
            .withIndexType(HoodieIndex.IndexType.BUCKET)
            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
            .withBucketNum("8")
            .withBucketMaxNum(14)
            .withBucketMinNum(4)
            .build();
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
            .parquetMaxFileSize(i)
            .build();
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
            .withClusteringPlanStrategyClass(SparkConsistentBucketClusteringPlanStrategy.class.getName())
            .withClusteringExecutionStrategyClass(SparkConsistentBucketClusteringExecutionStrategy.class.getName())
            .withClusteringUpdatesStrategy(SparkConsistentBucketDuplicateUpdateStrategy.class.getName())
            .build();

        this.config = getConfigBuilder()
            .withProps(props)
            .withAutoCommit(false)
            .withIndexConfig(indexConfig)
            .withStorageConfig(storageConfig)
            .withClusteringConfig(clusteringConfig)
            .build();
        this.writeClient = getHoodieWriteClient(this.config);
    }

    /**
     * Runs after every test and delegates to the harness' {@code cleanupResources()}.
     *
     * @throws IOException if the cleanup fails
     */
    @AfterEach
    public void tearDown() throws IOException {
        this.cleanupResources();
    }

    /**
     * Writes 2000 records, runs one clustering and checks the resulting bucket layout.
     *
     * @param z  when {@code true} the table uses a parquet max file size of 5120 and 14 hashing nodes are
     *           expected per partition; otherwise 134217728 is used and 4 nodes are expected (these match
     *           the max/min bucket numbers configured in {@code setup})
     * @param z2 value written to {@code hoodie.datasource.write.row.writer.enable}
     * @throws IOException if the test setup fails
     */
    @MethodSource({"configParams"})
    @ParameterizedTest
    public void testResizing(boolean z, boolean z2) throws IOException {
        final int maxFileSize;
        final int expectedNodeCount;
        if (z) {
            maxFileSize = 5120;
            expectedNodeCount = 14;
        } else {
            maxFileSize = 134217728;
            expectedNodeCount = 4;
        }
        setup(maxFileSize);
        this.config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(z2));
        this.config.setValue("hoodie.metadata.enable", "false");

        writeData(this.writeClient.createNewInstantTime(), 2000, true);
        String clusteringInstant = (String) this.writeClient.scheduleClustering(Option.empty()).get();
        this.writeClient.cluster(clusteringInstant, true);

        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkTable table = HoodieSparkTable.create(this.config, this.context, this.metaClient);
        List<Row> rows = readRecords();
        Assertions.assertEquals(2000, rows.size());

        for (String partition : this.dataGen.getPartitionPaths()) {
            HoodieConsistentHashingMetadata hashingMetadata =
                (HoodieConsistentHashingMetadata) ConsistentBucketIndexUtils.loadMetadata(table, partition).get();
            Assertions.assertEquals(expectedNodeCount, hashingMetadata.getNodes().size());
            // Every latest file slice must have a base file and no log files.
            table.getSliceView().getLatestFileSlices(partition).forEach(slice -> {
                Assertions.assertTrue(slice.getBaseFile().isPresent());
                Assertions.assertTrue(slice.getLogFiles().count() == 0);
            });
        }
    }

    /**
     * Checks that the hashing metadata of every partition can still be loaded (with 14 nodes) after
     * clustering, several further commits, a clean and a timeline archival, and that the table remains
     * writable afterwards.
     *
     * @param z  when {@code false}, every file whose name contains ".commit" is deleted from the
     *           partition's hashing metadata directory before the metadata is loaded
     * @param z2 value written to {@code hoodie.datasource.write.row.writer.enable}
     * @throws IOException if the test setup fails
     */
    @MethodSource({"configParams"})
    @ParameterizedTest
    public void testLoadMetadata(boolean z, boolean z2) throws IOException {
        setup(5120);
        this.writeClient.getConfig().setValue(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1");
        this.writeClient.getConfig().setValue(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4");
        this.writeClient.getConfig().setValue(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5");
        this.writeClient.getConfig().setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(z2));

        writeData(this.writeClient.createNewInstantTime(), 2000, true);
        String clusteringInstant = (String) this.writeClient.scheduleClustering(Option.empty()).get();
        this.writeClient.cluster(clusteringInstant, true);

        // Six small commits on top of the clustered table.
        for (int round = 0; round < 6; round++) {
            writeData(this.writeClient.createNewInstantTime(), 10, true);
        }

        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkTable table = HoodieSparkTable.create(this.config, this.context, this.metaClient);
        this.writeClient.clean();
        HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(this.writeClient.getConfig(), table);
        archiver.archiveIfRequired(this.context);

        for (String partition : this.dataGen.getPartitionPaths()) {
            if (!z) {
                try {
                    Arrays.stream(table.getMetaClient().getFs().listStatus(
                            FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition)))
                        .filter(status -> status.getPath().getName().contains(".commit"))
                        .forEach(status -> {
                            try {
                                table.getMetaClient().getFs().delete(status.getPath());
                            } catch (IOException deleteFailure) {
                                throw new RuntimeException(deleteFailure);
                            }
                        });
                } catch (IOException listFailure) {
                    throw new RuntimeException(listFailure);
                }
            }
            HoodieConsistentHashingMetadata hashingMetadata =
                (HoodieConsistentHashingMetadata) ConsistentBucketIndexUtils.loadMetadata(table, partition).get();
            Assertions.assertEquals(14, hashingMetadata.getNodes().size());
        }

        // Two more commits; the total is 2000 + 8 * 10 records.
        for (int round = 0; round < 2; round++) {
            writeData(this.writeClient.createNewInstantTime(), 10, true);
        }
        Assertions.assertEquals(2080, readRecords().size());
    }

    /**
     * Verifies that, after clustering, the records returned by each record reader are in
     * non-decreasing order of the given column.
     *
     * <p>For {@code "_row_key"} the bulk-insert sort mode is set to {@code PARTITION_SORT}; for any other
     * column the clustering plan sort columns are set to that column.
     *
     * @param str name of the column to sort on; must be a double or string field of the trip schema
     * @throws IOException     if setup fails or a record reader fails while reading or closing
     * @throws HoodieException if the column's schema type is neither double nor string
     */
    @ValueSource(strings = {"_row_key", "begin_lat"})
    @ParameterizedTest
    public void testClusteringColumnSort(String str) throws IOException {
        Map<String, String> options = new HashMap<>();
        if (str.equals("_row_key")) {
            options.put(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.PARTITION_SORT.toString());
        } else {
            options.put(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), str);
        }
        options.put("hoodie.datasource.write.row.writer.enable", String.valueOf(false));
        setup(134217728, options);

        writeData(this.writeClient.createNewInstantTime(), 500, true);
        writeData(this.writeClient.createNewInstantTime(), 500, true);
        this.writeClient.cluster((String) this.writeClient.scheduleClustering(Option.empty()).get(), true);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);

        List<String> partitionDirs = Arrays.stream(this.dataGen.getPartitionPaths())
            .map(partition -> Paths.get(this.basePath, partition).toString())
            .collect(Collectors.toList());
        List<RecordReader> readers = HoodieMergeOnReadTestUtils.getRecordReadersUsingInputFormat(
            this.hadoopConf, partitionDirs, this.basePath, new JobConf(this.hadoopConf), true, false);

        Schema.Field field = new Schema.Parser().parse("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").getField(str);

        // Typed comparator: doubles compare numerically, strings by the natural order of toString().
        final Comparator<Writable> comparator;
        Schema.Type fieldType = field.schema().getType();
        if (fieldType == Schema.Type.DOUBLE) {
            comparator = Comparator.comparingDouble(value -> ((DoubleWritable) value).get());
        } else if (fieldType == Schema.Type.STRING) {
            comparator = Comparator.comparing(Writable::toString);
        } else {
            throw new HoodieException("Cannot get comparator: unsupported data type, " + fieldType);
        }

        for (RecordReader reader : readers) {
            try {
                Object key = reader.createKey();
                ArrayWritable row = (ArrayWritable) reader.createValue();
                Writable previous = null;
                // Stop once the reader is exhausted; looping on after next() returns false never terminates.
                while (reader.next(key, row)) {
                    Writable current = row.get()[field.pos()];
                    Assertions.assertTrue(previous == null || comparator.compare(previous, current) <= 0,
                        "Records are not sorted by column " + str);
                    previous = current;
                }
            } finally {
                reader.close();
            }
        }
    }

    /**
     * Exercises repeated clustering scheduling: while one plan is pending a second schedule attempt
     * yields nothing; after the plan is executed and the partition filter is switched to
     * {@code RECENT_DAYS}, two further schedule attempts (skip-from-latest 0, then 1) each yield a plan.
     *
     * @throws IOException if the test setup fails
     */
    @Test
    public void testConcurrentClustering() throws IOException {
        setup(5120);
        String insertInstant = this.writeClient.createNewInstantTime();
        writeData(insertInstant, 2000, true);

        // First plan is created; scheduling again while it is pending must return empty.
        String pendingClustering = (String) this.writeClient.scheduleClustering(Option.empty()).get();
        boolean secondPlanCreated = this.writeClient.scheduleClustering(Option.empty()).isPresent();
        Assertions.assertFalse(secondPlanCreated);
        this.writeClient.cluster(pendingClustering, true);

        // Restrict planning to recent partitions: look back 1, skip 0 from latest.
        this.config.setValue(HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS, "1");
        this.config.setValue(HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, "0");
        this.config.setValue("hoodie.clustering.plan.partition.filter.mode", ClusteringPlanPartitionFilterMode.RECENT_DAYS.toString());
        boolean planWithSkipZero = this.writeClient.scheduleClustering(Option.empty()).isPresent();
        Assertions.assertTrue(planWithSkipZero);

        // Same look-back, but now skipping 1 partition from the latest.
        this.config.setValue(HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS, "1");
        this.config.setValue(HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, "1");
        boolean planWithSkipOne = this.writeClient.scheduleClustering(Option.empty()).isPresent();
        Assertions.assertTrue(planWithSkipOne);
    }

    /**
     * Interleaves writes with clustering: no plan may be scheduled while a write is uncommitted, and a
     * write committed between scheduling and executing a clustering must not change the record count
     * (4000 both before and after the clustering runs).
     *
     * @param z value written to {@code hoodie.datasource.write.row.writer.enable}
     * @throws IOException if the test setup fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testConcurrentWrite(boolean z) throws IOException {
        setup(5120);
        this.config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(z));

        // Write without committing; clustering cannot be scheduled yet.
        String uncommittedInstant = this.writeClient.createNewInstantTime();
        List<WriteStatus> statuses = writeData(uncommittedInstant, 2000, false);
        Assertions.assertFalse(this.writeClient.scheduleClustering(Option.empty()).isPresent());

        // Commit the pending write.
        boolean committed = this.writeClient.commitStats(
            uncommittedInstant,
            this.context.parallelize(statuses, 1),
            (List) statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
            Option.empty(),
            this.metaClient.getCommitActionType());
        Assertions.assertTrue(committed);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);

        // Schedule the clustering, write again, then execute the previously scheduled plan.
        String clusteringInstant = (String) this.writeClient.scheduleClustering(Option.empty()).get();
        writeData(this.writeClient.createNewInstantTime(), 2000, true);
        Assertions.assertEquals(4000, readRecords().size());
        this.writeClient.cluster(clusteringInstant, true);
        Assertions.assertEquals(4000, readRecords().size());
    }

    /**
     * Loads the table through the "hudi" Spark data source, registers it as the temp view
     * {@code hudi_ro_table} and returns every row of that view.
     *
     * @return all rows selected from the temp view
     */
    private List<Row> readRecords() {
        // Four-level wildcard glob below the table base path.
        String tableGlob = this.basePath + "/*/*/*/*";
        this.sparkSession.read()
            .format("hudi")
            .load(tableGlob)
            .createOrReplaceTempView("hudi_ro_table");
        return this.sparkSession.sqlContext()
            .sql("select * from hudi_ro_table")
            .collectAsList();
    }

    /**
     * Generates insert records, upserts them at the given instant and optionally commits the write.
     *
     * @param str instant time used to generate the records, start the commit and upsert
     * @param i   number of insert records to generate
     * @param z   when {@code true} the write is committed and the commit is asserted to succeed;
     *            when {@code false} the write is left uncommitted
     * @return the collected write statuses of the upsert
     */
    private List<WriteStatus> writeData(String str, int i, boolean z) {
        JavaRDD inserts = this.jsc.parallelize(this.dataGen.generateInserts(str, Integer.valueOf(i)), 2);

        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        this.writeClient.startCommitWithTime(str);
        List<WriteStatus> statuses = this.writeClient.upsert(inserts, str).collect();
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses);

        if (z) {
            boolean committed = this.writeClient.commitStats(
                str,
                this.context.parallelize(statuses, 1),
                (List) statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
                Option.empty(),
                this.metaClient.getCommitActionType());
            Assertions.assertTrue(committed);
        }

        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        return statuses;
    }

    /**
     * Creates the base write-config builder shared by the tests: trip schema, parallelism of 2 for all
     * operations, consistency check enabled, 1048576-byte small-file/max-file sizes, table name
     * {@code test-trip-table}, embedded timeline server and an embedded KV store file-system view.
     *
     * @return a builder that callers can customize further before calling {@code build()}
     */
    public HoodieWriteConfig.Builder getConfigBuilder() {
        String tripSchemaJson = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        ConsistencyGuardConfig consistencyGuard = ConsistencyGuardConfig.newBuilder()
            .withConsistencyCheckEnabled(true)
            .build();
        HoodieCompactionConfig compaction = HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(1048576L)
            .build();
        HoodieStorageConfig storage = HoodieStorageConfig.newBuilder()
            .hfileMaxFileSize(1048576L)
            .parquetMaxFileSize(1048576L)
            .build();
        FileSystemViewStorageConfig fileSystemView = FileSystemViewStorageConfig.newBuilder()
            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
            .build();

        return HoodieWriteConfig.newBuilder()
            .withPath(this.basePath)
            .withSchema(tripSchemaJson)
            .withParallelism(2, 2)
            .withBulkInsertParallelism(2)
            .withFinalizeWriteParallelism(2)
            .withDeleteParallelism(2)
            .withWriteStatusClass(MetadataMergeWriteStatus.class)
            .withConsistencyGuardConfig(consistencyGuard)
            .withCompactionConfig(compaction)
            .withStorageConfig(storage)
            .forTable("test-trip-table")
            .withEmbeddedTimelineServerEnabled(true)
            .withFileSystemViewConfig(fileSystemView);
    }

    /**
     * Argument source for the two-flag parameterized tests: every combination of the two booleans, in
     * the order (true, false), (false, false), (true, true), (false, true).
     *
     * @return one {@link Arguments} per flag combination
     */
    private static Stream<Arguments> configParams() {
        // Plain varargs: an Object[] cast on the array cannot produce a Stream<Arguments>.
        return Stream.of(
            Arguments.of(true, false),
            Arguments.of(false, false),
            Arguments.of(true, true),
            Arguments.of(false, true));
    }
}
