package org.apache.hudi.client;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.class */
/**
 * Multi-writer tests that run ingestion and table services side by side on a table configured with
 * optimistic concurrency control, {@link InProcessLockProvider} and
 * {@link PreferWriterConflictResolutionStrategy}.
 */
public class TestMultiWriterWithPreferWriterIngestion extends HoodieClientTestBase {

    /**
     * Tears down the current test resources and re-creates them around a MERGE_ON_READ table
     * (Parquet base file format) initialized at {@code basePath}.
     *
     * @throws IOException if cleaning up, creating the base directory or initializing the table fails
     */
    public void setUpMORTestTable() throws IOException {
        cleanupResources();
        initPath();
        initSparkContexts();
        initTestDataGenerator();
        initFileSystem();
        this.fs.mkdirs(new Path(this.basePath));
        this.metaClient = HoodieTestUtils.init(
            this.hadoopConf, this.basePath, HoodieTableType.MERGE_ON_READ, HoodieFileFormat.PARQUET);
        // NOTE(review): initTestDataGenerator() was already called above; this second call looks
        // redundant but is kept to preserve behavior -- confirm before removing.
        initTestDataGenerator();
    }

    /**
     * Releases the resources held by the test harness after every test.
     *
     * @throws IOException if the cleanup fails
     */
    @AfterEach
    public void clean() throws IOException {
        cleanupResources();
    }

