package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.class */
/**
 * Concurrency tests for {@link TaskLockbox#doInCriticalSection}.
 *
 * <p>Each test submits two callables to a two-thread executor. Both callables run a critical
 * action that increments a shared counter; the tests then assert the order in which the two
 * increments were observed (first callable sees 1, second sees 2).
 */
public class TaskLockBoxConcurrencyTest {

    /** Pause inside each critical action so the two actions would overlap if they were not serialized. */
    private static final long CRITICAL_SECTION_PAUSE_MILLIS = 100L;

    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();
    private final ObjectMapper objectMapper = new DefaultObjectMapper();
    private ExecutorService service;
    private TaskStorage taskStorage;
    private TaskLockbox lockbox;
    private SegmentSchemaManager segmentSchemaManager;

    /**
     * Creates the task tables in the Derby test database and wires a {@link TaskLockbox} on top of
     * a {@link MetadataTaskStorage}, plus a two-thread executor used by the tests.
     */
    @Before
    public void setup() {
        final TestDerbyConnector connector = derby.getConnector();
        connector.createTaskTables();
        taskStorage = new MetadataTaskStorage(
            connector,
            new TaskStorageConfig((Period) null),
            new DerbyMetadataStorageActionHandlerFactory(
                connector,
                (MetadataStorageTablesConfig) derby.metadataTablesConfigSupplier().get(),
                objectMapper
            )
        );
        segmentSchemaManager = new SegmentSchemaManager(
            (MetadataStorageTablesConfig) derby.metadataTablesConfigSupplier().get(),
            objectMapper,
            connector
        );
        lockbox = new TaskLockbox(
            taskStorage,
            new IndexerSQLMetadataStorageCoordinator(
                objectMapper,
                (MetadataStorageTablesConfig) derby.metadataTablesConfigSupplier().get(),
                connector,
                segmentSchemaManager,
                CentralizedDatasourceSchemaConfig.create()
            )
        );
        // Two threads: one per concurrently running callable in each test.
        service = Execs.multiThreaded(2, "TaskLockBoxConcurrencyTest-%d");
    }

    /** Stops the executor created in {@link #setup()}. */
    @After
    public void teardown() {
        service.shutdownNow();
    }

    /** Requests a time-chunk lock through {@link TaskLockbox#tryLock}. */
    private LockResult tryTimeChunkLock(TaskLockType taskLockType, Task task, Interval interval) {
        return lockbox.tryLock(task, new TimeChunkLockRequest(taskLockType, task, interval, (String) null));
    }

    /** Requests a time-chunk lock through {@link TaskLockbox#lock}. */
    private LockResult acquireTimeChunkLock(TaskLockType taskLockType, Task task, Interval interval)
        throws InterruptedException {
        return lockbox.lock(task, new TimeChunkLockRequest(taskLockType, task, interval, (String) null));
    }

