package org.apache.druid.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.druid.client.ServerView;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/client/HttpServerInventoryViewTest.class */
public class HttpServerInventoryViewTest {
    private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
    private static final TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>> TYPE_REF = HttpServerInventoryView.SEGMENT_LIST_RESP_TYPE_REF;
    private static final String EXEC_NAME_PREFIX = "InventoryViewTest";
    private static final String METRIC_SUCCESS = "serverview/sync/healthy";
    private static final String METRIC_UNSTABLE_TIME = "serverview/sync/unstableTime";
    private StubServiceEmitter serviceEmitter;
    private HttpServerInventoryView httpServerInventoryView;
    private TestChangeRequestHttpClient<ChangeRequestsSnapshot<DataSegmentChangeRequest>> httpClient;
    private TestExecutorFactory execHelper;
    private TestDruidNodeDiscovery druidNodeDiscovery;
    private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
    private Map<DruidServerMetadata, Set<DataSegment>> segmentsAddedToView;
    private Map<DruidServerMetadata, Set<DataSegment>> segmentsRemovedFromView;
    private Set<DruidServerMetadata> removedServers;
    private AtomicBoolean inventoryInitialized;

    /* loaded from: input_file:org/apache/druid/client/HttpServerInventoryViewTest$TestDruidNodeDiscovery.class */
    private static class TestDruidNodeDiscovery implements DruidNodeDiscovery {
        DruidNodeDiscovery.Listener listener;

        private TestDruidNodeDiscovery() {
        }

        public Collection<DiscoveryDruidNode> getAllNodes() {
            throw new UnsupportedOperationException("Not Implemented.");
        }

        public void registerListener(DruidNodeDiscovery.Listener listener) {
            this.listener = listener;
        }

        void markNodeViewInitialized() {
            this.listener.nodeViewInitialized();
        }

        DiscoveryDruidNode addNodeAndNotifyListeners(String str) {
            DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(new DruidNode("druid/historical", str, false, 8080, (Integer) null, true, false), NodeRole.HISTORICAL, ImmutableMap.of("dataNodeService", new DataNodeService("tier", 10737418240L, ServerType.HISTORICAL, 0)));
            this.listener.nodesAdded(ImmutableList.of(discoveryDruidNode));
            return discoveryDruidNode;
        }

