package org.apache.druid.tests.leadership;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.DruidClusterAdminClient;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.Assert;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

@Guice(moduleFactory = DruidTestModuleFactory.class)
@Test(groups = {TestNGGroup.HIGH_AVAILABILTY})
/* loaded from: input_file:org/apache/druid/tests/leadership/ITHighAvailabilityTest.class */
public class ITHighAvailabilityTest {
    private static final String SYSTEM_QUERIES_RESOURCE = "/queries/high_availability_sys.json";
    private static final int NUM_LEADERSHIP_SWAPS = 3;
    private static final int NUM_RETRIES = 120;

    @Inject
    private IntegrationTestingConfig config;

    @Inject
    private DruidClusterAdminClient druidClusterAdminClient;

    @Inject
    ServerDiscoveryFactory factory;

    @Inject
    DruidNodeDiscoveryProvider druidNodeDiscovery;

    @Inject
    CoordinatorResourceTestClient coordinatorClient;

    @Inject
    SqlTestQueryHelper queryHelper;

    @Inject
    ObjectMapper jsonMapper;

    @Inject
    @TestClient
    HttpClient httpClient;
    private static final Logger LOG = new Logger(ITHighAvailabilityTest.class);
    private static final long RETRY_DELAY = TimeUnit.SECONDS.toMillis(5);

    @Test
    public void testLeadershipChanges() throws Exception {
        int i;
        int i2 = 0;
        String str = null;
        String str2 = null;
        do {
            String leader = getLeader("coordinator");
            String leader2 = getLeader("indexer");
            Assert.assertNotEquals(str, leader);
            Assert.assertNotEquals(str2, leader2);
            str = leader;
            str2 = leader2;
            this.queryHelper.testQueriesFromString(fillTemplate(this.config, AbstractIndexerTest.getResourceAsString(SYSTEM_QUERIES_RESOURCE), leader2, leader));
            swapLeadersAndWait(leader, leader2);
            i = i2;
            i2++;
        } while (i < NUM_LEADERSHIP_SWAPS);
    }

    @Test
    public void testDiscoveryAndSelfDiscovery() {
        ITRetryUtil.retryUntil(() -> {
            try {
                int i = 0;
                Iterator it = ImmutableList.of(this.druidNodeDiscovery.getForNodeRole(NodeRole.COORDINATOR), this.druidNodeDiscovery.getForNodeRole(NodeRole.OVERLORD), this.druidNodeDiscovery.getForNodeRole(NodeRole.HISTORICAL), this.druidNodeDiscovery.getForNodeRole(NodeRole.MIDDLE_MANAGER), this.druidNodeDiscovery.getForNodeRole(NodeRole.INDEXER), this.druidNodeDiscovery.getForNodeRole(NodeRole.BROKER), this.druidNodeDiscovery.getForNodeRole(NodeRole.ROUTER)).iterator();
                while (it.hasNext()) {
                    i += testSelfDiscovery(((DruidNodeDiscovery) it.next()).getAllNodes());
                }
                return Boolean.valueOf(i > 5);
            } catch (Throwable th) {
                return false;
            }
        }, true, RETRY_DELAY, NUM_RETRIES, "Standard services discovered");
    }

    @Test
    public void testCustomDiscovery() {
        ITRetryUtil.retryUntil(() -> {
            try {
                return Boolean.valueOf(testSelfDiscovery(this.druidNodeDiscovery.getForNodeRole(new NodeRole("custom-node-role")).getAllNodes()) > 0);
            } catch (Throwable th) {
                return false;
            }
        }, true, RETRY_DELAY, NUM_RETRIES, "Custom service discovered");
    }

    private int testSelfDiscovery(Collection<DiscoveryDruidNode> collection) throws MalformedURLException, ExecutionException, InterruptedException {
        int i = 0;
        for (DiscoveryDruidNode discoveryDruidNode : collection) {
            Object[] objArr = new Object[2];
            objArr[0] = this.config.isDocker() ? this.config.getDockerHost() : discoveryDruidNode.getDruidNode().getHost();
            objArr[1] = Integer.valueOf(discoveryDruidNode.getDruidNode().getPlaintextPort());
            String format = StringUtils.format("http://%s:%s/status/selfDiscovered", objArr);
            LOG.info("testing self discovery %s", new Object[]{format});
            StatusResponseHolder statusResponseHolder = (StatusResponseHolder) this.httpClient.go(new Request(HttpMethod.GET, new URL(format)), StatusResponseHandler.getInstance()).get();
            LOG.info("%s responded with %s", new Object[]{format, Integer.valueOf(statusResponseHolder.getStatus().getCode())});
            Assert.assertEquals(statusResponseHolder.getStatus(), HttpResponseStatus.OK);
            i++;
        }
        return i;
    }