    /**
     * Sleeps for {@link #CRITICAL_SECTION_PAUSE_MILLIS}, then increments the counter.
     *
     * @param counter shared counter; it is read and written without synchronization here, so callers
     *                must invoke this only from inside a critical action
     * @return the counter value read back after the increment
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static Integer incrementAfterPause(SettableSupplier<Integer> counter) throws InterruptedException {
        Thread.sleep(CRITICAL_SECTION_PAUSE_MILLIS);
        counter.set(counter.get() + 1);
        return counter.get();
    }

    /**
     * Two tasks with different priorities (10 and 100) run critical actions on the same interval.
     * The priority-10 task locks first and releases a latch from inside its critical action; the
     * priority-100 task waits for that latch before acquiring its own lock. The test asserts that
     * the priority-10 action observed counter value 1, the priority-100 action observed 2, and that
     * a subsequent lock attempt by the priority-10 task is reported as revoked.
     */
    @Test(timeout = 60000)
    public void testDoInCriticalSectionWithDifferentTasks() throws ExecutionException, InterruptedException {
        final Interval interval = Intervals.of("2017-01-01/2017-01-02");
        final NoopTask lowPriorityTask = NoopTask.ofPriority(10);
        final NoopTask highPriorityTask = NoopTask.ofPriority(100);
        lockbox.add(lowPriorityTask);
        lockbox.add(highPriorityTask);
        taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
        taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));

        final SettableSupplier<Integer> counter = new SettableSupplier<>(0);
        // Released once the low-priority task is inside its critical action.
        final CountDownLatch lowPriorityInCriticalSection = new CountDownLatch(1);

        final Future<Integer> lowPriorityFuture = service.submit(() -> {
            final LockResult result = tryTimeChunkLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval);
            Assert.assertTrue(result.isOk());
            Assert.assertFalse(result.isRevoked());
            return lockbox.doInCriticalSection(
                lowPriorityTask,
                Collections.singleton(interval),
                CriticalAction.<Integer>builder()
                    .onValidLocks(() -> {
                        lowPriorityInCriticalSection.countDown();
                        return incrementAfterPause(counter);
                    })
                    .onInvalidLocks(() -> {
                        Assert.fail();
                        return null;
                    })
                    .build()
            );
        });

        final Future<Integer> highPriorityFuture = service.submit(() -> {
            // Do not even request the lock until the low-priority critical action has started.
            lowPriorityInCriticalSection.await();
            final LockResult result = acquireTimeChunkLock(TaskLockType.EXCLUSIVE, highPriorityTask, interval);
            Assert.assertTrue(result.isOk());
            Assert.assertFalse(result.isRevoked());
            return lockbox.doInCriticalSection(
                highPriorityTask,
                Collections.singleton(interval),
                CriticalAction.<Integer>builder()
                    .onValidLocks(() -> incrementAfterPause(counter))
                    .onInvalidLocks(() -> {
                        Assert.fail();
                        return null;
                    })
                    .build()
            );
        });

        Assert.assertEquals(1, lowPriorityFuture.get().intValue());
        Assert.assertEquals(2, highPriorityFuture.get().intValue());

        // NOTE(review): presumably the low-priority lock was revoked when the higher-priority task
        // acquired the same interval; this test only checks the resulting state.
        final LockResult resultOfLowPriorityTask = tryTimeChunkLock(TaskLockType.EXCLUSIVE, lowPriorityTask, interval);
        Assert.assertFalse(resultOfLowPriorityTask.isOk());
        Assert.assertTrue(resultOfLowPriorityTask.isRevoked());
    }

    /**
     * A single task holds exclusive locks on three consecutive day intervals and runs two critical
     * actions concurrently: one over the first two intervals and one over the last two, so both
     * include the middle interval. The second callable waits until the first is inside its critical
     * action. The test asserts that the first action observed counter value 1 and the second 2.
     */
    @Test(timeout = 60000)
    public void testDoInCriticalSectionWithOverlappedIntervals() throws Exception {
        final ImmutableList<Interval> intervals = ImmutableList.of(
            Intervals.of("2017-01-01/2017-01-02"),
            Intervals.of("2017-01-02/2017-01-03"),
            Intervals.of("2017-01-03/2017-01-04")
        );
        final NoopTask task = NoopTask.create();
        lockbox.add(task);
        taskStorage.insert(task, TaskStatus.running(task.getId()));

        for (Interval interval : intervals) {
            Assert.assertTrue(tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, interval).isOk());
        }

        final SettableSupplier<Integer> counter = new SettableSupplier<>(0);
        // Released once the first callable is inside its critical action.
        final CountDownLatch firstInCriticalSection = new CountDownLatch(1);

        final Future<Integer> firstFuture = service.submit(() ->
            lockbox.doInCriticalSection(
                task,
                new HashSet<>(intervals.subList(0, 2)),
                CriticalAction.<Integer>builder()
                    .onValidLocks(() -> {
                        firstInCriticalSection.countDown();
                        return incrementAfterPause(counter);
                    })
                    .onInvalidLocks(() -> {
                        Assert.fail();
                        return null;
                    })
                    .build()
            )
        );

        final Future<Integer> secondFuture = service.submit(() -> {
            firstInCriticalSection.await();
            return lockbox.doInCriticalSection(
                task,
                new HashSet<>(intervals.subList(1, 3)),
                CriticalAction.<Integer>builder()
                    .onValidLocks(() -> incrementAfterPause(counter))
                    .onInvalidLocks(() -> {
                        Assert.fail();
                        return null;
                    })
                    .build()
            );
        });

        Assert.assertEquals(1, firstFuture.get().intValue());
        Assert.assertEquals(2, secondFuture.get().intValue());
    }
}
