package org.apache.hudi.client;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import org.apache.curator.test.TestingServer;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.marker.SimpleTransactionDirectMarkerBasedDetectionStrategy;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/hudi/client/TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockProvider.class */
public class TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockProvider extends HoodieClientTestBase {
    private HoodieWriteConfig config;
    private TestingServer server;

    private void setUp(boolean z) throws Exception {
        initPath();
        if (z) {
            initTestDataGenerator();
        } else {
            initTestDataGenerator(new String[]{""});
        }
        initFileSystem();
        this.metaClient = HoodieTestUtils.init(this.hadoopConf, this.basePath, HoodieTableType.MERGE_ON_READ);
        Properties propertiesForKeyGen = getPropertiesForKeyGen();
        propertiesForKeyGen.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
        this.server = new TestingServer();
        propertiesForKeyGen.setProperty("hoodie.write.lock.zookeeper.base_path", this.basePath);
        propertiesForKeyGen.setProperty("hoodie.write.lock.zookeeper.url", this.server.getConnectString());
        propertiesForKeyGen.setProperty("hoodie.write.lock.zookeeper.base_path", this.server.getTempDirectory().getAbsolutePath());
        propertiesForKeyGen.setProperty("hoodie.write.lock.zookeeper.lock_key", "key");
        this.config = getConfigBuilder().withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.MEMORY).withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build()).withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build()).withArchivalConfig(HoodieArchivalConfig.newBuilder().withAutoArchive(false).build()).withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).withMarkersType(MarkerType.DIRECT.name()).withEarlyConflictDetectionEnable(true).withEarlyConflictDetectionStrategy(SimpleTransactionDirectMarkerBasedDetectionStrategy.class.getName()).withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(ZookeeperBasedLockProvider.class).build()).withAutoCommit(false).withProperties(propertiesForKeyGen).build();
    }

    @AfterEach
    public void clean() throws IOException {
        cleanupResources();
        FileIOUtils.deleteDirectory(new File(this.basePath));
        if (this.server != null) {
            this.server.close();
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSimpleTransactionDirectMarkerBasedDetectionStrategy(boolean z) throws Exception {
        setUp(z);
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(this.config);
        HoodieWriteConfig hoodieWriteConfig = this.config;
        HoodieTestDataGenerator hoodieTestDataGenerator = this.dataGen;
        hoodieTestDataGenerator.getClass();
        JavaRDD parallelize = this.jsc.parallelize((List) generateWrapRecordsFn(false, hoodieWriteConfig, hoodieTestDataGenerator::generateInserts).apply("00000000000001", 200), 1);
        hoodieWriteClient.startCommitWithTime("00000000000001");
        Assertions.assertTrue(hoodieWriteClient.commit("00000000000001", hoodieWriteClient.insert(parallelize, "00000000000001")), "Commit should succeed");
        SparkRDDWriteClient hoodieWriteClient2 = getHoodieWriteClient(this.config);
        SparkRDDWriteClient hoodieWriteClient3 = getHoodieWriteClient(this.config);
        HoodieWriteConfig hoodieWriteConfig2 = this.config;
        HoodieTestDataGenerator hoodieTestDataGenerator2 = this.dataGen;
        hoodieTestDataGenerator2.getClass();
        HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn = generateWrapRecordsFn(false, hoodieWriteConfig2, hoodieTestDataGenerator2::generateUniqueUpdates);
        JavaRDD parallelize2 = this.jsc.parallelize((List) generateWrapRecordsFn.apply("00000000000002", 200), 1);
        hoodieWriteClient2.startCommitWithTime("00000000000002");
        JavaRDD upsert = hoodieWriteClient2.upsert(parallelize2, "00000000000002");
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(upsert.collect());
        Assertions.assertThrows(SparkException.class, () -> {
            JavaRDD parallelize3 = this.jsc.parallelize((List) generateWrapRecordsFn.apply("00000000000003", 200), 1);
            hoodieWriteClient3.startCommitWithTime("00000000000003");
            hoodieWriteClient3.commit("00000000000003", hoodieWriteClient3.upsert(parallelize3, "00000000000003"));
        }, "Early conflict detected but cannot resolve conflicts for overlapping writes");
        Assertions.assertDoesNotThrow(() -> {
            hoodieWriteClient2.commit("00000000000002", upsert);
        });
    }
}
