package org.apache.druid.server.coordinator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.client.CoordinatorSegmentWatcherConfig;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.class */
public class CuratorDruidCoordinatorTest extends CuratorTestBase {
    private DruidCoordinator coordinator;
    private SegmentsMetadataManager segmentsMetadataManager;
    private DataSourcesSnapshot dataSourcesSnapshot;
    private DruidCoordinatorRuntimeParams coordinatorRuntimeParams;
    private ScheduledExecutorFactory scheduledExecutorFactory;
    private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
    private LoadQueuePeon sourceLoadQueuePeon;
    private LoadQueuePeon destinationLoadQueuePeon;
    private MetadataRuleManager metadataRuleManager;
    private CountDownLatch leaderAnnouncerLatch;
    private CountDownLatch leaderUnannouncerLatch;
    private PathChildrenCache sourceLoadQueueChildrenCache;
    private PathChildrenCache destinationLoadQueueChildrenCache;
    private DruidCoordinatorConfig druidCoordinatorConfig;
    private ObjectMapper objectMapper;
    private JacksonConfigManager configManager;
    private DruidNode druidNode;
    private static final String SEGPATH = "/druid/segments";
    private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1";
    private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2";
    private static final long COORDINATOR_START_DELAY = 1;
    private static final long COORDINATOR_PERIOD = 100;
    private BatchServerInventoryView baseView;
    private CoordinatorServerView serverView;
    private CountDownLatch segmentViewInitLatch;
    private volatile CountDownLatch segmentAddedLatch;
    private volatile CountDownLatch segmentRemovedLatch;
    private ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d");
    private ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d");