    private void swapLeadersAndWait(String str, String str2) {
        Runnable runnable;
        Runnable runnable2;
        if (isCoordinatorOneLeader(this.config, str)) {
            this.druidClusterAdminClient.restartCoordinatorContainer();
            runnable = () -> {
                this.druidClusterAdminClient.waitUntilCoordinatorReady();
            };
        } else {
            this.druidClusterAdminClient.restartCoordinatorTwoContainer();
            runnable = () -> {
                this.druidClusterAdminClient.waitUntilCoordinatorTwoReady();
            };
        }
        if (isOverlordOneLeader(this.config, str2)) {
            this.druidClusterAdminClient.restartOverlordContainer();
            runnable2 = () -> {
                this.druidClusterAdminClient.waitUntilIndexerReady();
            };
        } else {
            this.druidClusterAdminClient.restartOverlordTwoContainer();
            runnable2 = () -> {
                this.druidClusterAdminClient.waitUntilOverlordTwoReady();
            };
        }
        runnable.run();
        runnable2.run();
    }

    private String getLeader(String str) {
        try {
            StatusResponseHolder statusResponseHolder = (StatusResponseHolder) this.httpClient.go(new Request(HttpMethod.GET, new URL(StringUtils.format("%s/druid/%s/v1/leader", new Object[]{this.config.getRouterUrl(), str}))), StatusResponseHandler.getInstance()).get();
            if (statusResponseHolder.getStatus().equals(HttpResponseStatus.OK)) {
                return statusResponseHolder.getContent();
            }
            throw new ISE("Error while fetching leader from[%s] status[%s] content[%s]", new Object[]{this.config.getRouterUrl(), statusResponseHolder.getStatus(), statusResponseHolder.getContent()});
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static String fillTemplate(IntegrationTestingConfig integrationTestingConfig, String str, String str2, String str3) {
        String replace = StringUtils.replace(StringUtils.replace(StringUtils.replace(StringUtils.replace(StringUtils.replace(StringUtils.replace(str, "%%OVERLORD_ONE%%", integrationTestingConfig.getOverlordInternalHost()), "%%OVERLORD_TWO%%", integrationTestingConfig.getOverlordTwoInternalHost()), "%%COORDINATOR_ONE%%", integrationTestingConfig.getCoordinatorInternalHost()), "%%COORDINATOR_TWO%%", integrationTestingConfig.getCoordinatorTwoInternalHost()), "%%BROKER%%", integrationTestingConfig.getBrokerInternalHost()), "%%ROUTER%%", integrationTestingConfig.getRouterInternalHost());
        String replace2 = isOverlordOneLeader(integrationTestingConfig, str2) ? StringUtils.replace(StringUtils.replace(replace, "%%OVERLORD_ONE_LEADER%%", "1"), "%%OVERLORD_TWO_LEADER%%", "0") : StringUtils.replace(StringUtils.replace(replace, "%%OVERLORD_ONE_LEADER%%", "0"), "%%OVERLORD_TWO_LEADER%%", "1");
        return StringUtils.replace(isCoordinatorOneLeader(integrationTestingConfig, str3) ? StringUtils.replace(StringUtils.replace(replace2, "%%COORDINATOR_ONE_LEADER%%", "1"), "%%COORDINATOR_TWO_LEADER%%", "0") : StringUtils.replace(StringUtils.replace(replace2, "%%COORDINATOR_ONE_LEADER%%", "0"), "%%COORDINATOR_TWO_LEADER%%", "1"), "%%NON_LEADER%%", String.valueOf(NullHandling.defaultLongValue()));
    }

    private static boolean isCoordinatorOneLeader(IntegrationTestingConfig integrationTestingConfig, String str) {
        return str.contains(transformHost(integrationTestingConfig.getCoordinatorInternalHost()));
    }

    private static boolean isOverlordOneLeader(IntegrationTestingConfig integrationTestingConfig, String str) {
        return str.contains(transformHost(integrationTestingConfig.getOverlordInternalHost()));
    }

    private static String transformHost(String str) {
        return StringUtils.format("%s:", new Object[]{str});
    }
}
