package org.apache.hudi.utilities.deltastreamer;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.class */
public class TestHoodieDeltaStreamerWithMultiWriter extends HoodieDeltaStreamerTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
    String basePath;
    String propsFilePath;
    String tableBasePath;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter$GetCommitsAfterInstant.class */
    /**
     * Small polling helper that counts the instants which appear on a table's
     * "all commits" timeline after a given instant time.
     */
    public class GetCommitsAfterInstant {
        String basePath;
        String lastSuccessfulCommit;
        HoodieTableMetaClient meta;

        /**
         * @param tableBasePath base path of the table whose timeline is inspected
         * @param lastCommit    instant time after which instants are counted
         */
        GetCommitsAfterInstant(String tableBasePath, String lastCommit) {
            this.basePath = tableBasePath;
            this.lastSuccessfulCommit = lastCommit;
            // The meta client is built once here; every count call reloads its active timeline.
            this.meta = HoodieTableMetaClient.builder()
                    .setConf(TestHoodieDeltaStreamerWithMultiWriter.fs.getConf())
                    .setBasePath(tableBasePath)
                    .build();
        }

        /**
         * Reloads the active timeline and returns how many instants on the
         * all-commits timeline come after {@code lastSuccessfulCommit}.
         */
        long getCommitsAfterInstant() {
            return this.meta
                    .reloadActiveTimeline()
                    .getAllCommitsTimeline()
                    .findInstantsAfter(this.lastSuccessfulCommit)
                    .countInstants();
        }
    }

    /**
     * Per-test initialisation: copies the shared static base path from
     * {@link UtilitiesTestBase} into this instance's {@code basePath} field and
     * then delegates to the parent's {@code setupTest()}.
     */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        // Capture the shared base path first; the tests rewrite the instance copy later on.
        String sharedBasePath = UtilitiesTestBase.basePath;
        this.basePath = sharedBasePath;
        super.setupTest();
    }

    /**
     * Per-test cleanup: resets the shared {@link TestDataSource} data generator
     * and deletes the directory this test instance used as its base path.
     */
    @Override
    @AfterEach
    public void teardown() throws Exception {
        TestDataSource.resetDataGen();
        File testRoot = new File(this.basePath);
        FileIOUtils.deleteDirectory(testRoot);
    }

    /**
     * Runs one continuous upsert job until the table holds the expected commits
     * and record counts, then starts a second continuous upsert job and a
     * one-shot upsert job in parallel against the same table. The one-shot job
     * is started from the checkpoint of the first completed commit, and the
     * parallel run is told to expect a conflict.
     *
     * @param hoodieTableType table type under test (every {@link HoodieTableType} value)
     */
    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType hoodieTableType) throws Exception {
        // Strip a trailing slash, then convert the URI form of the base path into a filesystem path string.
        String trimmedBaseUri = this.basePath.replaceAll("/$", "");
        this.basePath = Paths.get(URI.create(trimmedBaseUri)).toString();
        this.propsFilePath = this.basePath + "/test-multi-writer.properties";
        this.tableBasePath = this.basePath + "/testUpsertsContinuousModeWithMultipleWritersForConflicts_" + hoodieTableType;
        prepareInitialConfigs(fs, this.basePath, "foo");

        // Start from the common multi-writer props, then override the lock provider and wait time.
        TypedProperties writerProps = prepareMultiWriterProps(fs, this.basePath, this.propsFilePath);
        writerProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
        writerProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        UtilitiesTestBase.Helpers.savePropsToDFS(writerProps, fs, this.propsFilePath);

        final int totalRecords = 3000;
        final String tableTypeName = hoodieTableType.name();
        final boolean mergeOnRead = hoodieTableType == HoodieTableType.MERGE_ON_READ;
        final List<String> transformers = Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName());

        // Phase 1: a single continuous job populates the table.
        HoodieDeltaStreamer.Config seedCfg = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, transformers);
        seedCfg.continuousMode = true;
        seedCfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
        seedCfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
        if (mergeOnRead) {
            seedCfg.configs.add(String.format("%s=3", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key()));
            seedCfg.configs.add(String.format("%s=0", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key()));
        }
        HoodieDeltaStreamer seedJob = new HoodieDeltaStreamer(seedCfg, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(seedJob, seedCfg, ignored -> {
            if (mergeOnRead) {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(3, this.tableBasePath, fs);
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(1, this.tableBasePath, fs);
            } else {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(3, this.tableBasePath, fs);
            }
            assertRecordCount(totalRecords, this.tableBasePath, sqlContext);
            assertDistanceCount(totalRecords, this.tableBasePath, sqlContext);
            return true;
        });

        // Phase 2a: configuration of the continuous ingestion writer.
        HoodieDeltaStreamer.Config ingestionCfg = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, transformers);
        ingestionCfg.continuousMode = true;
        ingestionCfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
        ingestionCfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));

        // Phase 2b: configuration of the one-shot writer, resuming from the first completed commit's checkpoint.
        HoodieDeltaStreamer.Config backfillCfg = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, transformers);
        backfillCfg.continuousMode = false;
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(this.tableBasePath).build();
        HoodieTimeline completedCommits = metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        HoodieInstant firstCompleted = completedCommits.firstInstant().get();
        byte[] firstCommitBytes = completedCommits.getInstantDetails(firstCompleted).get();
        HoodieCommitMetadata firstCommitMetadata = HoodieCommitMetadata.fromBytes(firstCommitBytes, HoodieCommitMetadata.class);
        backfillCfg.checkpoint = firstCommitMetadata.getMetadata("deltastreamer.checkpoint.key");
        backfillCfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
        backfillCfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));

        // Phase 3: run both writers concurrently; a conflict is expected here.
        HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(ingestionCfg, jsc);
        HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(backfillCfg, jsc);
        runJobsInParallel(this.tableBasePath, hoodieTableType, totalRecords, ingestionJob, ingestionCfg, backfillJob, backfillCfg, true, "batch1");
    }

    /**
     * Runs one continuous upsert job until the table holds the expected commits
     * and record counts, rewrites the props file with
     * {@code hoodie.test.source.generate.inserts=true}, and then runs a
     * continuous upsert job and a one-shot insert job in parallel. The parallel
     * run is told not to expect a conflict.
     *
     * @param hoodieTableType table type under test (every {@link HoodieTableType} value)
     */
    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType hoodieTableType) throws Exception {
        // Strip a trailing slash, then convert the URI form of the base path into a filesystem path string.
        String trimmedBaseUri = this.basePath.replaceAll("/$", "");
        this.basePath = Paths.get(URI.create(trimmedBaseUri)).toString();
        this.propsFilePath = this.basePath + "/test-multi-writer.properties";
        this.tableBasePath = this.basePath + "/testUpsertsContinuousModeWithMultipleWritersWithoutConflicts_" + hoodieTableType;
        prepareInitialConfigs(fs, this.basePath, "foo");

        // Start from the common multi-writer props, then override the lock provider and wait time.
        TypedProperties seedProps = prepareMultiWriterProps(fs, this.basePath, this.propsFilePath);
        seedProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
        seedProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        UtilitiesTestBase.Helpers.savePropsToDFS(seedProps, fs, this.propsFilePath);

        final int totalRecords = 3000;
        final String tableTypeName = hoodieTableType.name();
        final boolean mergeOnRead = hoodieTableType == HoodieTableType.MERGE_ON_READ;

        // Phase 1: a single continuous job populates the table.
        HoodieDeltaStreamer.Config seedCfg = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath,
                Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
        seedCfg.continuousMode = true;
        seedCfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
        seedCfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
        HoodieDeltaStreamer seedJob = new HoodieDeltaStreamer(seedCfg, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(seedJob, seedCfg, ignored -> {
            if (mergeOnRead) {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(3, this.tableBasePath, fs);
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(1, this.tableBasePath, fs);
            } else {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(3, this.tableBasePath, fs);
            }
            assertRecordCount(totalRecords, this.tableBasePath, sqlContext);
            assertDistanceCount(totalRecords, this.tableBasePath, sqlContext);
            return true;
        });

        // Re-save the props, this time asking the test source to generate inserts.
        TypedProperties parallelProps = prepareMultiWriterProps(fs, this.basePath, this.propsFilePath);
        parallelProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
        parallelProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        parallelProps.setProperty("hoodie.test.source.generate.inserts", "true");
        UtilitiesTestBase.Helpers.savePropsToDFS(parallelProps, fs, this.basePath + "/test-multi-writer.properties");

        final List<String> identityTransformer = Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName());

        // Phase 2a: one-shot insert writer, resuming from the first completed commit's checkpoint.
        HoodieDeltaStreamer.Config backfillCfg = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.INSERT, this.propsFilePath, identityTransformer);
        backfillCfg.continuousMode = false;
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(this.tableBasePath).build();
        HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        HoodieInstant firstCompleted = completedCommits.firstInstant().get();
        byte[] firstCommitBytes = completedCommits.getInstantDetails(firstCompleted).get();
        HoodieCommitMetadata firstCommitMetadata = HoodieCommitMetadata.fromBytes(firstCommitBytes, HoodieCommitMetadata.class);
        backfillCfg.checkpoint = firstCommitMetadata.getMetadata("deltastreamer.checkpoint.key");
        backfillCfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
        backfillCfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));

        // Phase 2b: continuous upsert writer.
        HoodieDeltaStreamer.Config ingestionCfg = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, identityTransformer);
        ingestionCfg.continuousMode = true;
        ingestionCfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
        ingestionCfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));

        // Phase 3: run both writers concurrently; no conflict is expected here.
        HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(ingestionCfg, jsc);
        HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(backfillCfg, jsc);
        runJobsInParallel(this.tableBasePath, hoodieTableType, totalRecords, ingestionJob, ingestionCfg, backfillJob, backfillCfg, false, "batch2");
    }

    /**
     * Currently disabled. Populates the table with a continuous upsert job,
     * runs a one-shot job from the latest checkpoint, adds a commit through
     * {@code addCommitToTimeline} and verifies that the latest commit then has
     * no checkpoint entry. A further one-shot run with no explicit checkpoint
     * is expected to leave two more completed commits than before and a latest
     * checkpoint of {@code "00008"}.
     *
     * @param hoodieTableType table type under test (restricted to COPY_ON_WRITE)
     */
    @Disabled
    @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"})
    @ParameterizedTest
    void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType hoodieTableType) throws Exception {
        // Strip a trailing slash, then convert the URI form of the base path into a filesystem path string.
        String trimmedBaseUri = this.basePath.replaceAll("/$", "");
        this.basePath = Paths.get(URI.create(trimmedBaseUri)).toString();
        this.propsFilePath = this.basePath + "/test-multi-writer.properties";
        this.tableBasePath = this.basePath + "/testLatestCheckpointCarryOverWithMultipleWriters_" + hoodieTableType;
        prepareInitialConfigs(fs, this.basePath, "foo");

        // Start from the common multi-writer props, then override the lock provider and wait time.
        TypedProperties seedProps = prepareMultiWriterProps(fs, this.basePath, this.propsFilePath);
        seedProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
        seedProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        UtilitiesTestBase.Helpers.savePropsToDFS(seedProps, fs, this.propsFilePath);

        final int totalRecords = 3000;
        final String tableTypeName = hoodieTableType.name();
        final boolean mergeOnRead = hoodieTableType == HoodieTableType.MERGE_ON_READ;
        final List<String> transformers = Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName());

        // Phase 1: a single continuous job populates the table.
        HoodieDeltaStreamer.Config seedCfg = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, transformers);
        seedCfg.continuousMode = true;
        seedCfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
        seedCfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
        HoodieDeltaStreamer seedJob = new HoodieDeltaStreamer(seedCfg, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(seedJob, seedCfg, ignored -> {
            if (mergeOnRead) {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(3, this.tableBasePath, fs);
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(1, this.tableBasePath, fs);
            } else {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(3, this.tableBasePath, fs);
            }
            assertRecordCount(totalRecords, this.tableBasePath, sqlContext);
            assertDistanceCount(totalRecords, this.tableBasePath, sqlContext);
            return true;
        });

        // Phase 2: one-shot job resuming from the latest commit's checkpoint.
        HoodieDeltaStreamer.Config backfillCfg = getDeltaStreamerConfig(this.tableBasePath, tableTypeName, WriteOperationType.UPSERT, this.propsFilePath, transformers);
        backfillCfg.continuousMode = false;
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(this.tableBasePath).build();
        // The result of this lookup is not used; the call is kept so the sequence of timeline accesses is unchanged.
        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();

        TypedProperties backfillProps = prepareMultiWriterProps(fs, this.basePath, this.propsFilePath);
        backfillProps.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
        backfillProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        UtilitiesTestBase.Helpers.savePropsToDFS(backfillProps, fs, this.propsFilePath);

        HoodieCommitMetadata latestBeforeBackfill = getLatestMetadata(metaClient);
        backfillCfg.checkpoint = latestBeforeBackfill.getMetadata("deltastreamer.checkpoint.key");
        backfillCfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
        backfillCfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
        HoodieDeltaStreamer firstBackfill = new HoodieDeltaStreamer(backfillCfg, jsc);
        firstBackfill.sync();

        metaClient.reloadActiveTimeline();
        int completedBefore = metaClient.getCommitsTimeline().filterCompletedInstants().countInstants();

        // Phase 3: add a commit via the helper; the latest commit must then carry no checkpoint.
        addCommitToTimeline(metaClient);
        metaClient.reloadActiveTimeline();
        verifyCommitMetadataCheckpoint(metaClient, null);

        // Phase 4: run again without an explicit checkpoint and check what the new latest commit records.
        backfillCfg.checkpoint = null;
        HoodieDeltaStreamer secondBackfill = new HoodieDeltaStreamer(backfillCfg, jsc);
        secondBackfill.sync();
        metaClient.reloadActiveTimeline();
        int completedAfter = metaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
        Assertions.assertEquals(completedBefore + 2, completedAfter);
        verifyCommitMetadataCheckpoint(metaClient, "00008");
    }

    /**
     * Asserts the checkpoint stored under {@code deltastreamer.checkpoint.key}
     * in the latest completed commit's metadata.
     *
     * @param hoodieTableMetaClient meta client of the table to inspect
     * @param str                   expected checkpoint value, or {@code null} to assert that none is stored
     * @throws IOException if the latest commit metadata cannot be read
     */
    private void verifyCommitMetadataCheckpoint(HoodieTableMetaClient hoodieTableMetaClient, String str) throws IOException {
        String actualCheckpoint = getLatestMetadata(hoodieTableMetaClient).getMetadata("deltastreamer.checkpoint.key");
        if (str != null) {
            Assertions.assertEquals(str, actualCheckpoint);
        } else {
            Assertions.assertNull(actualCheckpoint);
        }
    }

    /**
     * Reads the commit metadata of the last completed instant on the table's
     * commits timeline, reloading the active timeline first.
     *
     * @param hoodieTableMetaClient meta client of the table to inspect
     * @return the deserialized metadata of the last completed commit
     * @throws IOException if the instant details cannot be deserialized
     */
    private static HoodieCommitMetadata getLatestMetadata(HoodieTableMetaClient hoodieTableMetaClient) throws IOException {
        HoodieTimeline completedCommits = hoodieTableMetaClient.getActiveTimeline()
                .reload()
                .getCommitsTimeline()
                .filterCompletedInstants();
        HoodieInstant latestInstant = completedCommits.lastInstant().get();
        byte[] rawDetails = completedCommits.getInstantDetails(latestInstant).get();
        return HoodieCommitMetadata.fromBytes(rawDetails, HoodieCommitMetadata.class);
    }

    /**
     * Builds the property set shared by the multi-writer tests, saves it to
     * {@code str2} on the given file system and returns it so callers can apply
     * further overrides.
     *
     * @param fileSystem file system the properties file is written to
     * @param str        base path; used for the common props and the source/target schema file locations
     * @param str2       path of the properties file to write
     * @return the populated properties
     * @throws IOException if saving the properties fails
     */
    private static TypedProperties prepareMultiWriterProps(FileSystem fileSystem, String str, String str2) throws IOException {
        TypedProperties props = new TypedProperties();
        populateCommonProps(props, str);
        populateCommonHiveProps(props);

        // Key generation and schema locations.
        props.setProperty("include", "sql-transformer.properties");
        props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
        props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", str + "/source.avsc");
        props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", str + "/target.avsc");
        // NOTE(review): "include" is assigned a second time here; presumably this replaces the value set above.
        props.setProperty("include", "base.properties");

        // Concurrency control and lock-provider defaults; callers in this file override provider and wait time.
        props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
        props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY");
        props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
        props.setProperty("hoodie.write.lock.hivemetastore.database", "testdb1");
        props.setProperty("hoodie.write.lock.hivemetastore.table", "table1");
        props.setProperty("hoodie.write.lock.zookeeper.url", "127.0.0.1");
        props.setProperty("hoodie.write.lock.zookeeper.port", "2828");
        props.setProperty("hoodie.write.lock.wait_time_ms", "1200000");
        props.setProperty("hoodie.write.lock.num_retries", "10");
        props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table");
        props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test");

        // Every write-parallelism knob gets the same value; the keys are set in their original order.
        String[] parallelismKeys = {
            HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key(),
            HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(),
            HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(),
            HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key()
        };
        for (String parallelismKey : parallelismKeys) {
            props.setProperty(parallelismKey, "4");
        }
        props.setProperty(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name());

        UtilitiesTestBase.Helpers.savePropsToDFS(props, fileSystem, str2);
        return props;
    }

    /**
     * Creates a delta streamer configuration for the {@code hoodie_trips} table
     * that reads from {@link TestDataSource}, with hive sync disabled.
     *
     * @param str                target table base path
     * @param str2               table type name
     * @param writeOperationType write operation the job performs
     * @param str3               path of the properties file
     * @param list               transformer class names
     * @return the populated configuration
     */
    private static HoodieDeltaStreamer.Config getDeltaStreamerConfig(String str, String str2, WriteOperationType writeOperationType, String str3, List<String> list) {
        HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();

        // Target table.
        cfg.targetBasePath = str;
        cfg.targetTableName = "hoodie_trips";
        cfg.tableType = str2;
        cfg.operation = writeOperationType;
        cfg.enableHiveSync = false;

        // Source side.
        cfg.sourceClassName = TestDataSource.class.getName();
        cfg.sourceOrderingField = "timestamp";
        cfg.sourceLimit = 1000L;
        cfg.schemaProviderClassName = defaultSchemaProviderClassName;
        cfg.transformerClassNames = list;

        // Externalised properties.
        cfg.propsFilePath = str3;
        return cfg;
    }

    /**
     * Runs a continuous ingestion job and a one-shot backfill job concurrently
     * against the same table and checks how the pair behaves.
     *
     * <p>The ingestion job runs through
     * {@code TestHoodieDeltaStreamer.deltaStreamerTestRunner} with a condition
     * that asserts at least three new commits after the last completed commit
     * plus the expected record and distance counts. The backfill job waits
     * (bounded) for a commit to appear after that instant and then calls
     * {@code sync()}.
     *
     * <p>When {@code z} is {@code true} the run must fail in exactly one job:
     * either the backfill job with a message containing
     * {@link ConcurrentModificationException}'s class name, or the ingestion
     * job with "Ingestion service was shut down with exception". The surviving
     * job is then shut down gracefully. Any other failure is rethrown.
     *
     * @param str                  table base path
     * @param hoodieTableType      table type, selects which commit assertion is used
     * @param i                    expected record count
     * @param hoodieDeltaStreamer  continuous ingestion job
     * @param config               configuration of the ingestion job
     * @param hoodieDeltaStreamer2 backfill job
     * @param config2              configuration of the backfill job (not read here)
     * @param z                    whether a write conflict is expected
     * @param str2                 job id passed to the test runner and used in log messages
     * @throws Exception the original failure when it is not one of the expected conflict shapes
     */
    private void runJobsInParallel(String str, HoodieTableType hoodieTableType, int i, HoodieDeltaStreamer hoodieDeltaStreamer, HoodieDeltaStreamer.Config config, HoodieDeltaStreamer hoodieDeltaStreamer2, HoodieDeltaStreamer.Config config2, boolean z, String str2) throws Exception {
        // Resolve the reference instant before creating the executor so a failure here leaves nothing to clean up.
        String lastSuccessfulCommit = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(str).build()
                .getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();

        Function<Boolean, Boolean> ingestionDone = ignored -> {
            if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, str, fs);
            } else {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, str, fs);
            }
            assertRecordCount(i, str, sqlContext);
            assertDistanceCount(i, str, sqlContext);
            return true;
        };

        AtomicBoolean continuousFailed = new AtomicBoolean(false);
        AtomicBoolean backfillFailed = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<?> ingestionFuture = pool.submit(() -> {
                try {
                    TestHoodieDeltaStreamer.deltaStreamerTestRunner(hoodieDeltaStreamer, config, ingestionDone, str2);
                } catch (Throwable th) {
                    continuousFailed.set(true);
                    LOG.error("Continuous job failed " + th.getMessage(), th);
                    throw new RuntimeException(th);
                }
            });
            Future<?> backfillFuture = pool.submit(() -> {
                try {
                    awaitCondition(new GetCommitsAfterInstant(str, lastSuccessfulCommit));
                    hoodieDeltaStreamer2.sync();
                } catch (Throwable th) {
                    LOG.error("Backfilling job failed " + th.getMessage(), th);
                    backfillFailed.set(true);
                    throw new RuntimeException(th);
                }
            });
            backfillFuture.get();
            ingestionFuture.get();
            if (z) {
                Assertions.fail("Failed to handle concurrent writes");
            }
        } catch (Exception e) {
            // Failures are matched on the cause's message because the jobs wrap them in RuntimeException.
            // An exception without a cause (e.g. an interrupt while waiting) yields an empty message,
            // so it falls through to the rethrow branch instead of triggering a NullPointerException.
            Throwable cause = e.getCause();
            String causeMessage = (cause == null || cause.getMessage() == null) ? "" : cause.getMessage();

            if (z && backfillFailed.get() && causeMessage.contains(ConcurrentModificationException.class.getName())) {
                if (continuousFailed.get()) {
                    throw new HoodieException("Both backfilling and ingestion job failed ", e);
                }
                LOG.warn("Calling shutdown on ingestion job since the backfill job has failed for " + str2);
                hoodieDeltaStreamer.shutdownGracefully();
            } else if (z && continuousFailed.get() && causeMessage.contains("Ingestion service was shut down with exception")) {
                if (backfillFailed.get()) {
                    throw new HoodieException("Both backfilling and ingestion job failed ", e);
                }
                LOG.warn("Calling shutdown on backfill job since the ingstion/continuous job has failed for " + str2);
                hoodieDeltaStreamer2.shutdownGracefully();
            } else {
                LOG.error("Conflict happened, but not expected " + causeMessage, e);
                throw e;
            }
        } finally {
            // Single shutdown point for success, expected-conflict and failure paths.
            pool.shutdown();
        }
    }

    /**
     * Polls the given counter until it reports at least one commit or the
     * accumulated sleep budget is used up, then logs the elapsed wall-clock time.
     * The budget is tracked by summing the sleep intervals, not by measuring time.
     *
     * @param getCommitsAfterInstant counter that is polled between sleeps
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void awaitCondition(GetCommitsAfterInstant getCommitsAfterInstant) throws InterruptedException {
        final long pollIntervalMs = 500L;
        final long maxWaitMs = 5000L;
        final long startedAt = System.currentTimeMillis();
        // The budget check comes first, so the counter is not polled again once the budget is exceeded.
        for (long waitedMs = 0; waitedMs <= maxWaitMs && getCommitsAfterInstant.getCommitsAfterInstant() <= 0; waitedMs += pollIntervalMs) {
            Thread.sleep(pollIntervalMs);
        }
        LOG.warn("Awaiting completed in " + (System.currentTimeMillis() - startedAt));
    }
}
