/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.functions.java;

import com.fasterxml.jackson.databind.MappingIterator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.FunctionStatusUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.tests.integration.containers.WorkerContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTest;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Integration test that exercises the function-worker "rebalance" and "drain" admin endpoints.
 *
 * <p>Each test allocates {@link #NumFunctionsAssignedOnEachWorker} functions per worker, triggers
 * the operation on the cluster leader through {@code curl} inside the leader's container, and then
 * checks the function assignments reported by {@code pulsar-admin functions-worker}.
 */
public class PulsarWorkerRebalanceDrainTest extends PulsarFunctionsTest {
    private static final Logger log = LoggerFactory.getLogger(PulsarWorkerRebalanceDrainTest.class);

    /** Scheme prefix of the worker admin URLs. */
    final String UrlProtocolPrefix = "http://";
    /** Path of the rebalance endpoint on a worker. */
    final String WorkerRebalanceUrlSuffix = "/admin/v2/worker/rebalance";
    /** Path (including the query-parameter name) of the drain endpoint on the leader. */
    final String WorkerDrainAtLeaderUrlSuffix = "/admin/v2/worker/leader/drain?workerId=";
    /** Number of functions created per worker by {@link #allocateFunctions(String, String)}. */
    final int NumFunctionsAssignedOnEachWorker = 2;
    /** Number of extra function workers started by {@link #setupCluster()}. */
    final int NumAdditionalWorkersAtSetup = 1;

    PulsarWorkerRebalanceDrainTest(FunctionRuntimeType functionRuntimeType) {
        super(functionRuntimeType);
    }

    /**
     * Runs the parent cluster setup and then starts {@link #NumAdditionalWorkersAtSetup} more
     * function worker(s) of the configured runtime type.
     */
    @Override
    public void setupCluster() throws Exception {
        super.setupCluster();
        this.pulsarCluster.setupFunctionWorkers(
                randomName(5), this.functionRuntimeType, NumAdditionalWorkersAtSetup);
        log.debug("PulsarWorkerRebalanceDrainTest: set up a total of {} function workers, of type {}",
                this.pulsarCluster.getAlWorkers().size(), this.functionRuntimeType);
    }

    @Test(groups={"java_function", "rebalance_drain", "rebalance"})
    public void testRebalanceWorkers() throws Exception {
        this.testRebalance();
        log.info("Done with testRebalance");
    }

    @Test(groups={"java_function", "rebalance_drain", "drain"})
    public void testDrainWorkers() throws Exception {
        this.testDrain();
        log.info("Done with testDrain");
    }

    /**
     * Decodes a sequence of JSON-encoded {@link WorkerInfo} values.
     *
     * @param json the text to decode
     * @return every {@link WorkerInfo} value read from {@code json}
     * @throws IOException if the text cannot be parsed
     */
    private List<WorkerInfo> workerInfoDecode(String json) throws IOException {
        try (MappingIterator<WorkerInfo> it =
                     ObjectMapperFactory.getThreadLocal().readerFor(WorkerInfo.class).readValues(json)) {
            return it.readAll();
        }
    }

    /**
     * Extracts the per-worker function lists from the {@code get-function-assignments} output by
     * scanning for bracketed lists.
     *
     * <p>For every {@code [...]} group one single-entry map is produced. Its key is the text that
     * precedes the {@code '['} (since the previous group) with all whitespace removed, so it still
     * carries any surrounding punctuation; its value is the comma-separated content of the group,
     * unmodified. An empty group yields an empty collection.
     *
     * @param json the raw command output; may be {@code null} or contain no lists
     * @return one map per bracketed list, in order of appearance; empty if there is none
     * @throws IOException if a {@code '['} has no matching {@code ']'}
     */
    private List<Map<String, Collection<String>>> functionAssignmentsDecode(String json) throws IOException {
        List<Map<String, Collection<String>>> retVal = new ArrayList<>();
        if (json == null) {
            return retVal;
        }
        String remainingJson = json;
        int listStart = remainingJson.indexOf('[');
        while (listStart >= 0) {
            // Look for the terminator only after the opening bracket.
            int listEnd = remainingJson.indexOf(']', listStart + 1);
            if (listEnd < 0) {
                throw new IOException(
                        "Malformed function assignments: '[' without matching ']' in: " + remainingJson);
            }
            String worker = remainingJson.substring(0, listStart).replaceAll("\\s+", "");
            String functionList = remainingJson.substring(listStart + 1, listEnd);
            // "".split(",") yields one empty string, which would be counted as a function.
            List<String> functions = functionList.trim().isEmpty()
                    ? Collections.<String>emptyList()
                    : Arrays.asList(functionList.split(","));

            Map<String, Collection<String>> curMap = new HashMap<>();
            curMap.put(worker, functions);
            log.info("Found new entry: {}, {}", worker, functions);
            retVal.add(curMap);
            log.info("retVal is {}", retVal);

            remainingJson = remainingJson.substring(listEnd + 1);
            listStart = remainingJson.indexOf('[');
        }
        return retVal;
    }

    /** Runs {@code functions-worker get-cluster} on any worker and decodes its output. */
    private List<WorkerInfo> getClusterStatus() throws Exception {
        ContainerExecResult result = this.pulsarCluster.getAnyWorker()
                .execCmd("/pulsar/bin/pulsar-admin", "functions-worker", "get-cluster");
        log.debug("getClusterStatus result is: {}", result);
        return this.workerInfoDecode(result.getStdout());
    }

    /**
     * Runs {@code functions-worker get-cluster-leader} on any worker and returns the single
     * decoded leader entry, asserting that exactly one entry was reported.
     */
    private WorkerInfo getClusterLeader() throws Exception {
        ContainerExecResult result = this.pulsarCluster.getAnyWorker()
                .execCmd("/pulsar/bin/pulsar-admin", "functions-worker", "get-cluster-leader");
        List<WorkerInfo> winfos = this.workerInfoDecode(result.getStdout());
        Assert.assertEquals(winfos.size(), 1);
        // Reuse the already-decoded list instead of parsing the same output a second time.
        return winfos.get(0);
    }

    /** Runs {@code functions-worker get-function-assignments} on any worker and decodes its output. */
    private List<Map<String, Collection<String>>> getFunctionAssignments() throws Exception {
        ContainerExecResult result = this.pulsarCluster.getAnyWorker()
                .execCmd("/pulsar/bin/pulsar-admin", "functions-worker", "get-function-assignments");
        log.debug("getFunctionAssignments result is: {}", result);
        return this.functionAssignmentsDecode(result.getStdout());
    }

    /**
     * Sums the sizes of all function collections in {@code finfos}.
     *
     * @return the total number of assigned functions; 0 when {@code finfos} is empty
     */
    private int getFuncAssignmentsCount(List<Map<String, Collection<String>>> finfos) {
        int funcCount = 0;
        for (Map<String, Collection<String>> assignment : finfos) {
            for (Map.Entry<String, Collection<String>> entry : assignment.entrySet()) {
                int size = entry.getValue().size();
                if (log.isDebugEnabled()) {
                    log.debug("accumulating for key={}, value={} (size {})",
                            entry.getKey(), entry.getValue(), size);
                }
                funcCount += size;
            }
        }
        return funcCount;
    }

    /**
     * Finds the smallest function collection in {@code finfos}.
     *
     * @return the minimum number of functions on any listed worker, or
     *         {@link Integer#MAX_VALUE} when {@code finfos} contains no entries
     */
    private int getMinFuncAssignmentOnAnyWorker(List<Map<String, Collection<String>>> finfos) {
        int minFuncCount = Integer.MAX_VALUE;
        for (Map<String, Collection<String>> assignment : finfos) {
            for (Map.Entry<String, Collection<String>> entry : assignment.entrySet()) {
                int size = entry.getValue().size();
                if (log.isDebugEnabled()) {
                    log.debug("comparing current_min={} with key={}, value={} (size {})",
                            minFuncCount, entry.getKey(), entry.getValue(), size);
                }
                minFuncCount = Math.min(minFuncCount, size);
            }
        }
        return minFuncCount;
    }

    /** Issues an HTTP PUT to the rebalance endpoint of the current cluster leader. */
    private void callRebalance() throws Exception {
        this.putToLeader("callRebalance", "rebalance", WorkerRebalanceUrlSuffix);
    }

    /**
     * Issues an HTTP PUT to the leader's drain endpoint for the given worker.
     *
     * @param workerToDrain id of the worker to drain; appended to the URL as-is
     */
    private void callDrain(String workerToDrain) throws Exception {
        this.putToLeader("callDrain", "drain", WorkerDrainAtLeaderUrlSuffix + workerToDrain);
    }

    /**
     * Looks up the leader's container and runs {@code curl -X PUT} against
     * {@code http://localhost:<leader port><urlSuffix>} inside it.
     *
     * @param caller    name of the calling method, used only in debug logs
     * @param operation short operation name, used only in debug logs
     * @param urlSuffix path (and query) appended after the leader's port
     */
    private void putToLeader(String caller, String operation, String urlSuffix) throws Exception {
        WorkerInfo leader = this.getClusterLeader();
        WorkerContainer worker = this.pulsarCluster.getWorker(leader.getWorkerId());
        Assert.assertNotNull(worker, "No worker container found for leader " + leader.getWorkerId());
        String url = UrlProtocolPrefix + "localhost:" + leader.getPort() + urlSuffix;
        ContainerExecResult result = worker.execCmd("/usr/bin/curl", "-X", "PUT", url);
        if (log.isDebugEnabled()) {
            log.debug("{}: leader's {} url is: {}", caller, operation, url);
            log.debug("{}: curl for {}: result is {}", caller, operation, result);
        }
    }

    /**
     * Creates an input and an output topic for a new function and submits the function.
     *
     * @param functionName name of the function to submit; also part of the topic names
     * @param topicPrefix  prefix of both topic names; also the key of the single-entry map
     *                     passed to {@code submitFunction}
     */
    private void createFunctionWorker(String functionName, String topicPrefix) throws Exception {
        String suffix = functionName + randomName(8);
        String inputTopicName = "persistent://public/default/" + topicPrefix + "-input-" + suffix;
        String outputTopicName = topicPrefix + "-output-" + suffix;
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl())
                .build()) {
            admin.topics().createNonPartitionedTopic(inputTopicName);
            admin.topics().createNonPartitionedTopic(outputTopicName);
        }
        this.submitFunction(CommandGenerator.Runtime.JAVA, inputTopicName, outputTopicName, functionName, null,
                "org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction",
                "org.apache.pulsar.functions.api.examples.CustomBaseSerde",
                Collections.singletonMap(topicPrefix, outputTopicName));
    }

    /**
     * Logs the cluster membership, the leader and the function assignments, and asserts that
     * {@code get-cluster} reports as many workers as the test cluster has started.
     *
     * @param callerContext text prepended to every log line
     */
    private void showWorkerStatus(String callerContext) throws Exception {
        List<WorkerInfo> winfos = this.getClusterStatus();
        Assert.assertEquals(winfos.size(), this.pulsarCluster.getAlWorkers().size());
        log.info("{} get-cluster retrieved info about {} workers", callerContext, winfos.size());
        winfos.forEach(w -> log.info("{} get-cluster worker-info: {}", callerContext, w));

        WorkerInfo leaderInfo = this.getClusterLeader();
        log.info("{} get-cluster-leader info: {}", callerContext, leaderInfo);

        List<Map<String, Collection<String>>> finfos = this.getFunctionAssignments();
        log.info("{} get-function-assignments retrieved info about {} workers with {} functions",
                callerContext, finfos.size(), this.getFuncAssignmentsCount(finfos));
        finfos.forEach(f -> log.info("{} get-function-assignments info: {}", callerContext, f));
    }

    /**
     * Creates {@link #NumFunctionsAssignedOnEachWorker} functions per currently known worker and
     * asserts that each one reports exactly one instance and that the instance is running.
     *
     * @param callingTest prefix of the generated function names
     * @param topicPrefix prefix of the topics created for each function
     */
    private void allocateFunctions(String callingTest, String topicPrefix) throws Exception {
        int numFunctions = this.pulsarCluster.getAlWorkers().size() * NumFunctionsAssignedOnEachWorker;
        for (int ix = 0; ix < numFunctions; ++ix) {
            String functionName = callingTest + "-" + randomName(8);
            this.createFunctionWorker(functionName, topicPrefix);
            ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(
                    "/pulsar/bin/pulsar-admin", "functions", "status",
                    "--tenant", "public", "--namespace", "default", "--name", functionName);
            FunctionStatus functionStatus = FunctionStatusUtil.decode(result.getStdout());
            log.debug("{}: functionStatus is {}", callingTest, functionStatus);
            Assert.assertEquals(functionStatus.getNumInstances(), 1);
            Assert.assertTrue(functionStatus.getInstances().get(0).getStatus().isRunning(),
                    "Function " + functionName + " is not running");
        }
    }

    /**
     * Allocates functions, adds two workers, calls rebalance on the leader and verifies that the
     * assignments are spread over the enlarged worker set without losing any function.
     */
    private void testRebalance() throws Exception {
        this.allocateFunctions("testRebalanceAddWorkers", "test-rebalance");
        if (log.isDebugEnabled()) {
            this.showWorkerStatus("testRebalanceAddWorkers after allocating functions");
        }

        WorkerInfo oldClusterLeaderInfo = this.getClusterLeader();
        log.info("Cluster leader before adding more workers is: {}", oldClusterLeaderInfo);

        List<Map<String, Collection<String>>> startFinfos = this.getFunctionAssignments();
        int startFuncCount = this.getFuncAssignmentsCount(startFinfos);
        log.info("testRebalanceAddWorkers: got info about {} workers with {} functions before creating new workers",
                startFinfos.size(), startFuncCount);
        Assert.assertEquals(this.getMinFuncAssignmentOnAnyWorker(startFinfos), NumFunctionsAssignedOnEachWorker);

        int initialNumWorkers = this.pulsarCluster.getAlWorkers().size();
        int numWorkersToAdd = 2;
        log.info("testRebalanceAddWorkers: cluster has {} FunctionWorkers; going to set up {} more",
                initialNumWorkers, numWorkersToAdd);
        this.pulsarCluster.setupFunctionWorkers(randomName(5), this.functionRuntimeType, numWorkersToAdd);
        Assert.assertEquals(this.pulsarCluster.getAlWorkers().size(), initialNumWorkers + numWorkersToAdd);
        log.info("testRebalanceAddWorkers: got a total of {} function workers, of type {}",
                this.pulsarCluster.getAlWorkers().size(), this.functionRuntimeType);
        this.showWorkerStatus("testRebalanceAddWorkers status after adding more workers");

        // Adding workers must not change the leader.
        WorkerInfo newClusterLeaderInfo = this.getClusterLeader();
        log.info("Cluster leader after adding {} workers is: {}", numWorkersToAdd, newClusterLeaderInfo);
        Assert.assertEquals(newClusterLeaderInfo.getWorkerId(), oldClusterLeaderInfo.getWorkerId());

        this.showWorkerStatus("testRebalanceAddWorkers after adding more workers");
        this.callRebalance();
        this.showWorkerStatus("testRebalanceAddWorkers after rebalance");

        List<Map<String, Collection<String>>> endFinfos = this.getFunctionAssignments();
        int endFuncCount = this.getFuncAssignmentsCount(endFinfos);
        log.info("testRebalanceAddWorkers: got info about {} workers with {} functions after rebalance",
                endFinfos.size(), endFuncCount);
        Assert.assertEquals(endFinfos.size() - startFinfos.size(), numWorkersToAdd);
        Assert.assertEquals(startFuncCount, endFuncCount);

        // Integer division already rounds down; endFinfos is non-empty per the size assertion above.
        int minFuncsPerWorker = endFuncCount / endFinfos.size();
        Assert.assertEquals(this.getMinFuncAssignmentOnAnyWorker(endFinfos), minFuncsPerWorker);
        Assert.assertTrue(minFuncsPerWorker < NumFunctionsAssignedOnEachWorker);
    }

    /**
     * Allocates functions, drains the cluster leader and verifies that fewer workers hold
     * assignments afterwards while the total number of functions is unchanged.
     */
    private void testDrain() throws Exception {
        this.allocateFunctions("testDrain", "test-drain");

        List<Map<String, Collection<String>>> startFinfos = this.getFunctionAssignments();
        int startFuncCount = this.getFuncAssignmentsCount(startFinfos);
        log.info("testDrain: got info about {} workers with {} functions before drain",
                startFinfos.size(), startFuncCount);
        if (log.isDebugEnabled()) {
            this.showWorkerStatus("testDrain after allocating functions");
        }

        WorkerInfo clusterLeaderInfo = this.getClusterLeader();
        this.callDrain(clusterLeaderInfo.getWorkerId());
        if (log.isDebugEnabled()) {
            this.showWorkerStatus("testDrain after drain");
        }

        List<Map<String, Collection<String>>> endFinfos = this.getFunctionAssignments();
        int endFuncCount = this.getFuncAssignmentsCount(endFinfos);
        log.info("testDrain: got info about {} workers with {} functions after drain",
                endFinfos.size(), endFuncCount);
        Assert.assertTrue(startFinfos.size() > endFinfos.size());
        Assert.assertEquals(startFuncCount, endFuncCount);

        // Fail with a clear message instead of an ArithmeticException on the division below.
        Assert.assertFalse(endFinfos.isEmpty(), "No function assignments reported after drain");
        int minFuncsPerWorker = endFuncCount / endFinfos.size();
        Assert.assertEquals(this.getMinFuncAssignmentOnAnyWorker(endFinfos), minFuncsPerWorker);
    }
}

