package org.apache.hudi.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/hudi/client/TestHoodieClientMultiWriter.class */
public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
    /**
     * Rebuilds the test fixture around a {@code MERGE_ON_READ} table.
     *
     * <p>Releases the current fixture, runs the inherited initialisers in a fixed order, creates
     * the base directory and points {@code metaClient} at a Parquet merge-on-read table located
     * at {@code basePath}.
     *
     * @throws IOException declared by the fixture and file-system calls made here
     */
    public void setUpMORTestTable() throws IOException {
        // Tear down whatever the default set-up created before building the new fixture.
        cleanupResources();
        initPath();
        initSparkContexts();
        initTestDataGenerator();
        initFileSystem();

        final Path tableRoot = new Path(basePath);
        fs.mkdirs(tableRoot);
        metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, HoodieFileFormat.PARQUET);

        // The data generator is initialised a second time once the table exists.
        // NOTE(review): may be redundant with the call above — confirm initTestDataGenerator() is idempotent before removing.
        initTestDataGenerator();
    }

    /**
     * Per-test tear-down: delegates to the inherited {@code cleanupResources()}.
     *
     * @throws IOException declared by {@code cleanupResources()}
     */
    @AfterEach
    public void clean() throws IOException {
        this.cleanupResources();
    }

    /**
     * Two writers update the same table concurrently under optimistic concurrency control; the
     * first commit ("002") must succeed and the second ("003") must be rejected with a
     * {@link HoodieWriteConflictException}.
     *
     * <p>A two-party {@link CyclicBarrier} orders the steps: writer 1 finishes writing before
     * writer 2 starts, and both have written before either commits. Writer 1 reaches the second
     * barrier only after its commit, so writer 2 always commits last.
     *
     * <p>Worker failures are propagated through {@link Future#get()} (as the cause of an
     * {@code ExecutionException}) so the root cause is visible, and the thread pool is always
     * shut down.
     *
     * @param hoodieTableType table type under test; {@code MERGE_ON_READ} rebuilds the fixture first
     * @throws Exception if set-up fails or either writer task fails
     */
    @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
    @ParameterizedTest
    public void testHoodieClientBasicMultiWriter(HoodieTableType hoodieTableType) throws Exception {
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            setUpMORTestTable();
        }
        Properties properties = new Properties();
        properties.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        properties.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        HoodieWriteConfig writeConfig = getConfigBuilder()
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                .withAutoClean(false)
                .build())
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withEmbeddedTimelineServerEnabled(false)
            .withMarkersType(MarkerType.DIRECT.name())
            .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
            .withAutoCommit(false)
            .withProperties(properties)
            .build();

        // Seed the table with an initial commit that both writers will update.
        createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200);

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            SparkRDDWriteClient firstWriter = getHoodieWriteClient(writeConfig);
            SparkRDDWriteClient secondWriter = getHoodieWriteClient(writeConfig);
            CyclicBarrier barrier = new CyclicBarrier(2);

            // Value-returning lambdas are submitted as Callables, so checked exceptions and
            // assertion errors surface from Future.get() instead of being swallowed.
            Future<?> firstResult = executor.submit(() -> {
                JavaRDD<WriteStatus> writeStatuses = startCommitForUpdate(writeConfig, firstWriter, "002", 100);
                barrier.await(60L, TimeUnit.SECONDS);
                Assertions.assertDoesNotThrow(() -> {
                    firstWriter.commit("002", writeStatuses);
                });
                // Release writer 2 only after this commit has completed.
                barrier.await(60L, TimeUnit.SECONDS);
                return null;
            });
            Future<?> secondResult = executor.submit(() -> {
                barrier.await(60L, TimeUnit.SECONDS);
                JavaRDD<WriteStatus> writeStatuses = startCommitForUpdate(writeConfig, secondWriter, "003", 100);
                barrier.await(60L, TimeUnit.SECONDS);
                Assertions.assertThrows(HoodieWriteConflictException.class, () -> {
                    secondWriter.commit("003", writeStatuses);
                });
                return null;
            });

            firstResult.get();
            secondResult.get();
        } finally {
            // Also interrupts a writer still parked on the barrier when its peer has failed.
            executor.shutdownNow();
        }
    }

    /**
     * Copy-on-write variant of the async-table-services conflict scenario.
     *
     * <p>NOTE(review): the method carries {@code @Disabled} but no {@code @Test}; confirm whether
     * it is meant to be reported as a skipped test.
     *
     * @throws Exception if the shared scenario fails
     */
    @Disabled
    public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws Exception {
        final HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
        testMultiWriterWithAsyncTableServicesWithConflict(tableType);
    }

    /**
     * Merge-on-read variant of the async-table-services conflict scenario.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testMultiWriterWithAsyncTableServicesWithConflictMOR() throws Exception {
        final HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
        testMultiWriterWithAsyncTableServicesWithConflict(tableType);
    }

    /**
     * Five writers insert concurrently, each into its own partition ("2016/03/02" through
     * "2016/03/06") and under its own instant ("002" through "006"); every commit is expected
     * to succeed.
     *
     * <p>The pending futures are held in a typed list, failures propagate from
     * {@link Future#get()}, and the thread pool is always shut down.
     *
     * @param hoodieTableType table type under test; {@code MERGE_ON_READ} rebuilds the fixture first
     * @throws Exception if set-up fails or any writer task fails
     */
    @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
    @ParameterizedTest
    public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType hoodieTableType) throws Exception {
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            setUpMORTestTable();
        }
        Properties properties = new Properties();
        properties.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        properties.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        properties.setProperty("hoodie.write.lock.client.wait_time_ms_between_retry", "3000");
        properties.setProperty("hoodie.write.lock.client.num_retries", "20");
        HoodieWriteConfig writeConfig = getConfigBuilder()
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withInlineCompaction(false)
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                .withMaxNumDeltaCommitsBeforeCompaction(2)
                .build())
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
            .withAutoCommit(false)
            .withMarkersType(MarkerType.DIRECT.name())
            .withProperties(properties)
            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                .withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
                .withSecondaryStorageType(FileSystemViewStorageType.MEMORY)
                .build())
            .build();

        // Initial commit into the first partition, before any concurrency starts.
        createCommitWithInsertsForPartition(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 100, "2016/03/01");

        final int writerCount = 5;
        ExecutorService executor = Executors.newFixedThreadPool(writerCount);
        try {
            List<Future<?>> pendingWrites = new ArrayList<>(writerCount);
            for (int i = 0; i < writerCount; i++) {
                String instantTime = "00" + (i + 2);
                String partitionPath = "2016/03/0" + (i + 2);
                // Submitted as a Callable so the checked exception needs no wrapping.
                pendingWrites.add(executor.submit(() -> {
                    createCommitWithInsertsForPartition(writeConfig, getHoodieWriteClient(writeConfig), "001", instantTime, 100, partitionPath);
                    return null;
                }));
            }
            for (Future<?> pendingWrite : pendingWrites) {
                pendingWrite.get();
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Shared scenario: an ingestion writer with inline clustering enabled runs concurrently with
     * compaction and clean scheduling/execution issued from a second client.
     *
     * <p>Steps:
     * <ol>
     *   <li>Create instants "001" (inserts), "002" and "003" (upserts) sequentially.</li>
     *   <li>Concurrently: upsert "004" with the clustering-enabled client, schedule compaction
     *       "005" and schedule clean "006". For {@code MERGE_ON_READ} the upsert must fail with
     *       a {@link HoodieWriteConflictException}.</li>
     *   <li>Concurrently: insert "007", run and commit compaction "005", run clean "006".</li>
     *   <li>Assert that every completed instant on the commits timeline is either one recorded
     *       as successful here or a completed replace instant.</li>
     * </ol>
     *
     * <p>The set of successful instants is written from several pool threads at once, so it is a
     * synchronized set. The thread pool is always shut down.
     *
     * @param hoodieTableType table type under test; {@code MERGE_ON_READ} rebuilds the fixture first
     * @throws Exception if set-up fails or any concurrent task fails
     */
    private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType hoodieTableType) throws Exception {
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            setUpMORTestTable();
        }
        Properties properties = new Properties();
        // This scenario uses the table base path itself as the lock path.
        // NOTE(review): the sibling tests use basePath + "/.hoodie/.locks" — confirm which is intended.
        properties.setProperty("hoodie.write.lock.filesystem.path", this.basePath);
        properties.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder()
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withAutoClean(false)
                .withInlineCompaction(false)
                .withAsyncClean(true)
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                .withMaxNumDeltaCommitsBeforeCompaction(2)
                .build())
            .withEmbeddedTimelineServerEnabled(false)
            .withMarkersType(MarkerType.DIRECT.name())
            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                .withStorageType(FileSystemViewStorageType.MEMORY)
                .build())
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
            .withAutoCommit(false)
            .withProperties(properties);

        // Instants expected to be complete at the end; shared across worker threads.
        Set<String> validInstants = Collections.synchronizedSet(new HashSet<>());

        // Built before clustering is added to the builder, so this config has no inline clustering.
        HoodieWriteConfig baseConfig = configBuilder.build();
        SparkRDDWriteClient setupClient = getHoodieWriteClient(baseConfig);
        createCommitWithInserts(baseConfig, setupClient, "000", "001", 200);
        validInstants.add("001");
        createCommitWithUpserts(baseConfig, setupClient, "001", "000", "002", 100);
        createCommitWithUpserts(baseConfig, setupClient, "002", "000", "003", 100);
        validInstants.add("002");
        validInstants.add("003");

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            HoodieWriteConfig clusteringConfig = configBuilder
                .withClusteringConfig(HoodieClusteringConfig.newBuilder()
                    .withInlineClustering(true)
                    .withInlineClusteringNumCommits(1)
                    .build())
                .build();
            SparkRDDWriteClient ingestionClient = getHoodieWriteClient(clusteringConfig);
            SparkRDDWriteClient tableServiceClient = getHoodieWriteClient(baseConfig);

            // Phase 1: ingestion racing with compaction and clean scheduling.
            Future<?> conflictingUpsert = executor.submit(() -> {
                try {
                    createCommitWithUpserts(clusteringConfig, ingestionClient, "003", "002", "004", 100);
                    if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
                        Assertions.fail("Conflicts not handled correctly");
                    }
                    validInstants.add("004");
                } catch (Exception e) {
                    // For other table types a failure here is tolerated (best effort).
                    if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
                        Assertions.assertTrue(e instanceof HoodieWriteConflictException);
                    }
                }
            });
            Future<?> scheduleCompaction = executor.submit(() -> {
                try {
                    tableServiceClient.scheduleTableService("005", Option.empty(), TableServiceType.COMPACT);
                } catch (Exception e) {
                    if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
                        throw new RuntimeException(e);
                    }
                }
            });
            Future<?> scheduleClean = executor.submit(() -> {
                try {
                    tableServiceClient.scheduleTableService("006", Option.empty(), TableServiceType.CLEAN);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            conflictingUpsert.get();
            scheduleCompaction.get();
            scheduleClean.get();

            // Phase 2: a fresh insert racing with execution of the scheduled table services.
            Future<?> insert = executor.submit(() -> {
                try {
                    createCommitWithInserts(clusteringConfig, ingestionClient, "003", "007", 100);
                    validInstants.add("007");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            Future<?> runCompaction = executor.submit(() -> {
                try {
                    tableServiceClient.commitCompaction("005", (JavaRDD) tableServiceClient.compact("005"), Option.empty());
                    validInstants.add("005");
                } catch (Exception e) {
                    if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
                        throw new RuntimeException(e);
                    }
                }
            });
            Future<?> runClean = executor.submit(() -> {
                try {
                    tableServiceClient.clean("006", false);
                    validInstants.add("006");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            insert.get();
            runCompaction.get();
            runClean.get();
        } finally {
            executor.shutdownNow();
        }

        // Completed replace instants are accepted as well.
        Set<String> completedReplaces = this.metaClient.reloadActiveTimeline()
            .getCompletedReplaceTimeline()
            .filterCompletedInstants()
            .getInstants()
            .map(instant -> instant.getTimestamp())
            .collect(Collectors.toSet());
        validInstants.addAll(completedReplaces);

        Set<String> completedCommits = this.metaClient.reloadActiveTimeline()
            .getCommitsTimeline()
            .filterCompletedInstants()
            .getInstants()
            .map(instant -> instant.getTimestamp())
            .collect(Collectors.toSet());
        Assertions.assertTrue(validInstants.containsAll(completedCommits),
            "Completed instants " + completedCommits + " must all be among " + validInstants);
    }

    /**
     * A writer that started before a second writer committed and before clustering was scheduled
     * must fail when it finally commits.
     *
     * <p>Sequence: seed commit "001"; writer 1 writes "003" but does not commit; writer 2 writes
     * and commits "004"; a clustering-enabled client schedules clustering; writer 1's commit of
     * "003" is then expected to throw.
     *
     * @param hoodieTableType table type under test; {@code MERGE_ON_READ} rebuilds the fixture first
     * @throws Exception if any set-up step fails
     */
    @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
    @ParameterizedTest
    public void testHoodieClientMultiWriterWithClustering(HoodieTableType hoodieTableType) throws Exception {
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            setUpMORTestTable();
        }
        Properties lockProps = new Properties();
        lockProps.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        lockProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder()
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                .withAutoClean(false)
                .build())
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withMarkersType(MarkerType.DIRECT.name())
            .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
            .withAutoCommit(false)
            .withProperties(lockProps);

        // Two configs are built before clustering is added to the builder; the third has it enabled.
        HoodieWriteConfig firstConfig = configBuilder.build();
        HoodieWriteConfig secondConfig = configBuilder.build();
        HoodieWriteConfig clusteringConfig = configBuilder
            .withClusteringConfig(HoodieClusteringConfig.newBuilder()
                .withInlineClustering(true)
                .withInlineClusteringNumCommits(1)
                .build())
            .build();

        createCommitWithInserts(firstConfig, getHoodieWriteClient(firstConfig), "000", "001", 200);

        // Writer 1 writes instant "003" but holds back its commit.
        SparkRDDWriteClient firstWriter = getHoodieWriteClient(firstConfig);
        JavaRDD<WriteStatus> firstWriterStatuses = updateBatch(firstConfig, firstWriter, "003", "001",
            Option.of(Arrays.asList("002")), "000", 100, SparkRDDWriteClient::upsert, false, false, 100, 200, 2);

        // Writer 2 writes and commits instant "004" in the meantime.
        SparkRDDWriteClient secondWriter = getHoodieWriteClient(firstConfig);
        JavaRDD<WriteStatus> secondWriterStatuses = updateBatch(secondConfig, secondWriter, "004", "001",
            Option.of(Arrays.asList("002")), "000", 100, SparkRDDWriteClient::upsert, false, false, 100, 200, 2);
        secondWriter.commit("004", secondWriterStatuses);

        SparkRDDWriteClient clusteringClient = getHoodieWriteClient(clusteringConfig);
        boolean clusteringScheduled = clusteringClient.scheduleTableService(Option.empty(), TableServiceType.CLUSTER).isPresent();
        Assertions.assertTrue(clusteringScheduled);

        // Any Exception is accepted; completing normally fails the test.
        Assertions.assertThrows(Exception.class, () -> firstWriter.commit("003", firstWriterStatuses),
            "Should have thrown a concurrent conflict exception");
    }

    /**
     * Writes an insert batch for a single partition via {@code insertBatch} and commits it,
     * asserting that the commit reports success.
     *
     * @param hoodieWriteConfig write config handed to {@code insertBatch}
     * @param sparkRDDWriteClient client that performs the insert and the commit
     * @param str instant passed to {@code insertBatch} right after the new instant
     *            (presumably the initial/previous commit time — confirm against {@code insertBatch})
     * @param str2 instant time that is written and committed
     * @param i record count; also passed as both expected-count arguments of {@code insertBatch}
     * @param str3 partition path, wrapped in an {@link Option}
     * @throws Exception if writing or committing fails
     */
    private void createCommitWithInsertsForPartition(HoodieWriteConfig hoodieWriteConfig, SparkRDDWriteClient sparkRDDWriteClient, String str, String str2, int i, String str3) throws Exception {
        boolean committed = sparkRDDWriteClient.commit(
            str2,
            insertBatch(hoodieWriteConfig, sparkRDDWriteClient, str2, str, i, SparkRDDWriteClient::insert,
                false, false, i, i, 1, Option.of(str3)));
        Assertions.assertTrue(committed, "Commit should succeed");
    }

    /**
     * Writes a first batch with {@code bulkInsert} via {@code insertFirstBatch} and commits it,
     * asserting that the commit reports success.
     *
     * @param hoodieWriteConfig write config handed to {@code insertFirstBatch}
     * @param sparkRDDWriteClient client that performs the bulk insert and the commit
     * @param str instant passed to {@code insertFirstBatch} right after the new instant
     *            (presumably the initial commit time — confirm against {@code insertFirstBatch})
     * @param str2 instant time that is written and committed
     * @param i record count; also passed as the trailing expected-count argument
     * @throws Exception if writing or committing fails
     */
    private void createCommitWithInserts(HoodieWriteConfig hoodieWriteConfig, SparkRDDWriteClient sparkRDDWriteClient, String str, String str2, int i) throws Exception {
        boolean committed = sparkRDDWriteClient.commit(
            str2,
            insertFirstBatch(hoodieWriteConfig, sparkRDDWriteClient, str2, str, i, SparkRDDWriteClient::bulkInsert,
                false, false, i));
        Assertions.assertTrue(committed, "Commit should succeed");
    }

    /**
     * Writes an upsert batch via {@code updateBatch} and commits it. The commit result is not
     * checked here.
     *
     * @param hoodieWriteConfig write config handed to {@code updateBatch}
     * @param sparkRDDWriteClient client that performs the upsert and the commit
     * @param str instant passed to {@code updateBatch} right after the new instant
     *            (presumably the previous commit time — confirm against {@code updateBatch})
     * @param str2 single instant wrapped in the optional list argument of {@code updateBatch}
     * @param str3 instant time that is written and committed
     * @param i record count; also passed as the first expected-count argument
     * @throws Exception if writing or committing fails
     */
    private void createCommitWithUpserts(HoodieWriteConfig hoodieWriteConfig, SparkRDDWriteClient sparkRDDWriteClient, String str, String str2, String str3, int i) throws Exception {
        JavaRDD<WriteStatus> upsertStatuses = updateBatch(hoodieWriteConfig, sparkRDDWriteClient, str3, str,
            Option.of(Arrays.asList(str2)), "000", i, SparkRDDWriteClient::upsert, false, false, i, 200, 2);
        sparkRDDWriteClient.commit(str3, upsertStatuses);
    }

    /**
     * Starts a commit at the given instant and upserts generated update records, without
     * committing.
     *
     * <p>The caller decides when to commit the returned write statuses, which is what lets the
     * multi-writer tests interleave two writers.
     *
     * @param hoodieWriteConfig write config used when wrapping the generated records
     * @param sparkRDDWriteClient client on which the commit is started and the upsert issued
     * @param str instant time of the new commit
     * @param i number of update records requested from the data generator
     * @return write statuses of the upsert, already checked for write errors
     * @throws Exception if record generation or the write fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the client is used as a raw type throughout this class
    private JavaRDD<WriteStatus> startCommitForUpdate(HoodieWriteConfig hoodieWriteConfig, SparkRDDWriteClient sparkRDDWriteClient, String str, int i) throws Exception {
        sparkRDDWriteClient.startCommitWithTime(str);

        // Generate unique updates and parallelize them into a single partition.
        JavaRDD updateRecords = this.jsc.parallelize(
            generateWrapRecordsFn(false, hoodieWriteConfig, this.dataGen::generateUniqueUpdates).apply(str, i), 1);

        // Call upsert directly rather than through an untyped function wrapper.
        JavaRDD<WriteStatus> writeStatuses = (JavaRDD<WriteStatus>) sparkRDDWriteClient.upsert(updateRecords, str);
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses.collect());
        return writeStatuses;
    }
}
