package org.apache.druid.server.coordinator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.framework.api.GetDataWatchBackgroundStatable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/server/coordinator/LoadQueuePeonTest.class */
public class LoadQueuePeonTest extends CuratorTestBase {
    private static final String LOAD_QUEUE_PATH = "/druid/loadqueue/localhost:1234";
    private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
    private LoadQueuePeon loadQueuePeon;
    private PathChildrenCache loadQueueCache;

    @Before
    public void setUp() throws Exception {
        setupServerAndCurator();
        this.curator.start();
        this.curator.blockUntilConnected();
        this.curator.create().creatingParentsIfNeeded().forPath(LOAD_QUEUE_PATH);
        this.loadQueueCache = new PathChildrenCache(this.curator, LOAD_QUEUE_PATH, true, true, Execs.singleThreaded("load_queue_cache-%d"));
    }

    @Test
    public void testMultipleLoadDropSegments() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicInteger atomicInteger2 = new AtomicInteger(0);
        this.loadQueuePeon = new CuratorLoadQueuePeon(this.curator, LOAD_QUEUE_PATH, this.jsonMapper, Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"), Execs.singleThreaded("test_load_queue_peon-%d"), new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO));
        this.loadQueuePeon.start();
        final CountDownLatch[] countDownLatchArr = new CountDownLatch[5];
        final CountDownLatch[] countDownLatchArr2 = new CountDownLatch[5];
        final CountDownLatch[] countDownLatchArr3 = new CountDownLatch[5];
        final CountDownLatch[] countDownLatchArr4 = new CountDownLatch[5];
        for (int i = 0; i < 5; i++) {
            countDownLatchArr[i] = new CountDownLatch(1);
            countDownLatchArr2[i] = new CountDownLatch(1);
            countDownLatchArr3[i] = new CountDownLatch(1);
            countDownLatchArr4[i] = new CountDownLatch(1);
        }
        final DataSegmentChangeHandler dataSegmentChangeHandler = new DataSegmentChangeHandler() { // from class: org.apache.druid.server.coordinator.LoadQueuePeonTest.1
            public void addSegment(DataSegment dataSegment, DataSegmentChangeCallback dataSegmentChangeCallback) {
                countDownLatchArr[atomicInteger.get()].countDown();
            }

            public void removeSegment(DataSegment dataSegment, DataSegmentChangeCallback dataSegmentChangeCallback) {
                countDownLatchArr2[atomicInteger.get()].countDown();
            }
        };
        List<DataSegment> transform = Lists.transform(ImmutableList.of("2014-10-26T00:00:00Z/P1D", "2014-10-25T00:00:00Z/P1D", "2014-10-24T00:00:00Z/P1D", "2014-10-23T00:00:00Z/P1D", "2014-10-22T00:00:00Z/P1D"), new Function<String, DataSegment>() { // from class: org.apache.druid.server.coordinator.LoadQueuePeonTest.2
            public DataSegment apply(String str) {
                return LoadQueuePeonTest.this.dataSegmentWithInterval(str);
            }
        });
        List transform2 = Lists.transform(ImmutableList.of("2014-10-27T00:00:00Z/P1D", "2014-10-29T00:00:00Z/P1M", "2014-10-31T00:00:00Z/P1D", "2014-10-30T00:00:00Z/P1D", "2014-10-28T00:00:00Z/P1D"), new Function<String, DataSegment>() { // from class: org.apache.druid.server.coordinator.LoadQueuePeonTest.3
            public DataSegment apply(String str) {
                return LoadQueuePeonTest.this.dataSegmentWithInterval(str);
            }
        });
        List<DataSegment> transform3 = Lists.transform(ImmutableList.of("2014-10-29T00:00:00Z/P1M", "2014-10-31T00:00:00Z/P1D", "2014-10-30T00:00:00Z/P1D", "2014-10-28T00:00:00Z/P1D", "2014-10-27T00:00:00Z/P1D"), new Function<String, DataSegment>() { // from class: org.apache.druid.server.coordinator.LoadQueuePeonTest.4
            public DataSegment apply(String str) {
                return LoadQueuePeonTest.this.dataSegmentWithInterval(str);
            }
        });
        this.loadQueueCache.getListenable().addListener(new PathChildrenCacheListener() { // from class: org.apache.druid.server.coordinator.LoadQueuePeonTest.5
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                if (pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                    ((DataSegmentChangeRequest) LoadQueuePeonTest.this.jsonMapper.readValue(pathChildrenCacheEvent.getData().getData(), DataSegmentChangeRequest.class)).go(dataSegmentChangeHandler, (DataSegmentChangeCallback) null);
                }
            }
        });
        this.loadQueueCache.start();
        Iterator it = transform.iterator();
        while (it.hasNext()) {
            this.loadQueuePeon.dropSegment((DataSegment) it.next(), new LoadPeonCallback() { // from class: org.apache.druid.server.coordinator.LoadQueuePeonTest.6
                public void execute() {
                    countDownLatchArr4[atomicInteger2.get()].countDown();
                }
            });
        }
        Iterator it2 = transform2.iterator();
        while (it2.hasNext()) {
            this.loadQueuePeon.loadSegment((DataSegment) it2.next(), new LoadPeonCallback() { // from class: org.apache.druid.server.coordinator.LoadQueuePeonTest.7
                public void execute() {
                    countDownLatchArr3[atomicInteger2.get()].countDown();
                }
            });
        }
        Assert.assertEquals(6000L, this.loadQueuePeon.getLoadQueueSize());
        Assert.assertEquals(5L, this.loadQueuePeon.getSegmentsToLoad().size());
        Assert.assertEquals(5L, this.loadQueuePeon.getSegmentsToDrop().size());
        for (DataSegment dataSegment : transform) {
            String makePath = ZKPaths.makePath(LOAD_QUEUE_PATH, dataSegment.getIdentifier());
            Assert.assertTrue(this.timing.forWaiting().awaitLatch(countDownLatchArr2[atomicInteger.get()]));
            Assert.assertNotNull(this.curator.checkExists().forPath(makePath));
            Assert.assertEquals(dataSegment, ((SegmentChangeRequestDrop) this.jsonMapper.readValue((byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath(makePath), DataSegmentChangeRequest.class)).getSegment());
            if (atomicInteger.get() == 4) {
                atomicInteger.set(0);
            } else {
                atomicInteger.incrementAndGet();
            }
            ((ChildrenDeletable) this.curator.delete().guaranteed()).forPath(makePath);
            Assert.assertTrue(this.timing.forWaiting().awaitLatch(countDownLatchArr4[atomicInteger2.get()]));
            Assert.assertEquals((5 - atomicInteger2.get()) - 1, this.loadQueuePeon.getSegmentsToDrop().size());
            if (atomicInteger2.get() == 4) {
                atomicInteger2.set(0);
            } else {
                atomicInteger2.incrementAndGet();
            }
        }
        for (DataSegment dataSegment2 : transform3) {
            String makePath2 = ZKPaths.makePath(LOAD_QUEUE_PATH, dataSegment2.getIdentifier());
            Assert.assertTrue(this.timing.forWaiting().awaitLatch(countDownLatchArr[atomicInteger.get()]));
            Assert.assertNotNull(this.curator.checkExists().forPath(makePath2));
            Assert.assertEquals(dataSegment2, ((SegmentChangeRequestLoad) this.jsonMapper.readValue((byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath(makePath2), DataSegmentChangeRequest.class)).getSegment());
            atomicInteger.incrementAndGet();
            ((ChildrenDeletable) this.curator.delete().guaranteed()).forPath(makePath2);
            Assert.assertTrue(this.timing.forWaiting().awaitLatch(countDownLatchArr3[atomicInteger2.get()]));
            int i2 = (5 - atomicInteger2.get()) - 1;
            Assert.assertEquals(1200 * i2, this.loadQueuePeon.getLoadQueueSize());
            Assert.assertEquals(i2, this.loadQueuePeon.getSegmentsToLoad().size());
            atomicInteger2.incrementAndGet();
        }
    }

    @Test
    public void testFailAssign() throws Exception {
        DataSegment dataSegmentWithInterval = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.loadQueuePeon = new CuratorLoadQueuePeon(this.curator, LOAD_QUEUE_PATH, this.jsonMapper, Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"), Execs.singleThreaded("test_load_queue_peon-%d"), new TestDruidCoordinatorConfig(null, null, null, new Duration(1L), null, null, 10, null, false, false, new Duration("PT1s")));
        this.loadQueuePeon.start();
        this.loadQueueCache.getListenable().addListener(new PathChildrenCacheListener() { // from class: org.apache.druid.server.coordinator.LoadQueuePeonTest.8
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) {
                if (pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                    countDownLatch.countDown();
                }
            }
        });
        this.loadQueueCache.start();
        this.loadQueuePeon.loadSegment(dataSegmentWithInterval, new LoadPeonCallback() { // from class: org.apache.druid.server.coordinator.LoadQueuePeonTest.9
            public void execute() {
                countDownLatch2.countDown();
            }
        });
        String makePath = ZKPaths.makePath(LOAD_QUEUE_PATH, dataSegmentWithInterval.getIdentifier());
        Assert.assertTrue(this.timing.forWaiting().awaitLatch(countDownLatch));
        Assert.assertNotNull(this.curator.checkExists().forPath(makePath));
        Assert.assertEquals(dataSegmentWithInterval, ((SegmentChangeRequestLoad) this.jsonMapper.readValue((byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath(makePath), DataSegmentChangeRequest.class)).getSegment());
        Assert.assertTrue(this.timing.forWaiting().awaitLatch(countDownLatch2));
        Assert.assertEquals(0L, this.loadQueuePeon.getSegmentsToLoad().size());
        Assert.assertEquals(0L, this.loadQueuePeon.getLoadQueueSize());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public DataSegment dataSegmentWithInterval(String str) {
        return DataSegment.builder().dataSource("test_load_queue_peon").interval(Intervals.of(str)).loadSpec(ImmutableMap.of()).version("2015-05-27T03:38:35.683Z").dimensions(ImmutableList.of()).metrics(ImmutableList.of()).shardSpec(NoneShardSpec.instance()).binaryVersion(9).size(1200L).build();
    }

    @After
    public void tearDown() throws Exception {
        this.loadQueueCache.close();
        this.loadQueuePeon.stop();
        tearDownServerAndCurator();
    }
}
