package org.apache.hudi.client.functional;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Functional tests for the consistent-hashing engine of Hudi's bucket index on a
 * MERGE_ON_READ table.
 *
 * <p>Every test runs once per combination supplied by {@link #configParams()}:
 * <ul>
 *   <li>{@code populateMetaFields} – forwarded to {@code getPropertiesForKeyGen} and to the
 *       input-format based record reader;</li>
 *   <li>{@code partitioned} – when {@code false} the data generator is initialised with the
 *       single empty partition path {@code ""}.</li>
 * </ul>
 */
@Tag("functional")
public class TestConsistentBucketIndex extends HoodieSparkClientTestHarness {

    /** Table write schema; records are keyed by the {@code _row_key} field. */
    private static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

    // Fixed seed so the randomly chosen record counts are reproducible across runs.
    private final Random random = new Random(1);
    private HoodieIndex index;
    private HoodieWriteConfig config;

    /** Supplies the {@code (populateMetaFields, partitioned)} combinations for every test. */
    private static Stream<Arguments> configParams() {
        return Stream.of(
                Arguments.of(true, false),
                Arguments.of(false, false),
                Arguments.of(true, true),
                Arguments.of(false, true));
    }

    /**
     * Initialises Spark, the file system, the data generator and a fresh MERGE_ON_READ table.
     * It then builds a write client whose index is a consistent-hashing bucket index with 8
     * buckets keyed on {@code _row_key}. Auto-commit is disabled, so writes are committed
     * explicitly by {@link #writeData(JavaRDD, String, WriteOperationType, boolean)}.
     *
     * @param populateMetaFields forwarded to {@code getPropertiesForKeyGen}
     * @param partitioned        {@code true} to use the generator's default partition paths,
     *                           {@code false} to use the single empty partition path
     */
    private void setUp(boolean populateMetaFields, boolean partitioned) throws Exception {
        initPath();
        initSparkContexts();
        if (partitioned) {
            initTestDataGenerator();
        } else {
            // A single empty partition path; presumably this models a non-partitioned table.
            initTestDataGenerator(new String[] {""});
        }
        initFileSystem();

        Properties props = getPropertiesForKeyGen(populateMetaFields);
        props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
        this.metaClient = HoodieTestUtils.init(this.hadoopConf, this.basePath, HoodieTableType.MERGE_ON_READ, props);

        this.config = getConfigBuilder()
                .withProperties(props)
                .withIndexConfig(HoodieIndexConfig.newBuilder()
                        .fromProperties(props)
                        .withIndexType(HoodieIndex.IndexType.BUCKET)
                        .withIndexKeyField("_row_key")
                        .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
                        .withBucketNum("8")
                        .build())
                .withAutoCommit(false)
                .build();
        this.writeClient = getHoodieWriteClient(this.config);
        this.index = this.writeClient.getIndex();
    }

    @AfterEach
    public void tearDown() throws IOException {
        cleanupResources();
    }

    /**
     * Tags freshly generated inserts twice. It checks that every record receives a location and
     * that both tagging passes assign the same location to each record key.
     */
    @MethodSource("configParams")
    @ParameterizedTest
    public void testTagLocation(boolean populateMetaFields, boolean partitioned) throws Exception {
        setUp(populateMetaFields, partitioned);
        JavaRDD<HoodieRecord> records =
                this.jsc.parallelize(this.dataGen.generateInserts("001", 20 + this.random.nextInt(20)), 2);
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkTable table = HoodieSparkTable.create(this.config, this.context, this.metaClient);

        List<HoodieRecord> firstTagging = tagLocation(this.index, records, table).collect();
        Assertions.assertTrue(firstTagging.stream().allMatch(HoodieRecord::isCurrentLocationKnown));

        List<HoodieRecord> secondTagging = tagLocation(this.index, records, table).collect();
        Assertions.assertTrue(secondTagging.stream().allMatch(HoodieRecord::isCurrentLocationKnown));

        // Index the second pass by key. On a duplicate key keep the first occurrence, matching a
        // first-match linear search. A missing key now fails fast instead of looping forever.
        Map<String, Object> secondLocations = secondTagging.stream()
                .collect(Collectors.toMap(
                        HoodieRecord::getRecordKey,
                        HoodieRecord::getCurrentLocation,
                        (first, duplicate) -> first));
        for (HoodieRecord record : firstTagging) {
            String key = record.getRecordKey();
            Assertions.assertTrue(secondLocations.containsKey(key),
                    "Record key missing from second tagging: " + key);
            Assertions.assertEquals(record.getCurrentLocation(), secondLocations.get(key));
        }
    }

    /**
     * Runs these steps and checks file and record counts after each:
     * <ol>
     *   <li>an initial upsert;</li>
     *   <li>a second upsert of the same keys;</li>
     *   <li>an upsert of new keys.</li>
     * </ol>
     */
    @MethodSource("configParams")
    @ParameterizedTest
    public void testWriteData(boolean populateMetaFields, boolean partitioned) throws Exception {
        setUp(populateMetaFields, partitioned);
        int recordNum = 20 + this.random.nextInt(20);
        JavaRDD<HoodieRecord> records = this.jsc.parallelize(this.dataGen.generateInserts("001", recordNum), 2);

        List<WriteStatus> firstWrite = writeData(records, "001", WriteOperationType.UPSERT, true);
        long fileGroupNum = countDistinctFileIds(firstWrite);
        // One listed file status per file group written.
        Assertions.assertEquals(fileGroupNum, countListedFiles());
        Assertions.assertEquals(recordNum, readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));

        // Re-upserting the same keys: two delta log files are expected per file group, and the
        // merged record count must stay unchanged.
        writeData(records, "002", WriteOperationType.UPSERT, true);
        Assertions.assertEquals(fileGroupNum * 2, countDeltaLogFiles());
        Assertions.assertEquals(recordNum, readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));

        // Upserting new keys doubles the record count.
        writeData("003", recordNum, true);
        Assertions.assertEquals(recordNum * 2, readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));
    }

    /**
     * Schedules a compaction after one upsert and performs a second upsert before executing it.
     * It verifies the readable record count before and after the compaction commits.
     */
    @MethodSource("configParams")
    @ParameterizedTest
    public void testWriteDataWithCompaction(boolean populateMetaFields, boolean partitioned) throws Exception {
        setUp(populateMetaFields, partitioned);
        writeData(HoodieActiveTimeline.createNewInstantTime(), 200, true);

        // Allow compaction to be scheduled after a single delta commit.
        this.config.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS, "1");
        this.config.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY, CompactionTriggerStrategy.NUM_COMMITS.name());
        String compactionInstant = (String) this.writeClient.scheduleCompaction(Option.empty()).get();
        Assertions.assertEquals(200, readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));

        // Write more data while the compaction is still pending.
        writeData(HoodieActiveTimeline.createNewInstantTime(), 200, true);
        Assertions.assertEquals(400, readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));

        this.writeClient.commitCompaction(compactionInstant,
                (HoodieCommitMetadata) this.writeClient.compact(compactionInstant).getCommitMetadata().get(),
                Option.empty());
        Assertions.assertEquals(400, readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));
    }

    /**
     * Bulk-inserts records, then upserts the same keys. It verifies that no new file groups
     * appear and that each file group receives the expected log file.
     */
    @MethodSource("configParams")
    @ParameterizedTest
    public void testBulkInsertData(boolean populateMetaFields, boolean partitioned) throws Exception {
        setUp(populateMetaFields, partitioned);
        int recordNum = 20 + this.random.nextInt(20);
        JavaRDD<HoodieRecord> records = this.jsc.parallelize(this.dataGen.generateInserts("001", recordNum), 2);

        long fileGroupNum = countDistinctFileIds(writeData(records, "001", WriteOperationType.BULK_INSERT, true));
        Assertions.assertEquals(fileGroupNum, countListedFiles());

        // Upserting the same keys must reuse the existing file groups.
        writeData(records, "002", WriteOperationType.UPSERT, true);
        Assertions.assertEquals(fileGroupNum, countListedFiles());
        Assertions.assertEquals(fileGroupNum, countDeltaLogFiles());

        // With realtime merging skipped, both record versions are expected to be read back.
        this.hadoopConf.set("hoodie.realtime.merge.skip", "true");
        Assertions.assertEquals(recordNum * 2, readRecordsNum(this.dataGen.getPartitionPaths(), populateMetaFields));
    }

    /** Returns the number of distinct file ids referenced by the given write statuses. */
    private static long countDistinctFileIds(List<WriteStatus> writeStatuses) {
        return writeStatuses.stream().map(WriteStatus::getFileId).distinct().count();
    }

    /** Returns the total number of realtime file statuses listed across all partitions. */
    private int countListedFiles() {
        return Arrays.stream(this.dataGen.getPartitionPaths())
                .mapToInt(partition -> listStatus(partition, true).length)
                .sum();
    }

    /**
     * Sums the delta log files of every listed realtime file status across all partitions. A
     * listed status that is not a {@link RealtimeFileStatus} counts as one.
     */
    private int countDeltaLogFiles() {
        return Arrays.stream(this.dataGen.getPartitionPaths())
                .flatMap(partition -> Arrays.stream(listStatus(partition, true)))
                .mapToInt(status -> status instanceof RealtimeFileStatus
                        ? ((RealtimeFileStatus) status).getDeltaLogFiles().size()
                        : 1)
                .sum();
    }

    /**
     * Reads the given partitions through the realtime input format and returns the record count.
     *
     * @param partitionPaths     partition paths relative to the table base path
     * @param populateMetaFields forwarded to the input-format reader
     */
    private int readRecordsNum(String[] partitionPaths, boolean populateMetaFields) {
        List<String> inputPaths = Arrays.stream(partitionPaths)
                .map(partition -> Paths.get(this.basePath, partition).toString())
                .collect(Collectors.toList());
        return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
                this.hadoopConf, inputPaths, this.basePath, new JobConf(this.hadoopConf), true, populateMetaFields)
                .size();
    }

    /** Generates {@code numRecords} inserts tagged with {@code instantTime} and upserts them. */
    private List<WriteStatus> writeData(String instantTime, int numRecords, boolean doCommit) {
        return writeData(this.jsc.parallelize(this.dataGen.generateInserts(instantTime, numRecords), 2),
                instantTime, WriteOperationType.UPSERT, doCommit);
    }

    /**
     * Writes {@code records} at {@code instantTime} using the given operation.
     *
     * @param records     records to write
     * @param instantTime commit instant to start and (optionally) commit
     * @param operation   only {@code UPSERT} and {@code BULK_INSERT} are supported
     * @param doCommit    whether to commit the write stats explicitly (auto-commit is off)
     * @return the collected write statuses, already asserted to be error-free
     * @throws HoodieException for an unsupported operation
     */
    private List<WriteStatus> writeData(JavaRDD<HoodieRecord> records, String instantTime,
                                        WriteOperationType operation, boolean doCommit) {
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        this.writeClient.startCommitWithTime(instantTime);
        List<WriteStatus> writeStatuses;
        switch (operation) {
            case UPSERT:
                writeStatuses = this.writeClient.upsert(records, instantTime).collect();
                break;
            case BULK_INSERT:
                writeStatuses = this.writeClient.bulkInsert(records, instantTime).collect();
                break;
            default:
                throw new HoodieException("Unsupported write operations: " + operation);
        }
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
        if (doCommit) {
            Assertions.assertTrue(this.writeClient.commitStats(
                    instantTime,
                    this.context.parallelize(writeStatuses, 1),
                    writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
                    Option.empty(),
                    this.metaClient.getCommitActionType()));
        }
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        return writeStatuses;
    }

    /**
     * Lists the files of one partition through Hudi's parquet input format.
     *
     * @param partitionPath partition path relative to the table base path
     * @param realtime      whether to use the realtime (merge-on-read) input format
     * @return the listed file statuses; never {@code null}
     * @throws HoodieException if listing fails (the underlying {@link IOException} is the cause)
     */
    private FileStatus[] listStatus(String partitionPath, boolean realtime) {
        JobConf jobConf = new JobConf(this.hadoopConf);
        FileInputFormat.setInputPaths(jobConf, Paths.get(this.basePath, partitionPath).toString());
        FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(HoodieFileFormat.PARQUET, realtime, jobConf);
        try {
            return realtime
                    ? ((HoodieParquetRealtimeInputFormat) inputFormat).listStatus(jobConf)
                    : ((HoodieParquetInputFormat) inputFormat).listStatus(jobConf);
        } catch (IOException e) {
            // Fail loudly instead of returning null, which callers would dereference.
            throw new HoodieException("Failed to list files under partition path: " + partitionPath, e);
        }
    }

    /**
     * Base write-config builder shared by all tests. It uses small parallelism and a 1 MB
     * max/small file size, and consistency checks are enabled. It also uses an embedded
     * timeline server backed by an embedded KV-store file-system view.
     */
    public HoodieWriteConfig.Builder getConfigBuilder() {
        return HoodieWriteConfig.newBuilder()
                .withPath(this.basePath)
                .withSchema(TRIP_EXAMPLE_SCHEMA)
                .withParallelism(2, 2)
                .withBulkInsertParallelism(2)
                .withFinalizeWriteParallelism(2)
                .withDeleteParallelism(2)
                .withWriteStatusClass(MetadataMergeWriteStatus.class)
                .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1048576L).build())
                .withStorageConfig(HoodieStorageConfig.newBuilder()
                        .hfileMaxFileSize(1048576L)
                        .parquetMaxFileSize(1048576L)
                        .build())
                .forTable("test-trip-table")
                .withEmbeddedTimelineServerEnabled(true)
                .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                        .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
                        .build());
    }
}
