package org.apache.kafka.trogdor.fault;

import com.fasterxml.jackson.databind.node.TextNode;
import io.kubernetes.client.Exec;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.util.Config;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.CloudNetworkFaultSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/trogdor/fault/CloudNetworkFaultWorker.class */
public class CloudNetworkFaultWorker implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(CloudNetworkFaultWorker.class);
    private static CoreV1Api k8sCoreApi;
    private final String id;
    private final Map<String, CloudNetworkFaultSpec.NetworkSpec> nodeSpecs;
    private WorkerStatusTracker status;
    private List<K8sLossFault> faults;
    private ApiClient client;
    private Exec exec;
    private static Map<String, String> kafkaPods;

    /* loaded from: input_file:org/apache/kafka/trogdor/fault/CloudNetworkFaultWorker$K8sLossFault.class */
    public static class K8sLossFault implements Comparable<K8sLossFault> {
        private String srcNamespace;
        private String srcPodName;
        private String dstNamespace;
        private String dstPodName;
        private String lossPercentage;
        private String networkDevice;

        K8sLossFault(String str, String str2, String str3, String str4, String str5, String str6) throws ApiException {
            this.srcPodName = str2;
            this.srcNamespace = str.isEmpty() ? (String) CloudNetworkFaultWorker.kafkaPods.get(str2) : str;
            this.dstPodName = str4;
            this.dstNamespace = str3.isEmpty() ? (String) CloudNetworkFaultWorker.kafkaPods.get(str4) : str3;
            this.lossPercentage = str5;
            this.networkDevice = str6;
        }

        public boolean toAll() {
            return this.dstNamespace.equals("all");
        }

        @Override // java.lang.Comparable
        public int compareTo(K8sLossFault k8sLossFault) {
            String str = this.srcNamespace + this.srcPodName;
            String str2 = this.dstNamespace + this.dstPodName;
            String str3 = k8sLossFault.srcNamespace + k8sLossFault.srcPodName;
            String str4 = k8sLossFault.dstNamespace + k8sLossFault.dstPodName;
            int compareTo = str.compareTo(str3);
            return compareTo < 0 ? compareTo : str2.compareTo(str4);
        }
    }

    public CloudNetworkFaultWorker(String str, Map<String, CloudNetworkFaultSpec.NetworkSpec> map) {
        this.id = str;
        this.nodeSpecs = map;
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void start(Platform platform, WorkerStatusTracker workerStatusTracker, KafkaFutureImpl<String> kafkaFutureImpl) throws IOException, ApiException {
        log.info("Activating CloudNetworkFaultWorker {}.", this.id);
        this.status = workerStatusTracker;
        this.status.update(new TextNode("enabling traffic control " + this.id));
        CloudNetworkFaultSpec.NetworkSpec networkSpec = this.nodeSpecs.get(platform.curNode().name());
        if (networkSpec != null) {
            this.client = Config.defaultClient();
            Configuration.setDefaultApiClient(this.client);
            k8sCoreApi = new CoreV1Api();
            kafkaPods = getKafkaPods();
            this.faults = createListOfFaultObjects(networkSpec);
            executeOne(this.faults);
            this.status.update(new TextNode("traffic control enabled " + this.id));
        }
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void stop(Platform platform) throws IOException, ApiException {
        this.status.update(new TextNode("disabling traffic control " + this.id));
        log.info("Deactivating CloudNetworkFaultWorker {}.", this.id);
        if (this.nodeSpecs.get(platform.curNode().name()) != null) {
            for (int i = 0; i < this.faults.size(); i++) {
                K8sLossFault k8sLossFault = this.faults.get(i);
                this.exec.exec(k8sLossFault.srcNamespace, k8sLossFault.srcPodName, new String[]{"tc", "qdisc", "del", "dev", k8sLossFault.networkDevice, "root"}, false, false);
            }
        }
        this.status.update(new TextNode("disabled traffic control " + this.id));
    }

    public List<K8sLossFault> createListOfFaultObjects(CloudNetworkFaultSpec.NetworkSpec networkSpec) throws ApiException {
        log.info("creating fault objects " + this.id);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new K8sLossFault(networkSpec.srcNamespace(), networkSpec.srcPodName(), networkSpec.dstNamespace(), networkSpec.dstPodName(), Integer.toString(networkSpec.lossPercentage()), networkSpec.networkDevice()));
        log.trace("successfully created fault objects " + this.id);
        return arrayList;
    }

    public static Map<String, String> getKafkaPods() throws ApiException {
        return (Map) k8sCoreApi.listPodForAllNamespaces((Boolean) null, (String) null, (String) null, (String) null, (Integer) null, (String) null, (String) null, (String) null, (Integer) null, (Boolean) null).getItems().stream().filter(v1Pod -> {
            return v1Pod.getMetadata().getName().matches("kafka-\\d+") && v1Pod.getMetadata().getNamespace().matches("pkc-[a-zA-Z0-9]+");
        }).collect(Collectors.toMap(v1Pod2 -> {
            return v1Pod2.getMetadata().getName();
        }, v1Pod3 -> {
            return v1Pod3.getMetadata().getNamespace();
        }));
    }

    public String getNamespacedPodIp(String str, String str2) throws ApiException {
        return ((V1Pod) k8sCoreApi.listNamespacedPod(str, (String) null, (Boolean) null, (String) null, "metadata.name=" + str2, (String) null, (Integer) null, (String) null, (String) null, (Integer) null, (Boolean) null).getItems().get(0)).getStatus().getPodIP();
    }

    public void executeOne(List<K8sLossFault> list) throws IOException, ApiException {
        this.exec = new Exec();
        K8sLossFault k8sLossFault = list.get(0);
        String str = k8sLossFault.srcNamespace;
        String str2 = k8sLossFault.srcPodName;
        String str3 = k8sLossFault.dstNamespace;
        String str4 = k8sLossFault.dstPodName;
        if (k8sLossFault.toAll()) {
            this.exec.exec(str, str2, new String[]{"tc", "qdisc", "add", "dev", k8sLossFault.networkDevice, "root", "netem", "loss", k8sLossFault.lossPercentage + "%"}, false, false);
            log.info("Injected loss to all outgoing connections " + this.id);
            return;
        }
        this.exec.exec(str, str2, new String[]{"tc", "qdisc", "add", "dev", "eth0", "root", "handle", "1:", "prio"}, false, false);
        String namespacedPodIp = getNamespacedPodIp(str3, str4);
        log.info("successfully got pod IP of " + this.id);
        this.exec.exec(str, str2, new String[]{"tc", "qdisc", "add", "dev", k8sLossFault.networkDevice, "parent", "1:1", "handle", "10:", "netem", "loss", k8sLossFault.lossPercentage + "%"}, false, false);
        log.info("Created root of tc tree " + this.id);
        this.exec.exec(str, str2, new String[]{"tc", "filter", "add", "dev", k8sLossFault.networkDevice, "protocol", "ip", "parent", "1:0", "prio", "1", "u32", "match", "ip", "dst", namespacedPodIp + "/32", "flowid", "1:1"}, false, false);
        log.info("Injected loss to outgoing connection to one destination " + this.id);
    }
}