    @Rule
    public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS);
    private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
    private final ZkPathsConfig zkPathsConfig = new ZkPathsConfig();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest$TestDruidLeaderSelector.class */
    public static class TestDruidLeaderSelector implements DruidLeaderSelector {
        private volatile DruidLeaderSelector.Listener listener;
        private volatile String leader;

        private TestDruidLeaderSelector() {
        }

        public String getCurrentLeader() {
            return this.leader;
        }

        public boolean isLeader() {
            return this.leader != null;
        }

        public int localTerm() {
            return 0;
        }

        public void registerListener(DruidLeaderSelector.Listener listener) {
            this.listener = listener;
            this.leader = "what:1234";
            listener.becomeLeader();
        }

        public void unregisterListener() {
            this.leader = null;
            this.listener.stopBeingLeader();
        }
    }

    @Before
    public void setUp() throws Exception {
        this.segmentsMetadataManager = (SegmentsMetadataManager) EasyMock.createNiceMock(SegmentsMetadataManager.class);
        this.dataSourcesSnapshot = (DataSourcesSnapshot) EasyMock.createNiceMock(DataSourcesSnapshot.class);
        this.coordinatorRuntimeParams = (DruidCoordinatorRuntimeParams) EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);
        this.metadataRuleManager = (MetadataRuleManager) EasyMock.createNiceMock(MetadataRuleManager.class);
        this.configManager = (JacksonConfigManager) EasyMock.createNiceMock(JacksonConfigManager.class);
        EasyMock.expect(this.configManager.watch((String) EasyMock.eq("coordinator.config"), (Class) EasyMock.anyObject(Class.class), EasyMock.anyObject())).andReturn(new AtomicReference(CoordinatorDynamicConfig.builder().build())).anyTimes();
        EasyMock.expect(this.configManager.watch((String) EasyMock.eq("coordinator.compaction.config"), (Class) EasyMock.anyObject(Class.class), EasyMock.anyObject())).andReturn(new AtomicReference(CoordinatorCompactionConfig.empty())).anyTimes();
        EasyMock.replay(new Object[]{this.configManager});
        setupServerAndCurator();
        this.curator.start();
        this.curator.blockUntilConnected();
        this.curator.create().creatingParentsIfNeeded().forPath(SEGPATH);
        this.curator.create().creatingParentsIfNeeded().forPath(SOURCE_LOAD_PATH);
        this.curator.create().creatingParentsIfNeeded().forPath(DESTINATION_LOAD_PATH);
        this.objectMapper = new DefaultObjectMapper();
        this.druidCoordinatorConfig = new TestDruidCoordinatorConfig(new Duration(COORDINATOR_START_DELAY), new Duration(COORDINATOR_PERIOD), null, null, new Duration(COORDINATOR_PERIOD), null, 10, new Duration("PT0s"));
        this.sourceLoadQueueChildrenCache = new PathChildrenCache(this.curator, SOURCE_LOAD_PATH, true, true, Execs.singleThreaded("coordinator_test_path_children_cache_src-%d"));
        this.destinationLoadQueueChildrenCache = new PathChildrenCache(this.curator, DESTINATION_LOAD_PATH, true, true, Execs.singleThreaded("coordinator_test_path_children_cache_dest-%d"));
        this.sourceLoadQueuePeon = new CuratorLoadQueuePeon(this.curator, SOURCE_LOAD_PATH, this.objectMapper, this.peonExec, this.callbackExec, this.druidCoordinatorConfig);
        this.destinationLoadQueuePeon = new CuratorLoadQueuePeon(this.curator, DESTINATION_LOAD_PATH, this.objectMapper, this.peonExec, this.callbackExec, this.druidCoordinatorConfig);
        this.druidNode = new DruidNode("hey", "what", false, 1234, (Integer) null, true, false);
        this.loadManagementPeons = new ConcurrentHashMap();
        this.scheduledExecutorFactory = (i, str) -> {
            return Executors.newSingleThreadScheduledExecutor();
        };
        this.leaderAnnouncerLatch = new CountDownLatch(1);
        this.leaderUnannouncerLatch = new CountDownLatch(1);
        this.coordinator = new DruidCoordinator(this.druidCoordinatorConfig, new ZkPathsConfig() { // from class: org.apache.druid.server.coordinator.CuratorDruidCoordinatorTest.1
            public String getBase() {
                return "druid";
            }
        }, this.configManager, this.segmentsMetadataManager, this.baseView, this.metadataRuleManager, this.curator, new NoopServiceEmitter(), this.scheduledExecutorFactory, (IndexingServiceClient) null, (LoadQueueTaskMaster) null, new NoopServiceAnnouncer() { // from class: org.apache.druid.server.coordinator.CuratorDruidCoordinatorTest.2
            public void announce(DruidNode druidNode) {
                CuratorDruidCoordinatorTest.this.leaderAnnouncerLatch.countDown();
            }

            public void unannounce(DruidNode druidNode) {
                CuratorDruidCoordinatorTest.this.leaderUnannouncerLatch.countDown();
            }
        }, this.druidNode, this.loadManagementPeons, (Set) null, new CostBalancerStrategyFactory(), (LookupCoordinatorManager) EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), (CompactSegments) null);
    }

    @After
    public void tearDown() throws Exception {
        this.baseView.stop();
        this.sourceLoadQueuePeon.stop();
        this.sourceLoadQueueChildrenCache.close();
        this.destinationLoadQueueChildrenCache.close();
        tearDownServerAndCurator();
    }

    @Test
    public void testStopDoesntKillPoolItDoesntOwn() throws Exception {
        setupView();
        this.sourceLoadQueuePeon.stop();
        Assert.assertFalse(this.peonExec.isShutdown());
        Assert.assertFalse(this.callbackExec.isShutdown());
    }

    @Test
    public void testMoveSegment() throws Exception {
        this.segmentViewInitLatch = new CountDownLatch(1);
        this.segmentAddedLatch = new CountDownLatch(4);
        this.segmentRemovedLatch = new CountDownLatch(0);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        setupView();
        DruidServer druidServer = new DruidServer("localhost:1", "localhost:1", (String) null, 10000000L, ServerType.HISTORICAL, "default_tier", 0);
        DruidServer druidServer2 = new DruidServer("localhost:2", "localhost:2", (String) null, 10000000L, ServerType.HISTORICAL, "default_tier", 0);
        setupZNodeForServer(druidServer, this.zkPathsConfig, this.jsonMapper);
        setupZNodeForServer(druidServer2, this.zkPathsConfig, this.jsonMapper);
        List transform = Lists.transform(ImmutableList.of(Pair.of("2011-04-01/2011-04-03", "v1"), Pair.of("2011-04-03/2011-04-06", "v1"), Pair.of("2011-04-06/2011-04-09", "v1")), pair -> {
            return dataSegmentWithIntervalAndVersion((String) pair.lhs, (String) pair.rhs);
        });
        List transform2 = Lists.transform(ImmutableList.of(Pair.of("2011-03-31/2011-04-01", "v1")), pair2 -> {
            return dataSegmentWithIntervalAndVersion((String) pair2.lhs, (String) pair2.rhs);
        });
        DataSegment dataSegment = (DataSegment) transform.get(2);
        ArrayList arrayList = new ArrayList();
        Iterator it = transform.iterator();
        while (it.hasNext()) {
            arrayList.add(announceBatchSegmentsForServer(druidServer, ImmutableSet.of((DataSegment) it.next()), this.zkPathsConfig, this.jsonMapper));
        }
        Iterator it2 = transform2.iterator();
        while (it2.hasNext()) {
            announceBatchSegmentsForServer(druidServer2, ImmutableSet.of((DataSegment) it2.next()), this.zkPathsConfig, this.jsonMapper);
        }
        Assert.assertTrue(this.timing.forWaiting().awaitLatch(this.segmentViewInitLatch));
        Assert.assertTrue(this.timing.forWaiting().awaitLatch(this.segmentAddedLatch));
        this.sourceLoadQueueChildrenCache.getListenable().addListener((curatorFramework, pathChildrenCacheEvent) -> {
            if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
                countDownLatch2.countDown();
            } else if (CuratorUtils.isChildAdded(pathChildrenCacheEvent)) {
                unannounceSegmentFromBatchForServer(druidServer, dataSegment, (String) arrayList.get(2), this.zkPathsConfig);
            }
        });
        this.destinationLoadQueueChildrenCache.getListenable().addListener((curatorFramework2, pathChildrenCacheEvent2) -> {
            if (pathChildrenCacheEvent2.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
                countDownLatch.countDown();
            } else if (CuratorUtils.isChildAdded(pathChildrenCacheEvent2)) {
                announceBatchSegmentsForServer(druidServer2, ImmutableSet.of(dataSegment), this.zkPathsConfig, this.jsonMapper);
            }
        });
        this.sourceLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        this.destinationLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        Assert.assertTrue(this.timing.forWaiting().awaitLatch(countDownLatch2));
        Assert.assertTrue(this.timing.forWaiting().awaitLatch(countDownLatch));
        this.loadManagementPeons.put("localhost:1", this.sourceLoadQueuePeon);
        this.loadManagementPeons.put("localhost:2", this.destinationLoadQueuePeon);
        this.segmentRemovedLatch = new CountDownLatch(1);
        this.segmentAddedLatch = new CountDownLatch(1);
        ImmutableDruidDataSource immutableDruidDataSource = (ImmutableDruidDataSource) EasyMock.createNiceMock(ImmutableDruidDataSource.class);
        EasyMock.expect(immutableDruidDataSource.getSegment((SegmentId) EasyMock.anyObject(SegmentId.class))).andReturn(transform.get(2));
        EasyMock.replay(new Object[]{immutableDruidDataSource});
        EasyMock.expect(this.segmentsMetadataManager.getImmutableDataSourceWithUsedSegments(EasyMock.anyString())).andReturn(immutableDruidDataSource);
        EasyMock.expect(this.coordinatorRuntimeParams.getDataSourcesSnapshot()).andReturn(this.dataSourcesSnapshot).anyTimes();
        EasyMock.replay(new Object[]{this.segmentsMetadataManager, this.coordinatorRuntimeParams});
        EasyMock.expect(this.dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(immutableDruidDataSource).anyTimes();
        EasyMock.replay(new Object[]{this.dataSourcesSnapshot});
        this.coordinator.moveSegment(this.coordinatorRuntimeParams, druidServer.toImmutableDruidServer(), druidServer2.toImmutableDruidServer(), (DataSegment) transform.get(2), (LoadPeonCallback) null);
        Assert.assertTrue(this.timing.forWaiting().awaitLatch(this.segmentAddedLatch));
        ((ChildrenDeletable) this.curator.delete().guaranteed()).forPath(ZKPaths.makePath(DESTINATION_LOAD_PATH, dataSegment.getId().toString()));
        Assert.assertTrue(this.timing.forWaiting().awaitLatch(this.segmentRemovedLatch));
        ((ChildrenDeletable) this.curator.delete().guaranteed()).forPath(ZKPaths.makePath(SOURCE_LOAD_PATH, dataSegment.getId().toString()));
        ArrayList arrayList2 = new ArrayList(this.serverView.getInventory());
        Assert.assertEquals(2L, ((DruidServer) arrayList2.get(0)).getTotalSegments());
        Assert.assertEquals(2L, ((DruidServer) arrayList2.get(1)).getTotalSegments());
    }

    private void setupView() throws Exception {
        this.baseView = new BatchServerInventoryView(this.zkPathsConfig, this.curator, this.jsonMapper, Predicates.alwaysTrue()) { // from class: org.apache.druid.server.coordinator.CuratorDruidCoordinatorTest.3
            public void registerSegmentCallback(Executor executor, final ServerView.SegmentCallback segmentCallback) {
                super.registerSegmentCallback(executor, new ServerView.SegmentCallback() { // from class: org.apache.druid.server.coordinator.CuratorDruidCoordinatorTest.3.1
                    public ServerView.CallbackAction segmentAdded(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                        ServerView.CallbackAction segmentAdded = segmentCallback.segmentAdded(druidServerMetadata, dataSegment);
                        CuratorDruidCoordinatorTest.this.segmentAddedLatch.countDown();
                        return segmentAdded;
                    }

                    public ServerView.CallbackAction segmentRemoved(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                        ServerView.CallbackAction segmentRemoved = segmentCallback.segmentRemoved(druidServerMetadata, dataSegment);
                        CuratorDruidCoordinatorTest.this.segmentRemovedLatch.countDown();
                        return segmentRemoved;
                    }

                    public ServerView.CallbackAction segmentViewInitialized() {
                        ServerView.CallbackAction segmentViewInitialized = segmentCallback.segmentViewInitialized();
                        CuratorDruidCoordinatorTest.this.segmentViewInitLatch.countDown();
                        return segmentViewInitialized;
                    }
                });
            }
        };
        this.serverView = new CoordinatorServerView(this.baseView, new CoordinatorSegmentWatcherConfig());
        this.baseView.start();
        this.sourceLoadQueuePeon.start();
        this.destinationLoadQueuePeon.start();
        this.coordinator = new DruidCoordinator(this.druidCoordinatorConfig, new ZkPathsConfig() { // from class: org.apache.druid.server.coordinator.CuratorDruidCoordinatorTest.4
            public String getBase() {
                return "druid";
            }
        }, this.configManager, this.segmentsMetadataManager, this.baseView, this.metadataRuleManager, this.curator, new NoopServiceEmitter(), this.scheduledExecutorFactory, (IndexingServiceClient) null, (LoadQueueTaskMaster) null, new NoopServiceAnnouncer() { // from class: org.apache.druid.server.coordinator.CuratorDruidCoordinatorTest.5
            public void announce(DruidNode druidNode) {
                CuratorDruidCoordinatorTest.this.leaderAnnouncerLatch.countDown();
            }

            public void unannounce(DruidNode druidNode) {
                CuratorDruidCoordinatorTest.this.leaderUnannouncerLatch.countDown();
            }
        }, this.druidNode, this.loadManagementPeons, (Set) null, new CostBalancerStrategyFactory(), (LookupCoordinatorManager) EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), (CompactSegments) null);
    }

    private DataSegment dataSegmentWithIntervalAndVersion(String str, String str2) {
        return DataSegment.builder().dataSource("test_curator_druid_coordinator").interval(Intervals.of(str)).loadSpec(ImmutableMap.of("type", "local", "path", "somewhere")).version(str2).dimensions(ImmutableList.of()).metrics(ImmutableList.of()).shardSpec(NoneShardSpec.instance()).binaryVersion(9).size(0L).build();
    }
}