    /**
     * Runs ingestion concurrently with asynchronously scheduled table services and verifies that every
     * completed instant on the commits timeline is one the test recorded as successful.
     *
     * <p>Flow: three sequential commits are written first (one insert, two upserts). Then an upsert,
     * a compaction scheduling and a clean scheduling run concurrently. Finally an insert, the
     * compaction execution and the clean execution run concurrently. For MERGE_ON_READ a failing
     * compaction must fail with {@link HoodieWriteConflictException}.
     *
     * @param hoodieTableType table type under test
     * @throws Exception if any step of the test fails
     */
    @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
    @ParameterizedTest
    public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType hoodieTableType) throws Exception {
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            setUpMORTestTable();
        }
        Properties lockProperties = newFileSystemLockPathProperties();
        HoodieWriteConfig writeConfig = getConfigBuilder()
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withInlineCompaction(false)
                .withMaxNumDeltaCommitsBeforeCompaction(2)
                .build())
            .withCleanConfig(HoodieCleanConfig.newBuilder()
                .withAutoClean(false)
                .withAsyncClean(true)
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                .build())
            .withEmbeddedTimelineServerEnabled(false)
            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                .withStorageType(FileSystemViewStorageType.MEMORY)
                .build())
            .withClusteringConfig(HoodieClusteringConfig.newBuilder()
                .withInlineClusteringNumCommits(1)
                .build())
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withLockConfig(HoodieLockConfig.newBuilder()
                .withLockProvider(InProcessLockProvider.class)
                .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
                .build())
            .withAutoCommit(false)
            .withProperties(lockProperties)
            .build();

        // Instants the test considers successfully completed. Several executor tasks add to this set
        // at the same time, so it must be a thread-safe set rather than a plain HashSet.
        Set<String> validInstants = ConcurrentHashMap.newKeySet();

        // Sequential baseline: one insert commit followed by two upsert commits.
        SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
        String firstCommit = HoodieActiveTimeline.createNewInstantTime();
        createCommitWithInserts(writeConfig, client, "000", firstCommit, 200);
        validInstants.add(firstCommit);
        String secondCommit = HoodieActiveTimeline.createNewInstantTime();
        createCommitWithUpserts(writeConfig, client, firstCommit, "000", secondCommit, 100);
        String thirdCommit = HoodieActiveTimeline.createNewInstantTime();
        createCommitWithUpserts(writeConfig, client, secondCommit, "000", thirdCommit, 100);
        validInstants.add(secondCommit);
        validInstants.add(thirdCommit);

        ExecutorService executors = Executors.newFixedThreadPool(2);
        try {
            SparkRDDWriteClient ingestionClient = getHoodieWriteClient(writeConfig);
            SparkRDDWriteClient tableServiceClient = getHoodieWriteClient(writeConfig);

            // Round 1: upsert while compaction and clean are being scheduled.
            String concurrentUpsertInstant = HoodieActiveTimeline.createNewInstantTime();
            Future<?> upsertFuture = executors.submit(() -> {
                try {
                    createCommitWithUpserts(
                        writeConfig, ingestionClient, thirdCommit, secondCommit, concurrentUpsertInstant, 100);
                    validInstants.add(concurrentUpsertInstant);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            String compactionInstant = HoodieActiveTimeline.createNewInstantTime();
            Future<?> scheduleCompactionFuture = executors.submit(() -> {
                try {
                    tableServiceClient.scheduleTableService(compactionInstant, Option.empty(), TableServiceType.COMPACT);
                } catch (Exception e) {
                    // Only MERGE_ON_READ is required to schedule compaction successfully; a failure on
                    // COPY_ON_WRITE is deliberately ignored (presumably compaction does not apply there).
                    if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
                        throw new RuntimeException(e);
                    }
                }
            });
            String cleanInstant = HoodieActiveTimeline.createNewInstantTime();
            Future<?> scheduleCleanFuture = executors.submit(() -> {
                try {
                    tableServiceClient.scheduleTableService(cleanInstant, Option.empty(), TableServiceType.CLEAN);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            upsertFuture.get();
            scheduleCompactionFuture.get();
            scheduleCleanFuture.get();

            // Round 2: insert while the scheduled compaction and clean are executed.
            String concurrentInsertInstant = HoodieActiveTimeline.createNewInstantTime();
            Future<?> insertFuture = executors.submit(() -> {
                try {
                    createCommitWithInserts(writeConfig, ingestionClient, thirdCommit, concurrentInsertInstant, 100);
                    validInstants.add(concurrentInsertInstant);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            Future<?> compactFuture = executors.submit(() -> {
                try {
                    // The client is a raw type, so the metadata comes back untyped and needs the cast.
                    HoodieCommitMetadata compactionMetadata = (HoodieCommitMetadata)
                        tableServiceClient.compact(compactionInstant).getCommitMetadata().get();
                    tableServiceClient.commitCompaction(compactionInstant, compactionMetadata, Option.empty());
                    validInstants.add(compactionInstant);
                } catch (Exception e) {
                    // On MERGE_ON_READ the only acceptable failure is a write conflict; on
                    // COPY_ON_WRITE any failure of the compaction is deliberately ignored.
                    if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
                        Assertions.assertTrue(e instanceof HoodieWriteConflictException,
                            "Compaction should only fail with a write conflict but failed with: " + e);
                    }
                }
            });
            Future<?> cleanFuture = executors.submit(() -> {
                try {
                    tableServiceClient.clean(cleanInstant, false);
                    validInstants.add(cleanInstant);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            insertFuture.get();
            compactFuture.get();
            cleanFuture.get();
        } finally {
            // Always release the worker threads; on failure this also interrupts tasks still running
            // so they do not outlive the test and race with the @AfterEach cleanup.
            executors.shutdownNow();
        }

        Set<String> completedInstants = this.metaClient.getActiveTimeline()
            .getCommitsTimeline()
            .filterCompletedInstants()
            .getInstantsAsStream()
            .map(instant -> instant.getTimestamp())
            .collect(Collectors.toSet());
        Assertions.assertTrue(validInstants.containsAll(completedInstants),
            "Completed instants " + completedInstants + " should all be among the valid instants " + validInstants);
    }

    /**
     * Verifies that executing clustering fails with {@link HoodieClusteringException} when it is
     * scheduled after an upsert whose result was never committed and a later upsert that was committed.
     *
     * @param hoodieTableType table type under test
     * @throws Exception if any step of the test fails
     */
    @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
    @ParameterizedTest
    public void testHoodieClientMultiWriterWithClustering(HoodieTableType hoodieTableType) throws Exception {
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            setUpMORTestTable();
        }
        Properties lockProperties = newFileSystemLockPathProperties();
        HoodieWriteConfig writeConfig = getConfigBuilder()
            .withCleanConfig(HoodieCleanConfig.newBuilder()
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                .withAutoClean(false)
                .build())
            .withClusteringConfig(HoodieClusteringConfig.newBuilder()
                .withInlineClusteringNumCommits(1)
                .build())
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withLockConfig(HoodieLockConfig.newBuilder()
                .withLockProvider(InProcessLockProvider.class)
                .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
                .build())
            .withAutoCommit(false)
            .withProperties(lockProperties)
            .build();

        String firstCommit = HoodieActiveTimeline.createNewInstantTime();
        createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", firstCommit, 200);

        // This upsert is written but its result is intentionally never committed (auto-commit is
        // off), presumably leaving the instant inflight while the following steps run.
        SparkRDDWriteClient uncommittedWriter = getHoodieWriteClient(writeConfig);
        String uncommittedInstant = HoodieActiveTimeline.createNewInstantTime();
        updateBatch(writeConfig, uncommittedWriter, uncommittedInstant, firstCommit,
            Option.of(Arrays.asList(firstCommit)), "000", 100,
            (writeClient, records, instantTime) -> writeClient.upsert(records, instantTime),
            false, false, 100, 200, 2);

        // A second upsert that is started and committed after the uncommitted one.
        String committedInstant = HoodieActiveTimeline.createNewInstantTime();
        SparkRDDWriteClient committingWriter = getHoodieWriteClient(writeConfig);
        committingWriter.commit(committedInstant,
            updateBatch(writeConfig, committingWriter, committedInstant, firstCommit,
                Option.of(Arrays.asList(firstCommit)), "000", 100,
                (writeClient, records, instantTime) -> writeClient.upsert(records, instantTime),
                false, false, 100, 200, 2));

        SparkRDDWriteClient clusteringClient = getHoodieWriteClient(writeConfig);
        Option<String> clusteringInstant =
            clusteringClient.scheduleTableService(Option.empty(), TableServiceType.CLUSTER);
        Assertions.assertThrows(HoodieClusteringException.class,
            () -> clusteringClient.cluster(clusteringInstant.get(), true));
    }

    /**
     * Builds the extra write properties shared by the tests: the file-system lock path is pointed at
     * {@code <basePath>/.hoodie/.locks}.
     */
    private Properties newFileSystemLockPathProperties() {
        Properties properties = new Properties();
        properties.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        return properties;
    }

    /**
     * Bulk-inserts a batch through {@code insertFirstBatch} and commits it, asserting that the commit
     * succeeds.
     *
     * <p>NOTE(review): parameter roles follow the argument positions of
     * {@code insertFirstBatch} in the base class -- confirm against that helper.
     *
     * @param hoodieWriteConfig write config handed to the batch helper
     * @param sparkRDDWriteClient client that writes and commits the batch
     * @param initCommitTime instant forwarded as the fourth argument of {@code insertFirstBatch}
     * @param newCommitTime instant under which the batch is written and committed
     * @param numRecords number of records to write, also used as the expected record count
     * @throws Exception if writing or committing fails
     */
    private void createCommitWithInserts(HoodieWriteConfig hoodieWriteConfig, SparkRDDWriteClient sparkRDDWriteClient,
                                         String initCommitTime, String newCommitTime, int numRecords) throws Exception {
        Assertions.assertTrue(
            sparkRDDWriteClient.commit(newCommitTime,
                insertFirstBatch(hoodieWriteConfig, sparkRDDWriteClient, newCommitTime, initCommitTime, numRecords,
                    (writeClient, records, instantTime) -> writeClient.bulkInsert(records, instantTime),
                    false, false, numRecords)),
            "Commit should succeed");
    }

    /**
     * Upserts a batch through {@code updateBatch} and commits it. Unlike
     * {@link #createCommitWithInserts}, the boolean result of the commit is not asserted.
     *
     * <p>NOTE(review): parameter roles follow the argument positions of {@code updateBatch} in the
     * base class -- confirm against that helper.
     *
     * @param hoodieWriteConfig write config handed to the batch helper
     * @param sparkRDDWriteClient client that writes and commits the batch
     * @param prevCommitTime instant forwarded as the fourth argument of {@code updateBatch}
     * @param commitTimeBetweenPrevAndNew instant wrapped into the single-element list argument
     * @param newCommitTime instant under which the batch is written and committed
     * @param numRecords number of records to write, also used as the expected record count
     * @throws Exception if writing or committing fails
     */
    private void createCommitWithUpserts(HoodieWriteConfig hoodieWriteConfig, SparkRDDWriteClient sparkRDDWriteClient,
                                         String prevCommitTime, String commitTimeBetweenPrevAndNew,
                                         String newCommitTime, int numRecords) throws Exception {
        sparkRDDWriteClient.commit(newCommitTime,
            updateBatch(hoodieWriteConfig, sparkRDDWriteClient, newCommitTime, prevCommitTime,
                Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords,
                (writeClient, records, instantTime) -> writeClient.upsert(records, instantTime),
                false, false, numRecords, 200, 2));
    }
}