        void removeNodesAndNotifyListeners(DiscoveryDruidNode... discoveryDruidNodeArr) {
            this.listener.nodesRemoved(Arrays.asList(discoveryDruidNodeArr));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/client/HttpServerInventoryViewTest$TestExecutorFactory.class */
    public static class TestExecutorFactory implements ScheduledExecutorFactory {
        private BlockingExecutorService syncExecutor;
        private BlockingExecutorService monitorExecutor;

        private TestExecutorFactory() {
        }

        public ScheduledExecutorService create(int i, String str) {
            BlockingExecutorService blockingExecutorService = new BlockingExecutorService(str);
            if ("InventoryViewTest-%s".equals(str)) {
                this.syncExecutor = blockingExecutorService;
            } else if ("InventoryViewTest-monitor-%s".equals(str)) {
                this.monitorExecutor = blockingExecutorService;
            }
            return new WrappingScheduledExecutorService(str, blockingExecutorService, false);
        }

        void sendSyncRequestAndHandleResponse() {
            this.syncExecutor.finishNextPendingTasks(2);
        }

        void sendSyncRequest() {
            this.syncExecutor.finishNextPendingTask();
        }

        void finishInventoryInitialization() {
            this.syncExecutor.finishNextPendingTask();
        }

        void emitMetrics() {
            this.monitorExecutor.finishNextPendingTasks(2);
        }
    }

    @Before
    public void setup() {
        this.serviceEmitter = new StubServiceEmitter("test", "localhost");
        EmittingLogger.registerEmitter(this.serviceEmitter);
        this.druidNodeDiscovery = new TestDruidNodeDiscovery();
        this.druidNodeDiscoveryProvider = (DruidNodeDiscoveryProvider) EasyMock.createMock(DruidNodeDiscoveryProvider.class);
        EasyMock.expect(this.druidNodeDiscoveryProvider.getForService("dataNodeService")).andReturn(this.druidNodeDiscovery);
        EasyMock.replay(new Object[]{this.druidNodeDiscoveryProvider});
        this.httpClient = new TestChangeRequestHttpClient<>(TYPE_REF, MAPPER);
        this.execHelper = new TestExecutorFactory();
        this.inventoryInitialized = new AtomicBoolean(false);
        this.segmentsAddedToView = new HashMap();
        this.segmentsRemovedFromView = new HashMap();
        this.removedServers = new HashSet();
        createInventoryView(new HttpServerInventoryViewConfig((Period) null, (Period) null, (Integer) null));
    }

    @After
    public void tearDown() {
        EasyMock.verify(new Object[]{this.druidNodeDiscoveryProvider});
        if (this.httpServerInventoryView == null || !this.httpServerInventoryView.isStarted()) {
            return;
        }
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testInitHappensAfterNodeViewInit() {
        this.httpServerInventoryView.start();
        Assert.assertTrue(this.httpServerInventoryView.isStarted());
        Assert.assertFalse(this.inventoryInitialized.get());
        this.druidNodeDiscovery.markNodeViewInitialized();
        Assert.assertFalse(this.inventoryInitialized.get());
        this.execHelper.finishInventoryInitialization();
        Assert.assertTrue(this.inventoryInitialized.get());
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testStopShutsDownExecutors() {
        this.httpServerInventoryView.start();
        Assert.assertFalse(this.execHelper.syncExecutor.isShutdown());
        this.httpServerInventoryView.stop();
        Assert.assertTrue(this.execHelper.syncExecutor.isShutdown());
    }

    @Test
    public void testAddNodeStartsSync() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        DruidServer druidServer = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost").toDruidServer();
        Collection inventory = this.httpServerInventoryView.getInventory();
        Assert.assertEquals(1L, inventory.size());
        Assert.assertTrue(inventory.contains(druidServer));
        this.execHelper.emitMetrics();
        this.serviceEmitter.verifyValue(METRIC_SUCCESS, 1);
        this.serviceEmitter.verifyNotEmitted(METRIC_UNSTABLE_TIME);
        DataSegment dataSegment = CreateDataSegments.ofDatasource("wiki").eachOfSizeInMb(500L).get(0);
        this.httpClient.completeNextRequestWith((TestChangeRequestHttpClient<ChangeRequestsSnapshot<DataSegmentChangeRequest>>) snapshotOf(new SegmentChangeRequestLoad(dataSegment)));
        this.execHelper.sendSyncRequestAndHandleResponse();
        DruidServer inventoryValue = this.httpServerInventoryView.getInventoryValue(druidServer.getName());
        Assert.assertNotNull(inventoryValue);
        Assert.assertEquals(1L, inventoryValue.getTotalSegments());
        Assert.assertNotNull(inventoryValue.getSegment(dataSegment.getId()));
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testRemoveNodeStopsSync() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        DiscoveryDruidNode addNodeAndNotifyListeners = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        DruidServer druidServer = addNodeAndNotifyListeners.toDruidServer();
        this.druidNodeDiscovery.removeNodesAndNotifyListeners(addNodeAndNotifyListeners);
        Assert.assertNull(this.httpServerInventoryView.getInventoryValue(druidServer.getName()));
        this.execHelper.emitMetrics();
        this.serviceEmitter.verifyNotEmitted(METRIC_SUCCESS);
        this.serviceEmitter.verifyNotEmitted(METRIC_UNSTABLE_TIME);
        this.httpServerInventoryView.stop();
    }

    @Test(timeout = 60000)
    public void testSyncSegmentLoadAndDrop() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        DiscoveryDruidNode addNodeAndNotifyListeners = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        DruidServer druidServer = addNodeAndNotifyListeners.toDruidServer();
        DataSegment[] dataSegmentArr = (DataSegment[]) CreateDataSegments.ofDatasource("wiki").forIntervals(4, Granularities.DAY).eachOfSizeInMb(500L).toArray(new DataSegment[0]);
        this.httpClient.completeNextRequestWith((TestChangeRequestHttpClient<ChangeRequestsSnapshot<DataSegmentChangeRequest>>) snapshotOf(new SegmentChangeRequestLoad(dataSegmentArr[0])));
        this.execHelper.sendSyncRequestAndHandleResponse();
        Assert.assertTrue(isAddedToView(druidServer, dataSegmentArr[0]));
        resetForNextSyncRequest();
        this.httpClient.completeNextRequestWith((TestChangeRequestHttpClient<ChangeRequestsSnapshot<DataSegmentChangeRequest>>) snapshotOf(new SegmentChangeRequestDrop(dataSegmentArr[0]), new SegmentChangeRequestLoad(dataSegmentArr[1]), new SegmentChangeRequestLoad(dataSegmentArr[2])));
        this.execHelper.sendSyncRequestAndHandleResponse();
        Assert.assertTrue(isRemovedFromView(druidServer, dataSegmentArr[0]));
        Assert.assertTrue(isAddedToView(druidServer, dataSegmentArr[1]));
        Assert.assertTrue(isAddedToView(druidServer, dataSegmentArr[2]));
        resetForNextSyncRequest();
        this.httpClient.completeNextRequestWith((TestChangeRequestHttpClient<ChangeRequestsSnapshot<DataSegmentChangeRequest>>) new ChangeRequestsSnapshot<>(true, "Server requested reset", ChangeRequestHistory.Counter.ZERO, Collections.emptyList()));
        this.execHelper.sendSyncRequestAndHandleResponse();
        Assert.assertTrue(this.segmentsAddedToView.isEmpty());
        Assert.assertTrue(this.segmentsRemovedFromView.isEmpty());
        resetForNextSyncRequest();
        this.httpClient.completeNextRequestWith((TestChangeRequestHttpClient<ChangeRequestsSnapshot<DataSegmentChangeRequest>>) snapshotOf(new SegmentChangeRequestLoad(dataSegmentArr[2]), new SegmentChangeRequestLoad(dataSegmentArr[3])));
        this.execHelper.sendSyncRequestAndHandleResponse();
        Assert.assertTrue(isRemovedFromView(druidServer, dataSegmentArr[1]));
        Assert.assertTrue(isAddedToView(druidServer, dataSegmentArr[3]));
        DruidServer inventoryValue = this.httpServerInventoryView.getInventoryValue(druidServer.getName());
        Assert.assertNotNull(inventoryValue);
        Assert.assertEquals(2L, inventoryValue.getTotalSegments());
        Assert.assertNotNull(inventoryValue.getSegment(dataSegmentArr[2].getId()));
        Assert.assertNotNull(inventoryValue.getSegment(dataSegmentArr[3].getId()));
        this.druidNodeDiscovery.removeNodesAndNotifyListeners(addNodeAndNotifyListeners);
        this.druidNodeDiscovery.removeNodesAndNotifyListeners(new DiscoveryDruidNode(new DruidNode("service", "host", false, 8080, (Integer) null, true, false), NodeRole.INDEXER, Collections.emptyMap()));
        this.druidNodeDiscovery.removeNodesAndNotifyListeners(new DiscoveryDruidNode(new DruidNode("service", "host", false, 8080, (Integer) null, true, false), NodeRole.INDEXER, ImmutableMap.of("dataNodeService", new LookupNodeService("lookyloo"))));
        Assert.assertTrue(this.removedServers.contains(druidServer.getMetadata()));
        Assert.assertNull(this.httpServerInventoryView.getInventoryValue(druidServer.getName()));
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testSyncWhenRequestFailedToSend() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        this.httpClient.failToSendNextRequestWith(new ISE("Could not send request to server", new Object[0]));
        this.execHelper.sendSyncRequest();
        this.serviceEmitter.flush();
        this.execHelper.emitMetrics();
        this.serviceEmitter.verifyValue(METRIC_SUCCESS, 0);
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testSyncWhenErrorResponse() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        this.httpClient.completeNextRequestWith(InvalidInput.exception("failure on server", new Object[0]));
        this.execHelper.sendSyncRequestAndHandleResponse();
        this.serviceEmitter.flush();
        this.execHelper.emitMetrics();
        this.serviceEmitter.verifyValue(METRIC_SUCCESS, 0);
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testUnstableServerAlertsAfterTimeout() {
        createInventoryView(new HttpServerInventoryViewConfig((Period) null, Period.millis(0), (Integer) null));
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        this.serviceEmitter.flush();
        this.httpClient.completeNextRequestWith(InvalidInput.exception("failure on server", new Object[0]));
        this.execHelper.sendSyncRequestAndHandleResponse();
        List alerts = this.serviceEmitter.getAlerts();
        Assert.assertEquals(1L, alerts.size());
        Assert.assertTrue(((AlertEvent) alerts.get(0)).getDescription().contains("Sync failed for server"));
        this.serviceEmitter.flush();
        this.execHelper.emitMetrics();
        this.serviceEmitter.verifyValue(METRIC_SUCCESS, 0);
        this.httpServerInventoryView.stop();
    }

    @Test(timeout = 60000)
    public void testInitWaitsForServerToSync() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        ExecutorService singleThreaded = Execs.singleThreaded("InventoryViewTest-init");
        try {
            try {
                singleThreaded.submit(() -> {
                    this.execHelper.finishInventoryInitialization();
                });
                Thread.sleep(1000L);
                Assert.assertFalse(this.inventoryInitialized.get());
                this.httpClient.completeNextRequestWith((TestChangeRequestHttpClient<ChangeRequestsSnapshot<DataSegmentChangeRequest>>) snapshotOf(new DataSegmentChangeRequest[0]));
                this.execHelper.sendSyncRequestAndHandleResponse();
                Thread.sleep(10000L);
                Assert.assertTrue(this.inventoryInitialized.get());
                singleThreaded.shutdownNow();
            } catch (InterruptedException e) {
                throw new ISE(e, "Interrupted", new Object[0]);
            }
        } catch (Throwable th) {
            singleThreaded.shutdownNow();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testInitDoesNotWaitForRemovedServerToSync() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        DiscoveryDruidNode addNodeAndNotifyListeners = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        ExecutorService singleThreaded = Execs.singleThreaded("InventoryViewTest-init");
        try {
            try {
                singleThreaded.submit(() -> {
                    this.execHelper.finishInventoryInitialization();
                });
                Thread.sleep(1000L);
                Assert.assertFalse(this.inventoryInitialized.get());
                this.druidNodeDiscovery.removeNodesAndNotifyListeners(addNodeAndNotifyListeners);
                Thread.sleep(10000L);
                Assert.assertTrue(this.inventoryInitialized.get());
                singleThreaded.shutdownNow();
            } catch (InterruptedException e) {
                throw new ISE(e, "Interrupted", new Object[0]);
            }
        } catch (Throwable th) {
            singleThreaded.shutdownNow();
            throw th;
        }
    }

    private void createInventoryView(HttpServerInventoryViewConfig httpServerInventoryViewConfig) {
        this.httpServerInventoryView = new HttpServerInventoryView(MAPPER, this.httpClient, this.druidNodeDiscoveryProvider, pair -> {
            return !((DataSegment) pair.rhs).getDataSource().equals("non-loading-datasource");
        }, httpServerInventoryViewConfig, this.serviceEmitter, this.execHelper, EXEC_NAME_PREFIX);
        this.httpServerInventoryView.registerSegmentCallback(Execs.directExecutor(), new ServerView.SegmentCallback() { // from class: org.apache.druid.client.HttpServerInventoryViewTest.1
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                ((Set) HttpServerInventoryViewTest.this.segmentsAddedToView.computeIfAbsent(druidServerMetadata, druidServerMetadata2 -> {
                    return new HashSet();
                })).add(dataSegment);
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentRemoved(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                ((Set) HttpServerInventoryViewTest.this.segmentsRemovedFromView.computeIfAbsent(druidServerMetadata, druidServerMetadata2 -> {
                    return new HashSet();
                })).add(dataSegment);
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentViewInitialized() {
                HttpServerInventoryViewTest.this.inventoryInitialized.set(true);
                return ServerView.CallbackAction.CONTINUE;
            }
        });
        this.httpServerInventoryView.registerServerRemovedCallback(Execs.directExecutor(), druidServer -> {
            this.removedServers.add(druidServer.getMetadata());
            return ServerView.CallbackAction.CONTINUE;
        });
    }

    private boolean isAddedToView(DruidServer druidServer, DataSegment dataSegment) {
        return this.segmentsAddedToView.getOrDefault(druidServer.getMetadata(), Collections.emptySet()).contains(dataSegment);
    }

    private boolean isRemovedFromView(DruidServer druidServer, DataSegment dataSegment) {
        return this.segmentsRemovedFromView.getOrDefault(druidServer.getMetadata(), Collections.emptySet()).contains(dataSegment);
    }

    private void resetForNextSyncRequest() {
        this.segmentsAddedToView.clear();
        this.segmentsRemovedFromView.clear();
    }

    private static ChangeRequestsSnapshot<DataSegmentChangeRequest> snapshotOf(DataSegmentChangeRequest... dataSegmentChangeRequestArr) {
        return ChangeRequestsSnapshot.success(ChangeRequestHistory.Counter.ZERO, Arrays.asList(dataSegmentChangeRequestArr));
    }
}
