package org.apache.kafka.trogdor.fault;

import com.fasterxml.jackson.databind.node.TextNode;
import io.kubernetes.client.Exec;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.util.Config;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/trogdor/fault/CloudAZNetworkFaultWorker.class */
/**
 * Trogdor task worker that drops all network traffic on the Kafka pods scheduled in one
 * availability zone.
 *
 * <p>On {@link #start} the worker lists the Kafka pods across all namespaces, keeps those whose
 * node carries a {@code topology.kubernetes.io/zone} label equal to the configured zone name, and
 * execs into each of them to add a {@code netem loss 100%} qdisc on {@code eth0}. On
 * {@link #stop} it execs into every pod it attempted to fault and deletes the root qdisc again.
 */
public class CloudAZNetworkFaultWorker implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(CloudAZNetworkFaultWorker.class);

    /** Node label that is compared against the configured zone name. */
    private static final String ZONE_LABEL = "topology.kubernetes.io/zone";

    /** Pod names accepted as Kafka pods; compiled once instead of on every filter call. */
    private static final Pattern KAFKA_POD_NAME = Pattern.compile("kafka-\\d+");

    /** Namespaces accepted as Kafka cluster namespaces. */
    private static final Pattern CLUSTER_NAMESPACE = Pattern.compile("pkc-[a-zA-Z0-9]+");

    /** Upper bound, in minutes, on how long a single in-pod command is awaited. */
    private static final long COMMAND_TIMEOUT_MINUTES = 1L;

    private final String id;
    private CoreV1Api k8sCoreApi;
    private final String azName;

    /** Pods on which fault injection was attempted; {@link #stop} rolls each of them back. */
    private final Set<V1Pod> podFaultTracker = new HashSet<>();

    /**
     * Creates a worker for one availability zone.
     *
     * @param id     identifier used in log and status messages
     * @param azName value the node's zone label must equal for a pod to be faulted
     */
    public CloudAZNetworkFaultWorker(String id, String azName) {
        this.id = id;
        this.azName = azName;
    }

    /**
     * Selects the Kafka pods running in the configured zone and injects the traffic-dropping
     * fault into each of them. Returns without updating the status when no pod matches.
     *
     * @param platform            not used by this worker
     * @param workerStatusTracker receives a status update before and after the injection
     * @param kafkaFutureImpl     not used by this worker
     * @throws Exception if the Kubernetes client cannot be configured or the pod listing fails
     */
    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void start(Platform platform, WorkerStatusTracker workerStatusTracker, KafkaFutureImpl<String> kafkaFutureImpl) throws Exception {
        Configuration.setDefaultApiClient(Config.defaultClient());
        this.k8sCoreApi = new CoreV1Api();
        Set<V1Pod> targets = getAllKafkaClusterPods().stream()
                .filter(this::isInTargetZone)
                .collect(Collectors.toSet());
        log.info("Activating CloudAZNetworkFaultWorker {}. Pods {}", this.id, getPodNames(targets));
        if (targets.isEmpty()) {
            return;
        }
        workerStatusTracker.update(new TextNode("enabling traffic dropping for " + this.id));
        injectDataLossFault(targets);
        workerStatusTracker.update(new TextNode("traffic dropping enabled " + this.id));
    }

    /**
     * Tells whether the node hosting {@code pod} is labelled with the configured zone.
     *
     * <p>A node without labels, or without the zone label, is treated as not matching rather
     * than failing the whole selection with a {@link NullPointerException}. A failed node lookup
     * is logged and also treated as not matching.
     */
    private boolean isInTargetZone(V1Pod pod) {
        String nodeName = pod.getSpec().getNodeName();
        try {
            Map<String, String> labels = this.k8sCoreApi.readNode(nodeName, "false").getMetadata().getLabels();
            String zone = labels == null ? null : labels.get(ZONE_LABEL);
            return zone != null && zone.equals(this.azName);
        } catch (ApiException e) {
            log.error("Failed to get node. Node name {}", nodeName, e);
            return false;
        }
    }

    /** Renders the pods as a comma-separated list of {@code namespace:name} entries. */
    private String getPodNames(Set<V1Pod> pods) {
        return pods.stream()
                .map(pod -> String.join(":", pod.getMetadata().getNamespace(), pod.getMetadata().getName()))
                .collect(Collectors.joining(","));
    }

    /**
     * Installs {@code iproute2} in each pod and adds a {@code netem loss 100%} qdisc on
     * {@code eth0}. A failure on one pod is logged and does not stop the remaining pods from
     * being processed.
     */
    private void injectDataLossFault(Set<V1Pod> pods) {
        String[] aptUpdate = {"apt-get", "update"};
        String[] installIproute = {"apt-get", "install", "iproute2", "--yes"};
        String[] dropAllTraffic = {"tc", "qdisc", "add", "dev", "eth0", "root", "netem", "loss", "100%"};
        for (V1Pod pod : pods) {
            // Track the pod before touching it so that stop() still attempts a rollback when the
            // injection fails part-way through.
            this.podFaultTracker.add(pod);
            String podName = pod.getMetadata().getName();
            try {
                log.info("Injecting network AZ isolation fault on Kafka pod {}", podName);
                log.info("Injecting network AZ isolation fault on Kafka pod {}, tty {}", podName, System.console() != null);
                executeCommand(pod, aptUpdate);
                executeCommand(pod, installIproute);
                executeCommand(pod, dropAllTraffic);
                log.info("DONE:Injecting network AZ isolation fault on Kafka pod {}", podName);
            } catch (ApiException | IOException e) {
                // The exception is passed as the final argument so the cause and stack trace are logged.
                log.error("Error injecting AZ network isolation fault on host {}, Kakfa pod {}", pod.getSpec().getHostname(), podName, e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Interrupted", e);
            }
        }
    }

    /**
     * Runs {@code command} inside {@code pod}, waits up to {@link #COMMAND_TIMEOUT_MINUTES}
     * minute(s) for it, and logs the outcome.
     *
     * @throws IOException          if the exec call fails with an I/O error
     * @throws ApiException         if the exec call is rejected by the Kubernetes API
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void executeCommand(V1Pod pod, String[] command) throws IOException, ApiException, InterruptedException {
        Process exec = new Exec().exec(pod, command, false, false);
        boolean completed;
        try {
            completed = exec.waitFor(COMMAND_TIMEOUT_MINUTES, TimeUnit.MINUTES);
        } finally {
            // Always release the exec process, including when the wait is interrupted.
            exec.destroy();
        }
        // NOTE(review): exitValue() is read after destroy() even when the wait timed out;
        // confirm the Exec-backed Process reports a value in that case instead of throwing.
        log.info("Done:{}: Injecting network AZ isolation fault on Kafka pod {}. Result {}, Process Completion Status: {}",
                String.join(" ", command), pod.getMetadata().getName(), exec.exitValue(), completed);
    }

    /**
     * Lists the pods labelled {@code type=kafka} in all namespaces and keeps those named
     * {@code kafka-<digits>} in a {@code pkc-*} namespace that does not end with
     * {@code mothership}.
     *
     * @throws ApiException if the pod listing fails
     */
    private Set<V1Pod> getAllKafkaClusterPods() throws ApiException {
        return this.k8sCoreApi
                .listPodForAllNamespaces((Boolean) null, (String) null, (String) null, "type=kafka", (Integer) null, (String) null, (String) null, (String) null, (Integer) null, (Boolean) null)
                .getItems()
                .stream()
                .filter(pod -> {
                    String namespace = pod.getMetadata().getNamespace();
                    return KAFKA_POD_NAME.matcher(pod.getMetadata().getName()).matches()
                            && CLUSTER_NAMESPACE.matcher(namespace).matches()
                            && !namespace.endsWith("mothership");
                })
                .collect(Collectors.toSet());
    }

    /**
     * Deletes the root qdisc on {@code eth0} in every pod on which injection was attempted. A
     * failure on one pod is logged and does not stop the remaining pods from being processed.
     *
     * @param platform not used by this worker
     */
    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void stop(Platform platform) throws Exception {
        log.info("Deactivating CloudNetworkAZFaultWorker {}. Pods {}", this.id, getPodNames(this.podFaultTracker));
        String[] removeQdisc = {"tc", "qdisc", "del", "dev", "eth0", "root"};
        for (V1Pod pod : this.podFaultTracker) {
            try {
                executeCommand(pod, removeQdisc);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Interrupted", e);
            } catch (ApiException | IOException e) {
                // The exception is passed as the final argument so the cause and stack trace are logged.
                log.error("Error rolling back AZ network isolation fault on host {}, Kakfa pod {}", pod.getSpec().getHostname(), pod.getMetadata().getName(), e);
            }
        }
    }
}
