package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Concurrency tests for {@link TaskLockbox#doInCriticalSection}.
 *
 * <p>Each test submits two callables to a two-thread pool. Both callables increment a shared
 * counter inside a critical section, and the test asserts the order in which the increments
 * were observed (first callable sees {@code 1}, second sees {@code 2}).
 *
 * <p>The lockbox is backed by a {@link MetadataTaskStorage} on top of the Derby connector
 * provided by the {@link TestDerbyConnector.DerbyConnectorRule} rule.
 */
public class TaskLockBoxConcurrencyTest {

    /** JUnit requires rule fields to be public. */
    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();

    private final ObjectMapper objectMapper = new DefaultObjectMapper();
    private ExecutorService service;
    private TaskStorage taskStorage;
    private TaskLockbox lockbox;

    /** Creates the task tables and builds a fresh storage, lockbox and two-thread executor. */
    @Before
    public void setup() {
        final TestDerbyConnector connector = derby.getConnector();
        connector.createTaskTables();
        taskStorage = new MetadataTaskStorage(
            connector,
            new TaskStorageConfig((Period) null),
            new DerbyMetadataStorageActionHandlerFactory(
                connector,
                derby.metadataTablesConfigSupplier().get(),
                objectMapper
            )
        );
        lockbox = new TaskLockbox(taskStorage);
        // Two threads: one per concurrently running callable in each test.
        service = Executors.newFixedThreadPool(2);
    }

    /** Stops the executor, interrupting any callable that is still running. */
    @After
    public void teardown() {
        service.shutdownNow();
    }

    /**
     * Two different tasks contend for an exclusive lock on the same interval.
     *
     * <p>The first task takes the lock and enters its critical section; only then is the second
     * task released (via the latch) to request the lock. The test expects the first task to
     * finish its increment before the second one, and expects a later {@code tryLock} by the
     * first task to report a revoked lock.
     */
    @Test(timeout = 60_000L)
    public void testDoInCriticalSectionWithDifferentTasks()
        throws ExecutionException, InterruptedException, EntryExistsException {
        final Interval interval = Intervals.of("2017-01-01/2017-01-02");
        // NOTE(review): the int argument of NoopTask.create is taken to be the task priority,
        // which is what the revocation assertions at the end of this test rely on.
        final NoopTask lowPriorityTask = NoopTask.create(10);
        final NoopTask highPriorityTask = NoopTask.create(100);
        lockbox.add(lowPriorityTask);
        lockbox.add(highPriorityTask);
        taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
        taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));

        final SettableSupplier<Integer> counter = new SettableSupplier<>(0);
        final CountDownLatch firstInCriticalSection = new CountDownLatch(1);

        // First task: acquire the lock, then increment the counter inside the critical section.
        final Future<Integer> lowPriorityFuture = service.submit(() -> {
            final LockResult result = lockbox.tryLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval);
            Assert.assertTrue(result.isOk());
            Assert.assertFalse(result.isRevoked());

            return lockbox.doInCriticalSection(
                lowPriorityTask,
                Collections.singletonList(interval),
                CriticalAction.<Integer>builder()
                    .onValidLocks(() -> {
                        // Release the second task only once we are inside the critical section.
                        firstInCriticalSection.countDown();
                        // Stay inside long enough for the second task to start contending.
                        Thread.sleep(100L);
                        counter.set(counter.get() + 1);
                        return counter.get();
                    })
                    .onInvalidLocks(() -> {
                        Assert.fail();
                        return null;
                    })
                    .build()
            );
        });

        // Second task: wait for the first to enter its critical section, then lock and increment.
        final Future<Integer> highPriorityFuture = service.submit(() -> {
            firstInCriticalSection.await();
            final LockResult result = lockbox.lock(TaskLockType.EXCLUSIVE, highPriorityTask, interval);
            Assert.assertTrue(result.isOk());
            Assert.assertFalse(result.isRevoked());

            return lockbox.doInCriticalSection(
                highPriorityTask,
                Collections.singletonList(interval),
                CriticalAction.<Integer>builder()
                    .onValidLocks(() -> {
                        Thread.sleep(100L);
                        counter.set(counter.get() + 1);
                        return counter.get();
                    })
                    .onInvalidLocks(() -> {
                        Assert.fail();
                        return null;
                    })
                    .build()
            );
        });

        Assert.assertEquals(1, lowPriorityFuture.get().intValue());
        Assert.assertEquals(2, highPriorityFuture.get().intValue());

        // After both critical sections ran, the first task's lock is expected to be revoked.
        final LockResult result = lockbox.tryLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval);
        Assert.assertFalse(result.isOk());
        Assert.assertTrue(result.isRevoked());
    }

    /**
     * A single task runs two critical sections whose interval lists overlap.
     *
     * <p>The task holds exclusive locks on three consecutive days. The first critical section
     * covers days one and two, the second covers days two and three; the second is started only
     * after the first has entered its action. The test expects the increments to be observed in
     * submission order.
     */
    @Test(timeout = 60_000L)
    public void testDoInCriticalSectionWithOverlappedIntervals() throws Exception {
        final ImmutableList<Interval> intervals = ImmutableList.of(
            Intervals.of("2017-01-01/2017-01-02"),
            Intervals.of("2017-01-02/2017-01-03"),
            Intervals.of("2017-01-03/2017-01-04")
        );
        final NoopTask task = NoopTask.create();
        lockbox.add(task);
        taskStorage.insert(task, TaskStatus.running(task.getId()));

        for (Interval interval : intervals) {
            Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, task, interval).isOk());
        }

        final SettableSupplier<Integer> counter = new SettableSupplier<>(0);
        final CountDownLatch firstInCriticalSection = new CountDownLatch(1);

        // First critical section: intervals [0] and [1].
        final Future<Integer> firstFuture = service.submit(() ->
            lockbox.doInCriticalSection(
                task,
                ImmutableList.of(intervals.get(0), intervals.get(1)),
                CriticalAction.<Integer>builder()
                    .onValidLocks(() -> {
                        // Release the second callable only once we are inside the critical section.
                        firstInCriticalSection.countDown();
                        Thread.sleep(100L);
                        counter.set(counter.get() + 1);
                        return counter.get();
                    })
                    .onInvalidLocks(() -> {
                        Assert.fail();
                        return null;
                    })
                    .build()
            )
        );

        // Second critical section: intervals [1] and [2], sharing interval [1] with the first.
        final Future<Integer> secondFuture = service.submit(() -> {
            firstInCriticalSection.await();
            return lockbox.doInCriticalSection(
                task,
                ImmutableList.of(intervals.get(1), intervals.get(2)),
                CriticalAction.<Integer>builder()
                    .onValidLocks(() -> {
                        Thread.sleep(100L);
                        counter.set(counter.get() + 1);
                        return counter.get();
                    })
                    .onInvalidLocks(() -> {
                        Assert.fail();
                        return null;
                    })
                    .build()
            );
        });

        Assert.assertEquals(1, firstFuture.get().intValue());
        Assert.assertEquals(2, secondFuture.get().intValue());
    }
}
