/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.timeline;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.table.timeline.TimeGenerator;
import org.apache.hudi.common.table.timeline.TimeGeneratorType;
import org.apache.hudi.common.table.timeline.TimeGenerators;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.storage.StorageConfiguration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class TestWaitBasedTimeGenerator {
    private final long clockSkewTime = 20L;
    private final StorageConfiguration<?> storageConf = HoodieTestUtils.getDefaultStorageConfWithDefaults();
    private HoodieTimeGeneratorConfig timeGeneratorConfig;

    @BeforeEach
    public void initialize() {
        this.timeGeneratorConfig = HoodieTimeGeneratorConfig.newBuilder().withPath("test_wait_based").withMaxExpectedClockSkewMs(25L).withTimeGeneratorType(TimeGeneratorType.WAIT_TO_ADJUST_SKEW).build();
        this.timeGeneratorConfig.setValue("hoodie.write.lock.provider", MockInProcessLockProvider.class.getName());
        MockInProcessLockProvider.initialize();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testSlowerThreadLaterAcquiredLock(boolean slowerThreadAcquiredLockLater) throws InterruptedException {
        AtomicLong t1Timestamp = new AtomicLong(0L);
        Thread t1 = new Thread(() -> {
            try {
                MockInProcessLockProvider.needToLockLater(!slowerThreadAcquiredLockLater);
                TimeGenerator timeGenerator = TimeGenerators.getTimeGenerator((HoodieTimeGeneratorConfig)this.timeGeneratorConfig, this.storageConf);
                t1Timestamp.set(timeGenerator.currentTimeMillis(false));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        AtomicLong t2Timestamp = new AtomicLong(0L);
        Thread t2 = new Thread(() -> {
            try {
                MockInProcessLockProvider.needToLockLater(slowerThreadAcquiredLockLater);
                TimeGenerator timeGenerator = TimeGenerators.getTimeGenerator((HoodieTimeGeneratorConfig)this.timeGeneratorConfig, this.storageConf);
                t2Timestamp.set(timeGenerator.currentTimeMillis(false) - 20L);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        t1.start();
        t2.start();
        t1.join(60000L);
        t2.join(60000L);
        Assertions.assertTrue((t2Timestamp.get() != 0L ? 1 : 0) != 0);
        Assertions.assertTrue((t1Timestamp.get() != 0L ? 1 : 0) != 0);
        if (slowerThreadAcquiredLockLater) {
            Assertions.assertTrue((t2Timestamp.get() > t1Timestamp.get() ? 1 : 0) != 0);
        } else {
            Assertions.assertTrue((t2Timestamp.get() < t1Timestamp.get() ? 1 : 0) != 0);
        }
    }

    public static class MockInProcessLockProvider
    extends InProcessLockProvider {
        private static final ThreadLocal<Boolean> NEED_TO_LOCK_LATER = ThreadLocal.withInitial(() -> false);
        private static CountDownLatch SIGNAL;

        public static void initialize() {
            SIGNAL = new CountDownLatch(1);
        }

        public static void needToLockLater(Boolean lockLater) {
            NEED_TO_LOCK_LATER.set(lockLater);
        }

        public MockInProcessLockProvider(LockConfiguration lockConfiguration, StorageConfiguration<?> conf) {
            super(lockConfiguration, conf);
        }

        public boolean tryLock(long time, TimeUnit unit) {
            boolean isLocked;
            if (NEED_TO_LOCK_LATER.get().booleanValue()) {
                try {
                    SIGNAL.await();
                }
                catch (InterruptedException e) {
                    throw new HoodieLockException((Throwable)e);
                }
            }
            if (isLocked = super.tryLock(time, unit)) {
                SIGNAL.countDown();
            }
            return isLocked;
        }
    }
}

