package com.pinterest.doctorkafka.util;

import ch.qos.logback.core.CoreConstants;
import ch.qos.logback.core.spi.AbstractComponentTracker;
import com.google.common.net.HostAndPort;
import com.pinterest.doctorkafka.BrokerStats;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import kafka.api.FetchRequestBuilder;
import kafka.api.FetchResponse;
import kafka.api.OffsetRequest;
import kafka.api.Request;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.SimpleConsumer;
import kafka.message.ByteBufferMessageSet;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import scala.Tuple2;
import scala.collection.Seq;
import scala.tools.fusesource_embedded.jansi.AnsiRenderer;

/* loaded from: input_file:com/pinterest/doctorkafka/util/OperatorUtil.class */
public class OperatorUtil {
    public static final int WINDOW_SIZE = 6000;
    private static final String FETCH_CLIENT_NAME = "DoctorKafka";
    private static final int FETCH_SOCKET_TIMEOUT = 10000;
    private static final int FETCH_BUFFER_SIZE = 4194304;
    private static final int FETCH_RETRIES = 3;
    private static final int FETCH_MAX_WAIT_MS = 1;
    private static final Logger LOG = LogManager.getLogger((Class<?>) OperatorUtil.class);
    public static final String HostName = getHostname();
    private static final DecoderFactory avroDecoderFactory = DecoderFactory.get();
    private static Map<String, KafkaConsumer> kafkaConsumers = new HashMap();
    private static Map<String, ZkUtils> zkUtilsMap = new HashMap();

    public static String getHostname() {
        String str;
        try {
            str = InetAddress.getLocalHost().getHostName();
            int indexOf = str.indexOf(46);
            if (indexOf > 0) {
                str = str.substring(0, indexOf);
            }
        } catch (Exception e) {
            str = System.getenv(CoreConstants.HOSTNAME_KEY);
        }
        return str;
    }

    public static MBeanServerConnection getMBeanServerConnection(String str, String str2) {
        MBeanServerConnection mBeanServerConnection = null;
        try {
            mBeanServerConnection = JMXConnectorFactory.connect(new JMXServiceURL("service:jmx:rmi://" + str + "/jndi/rmi://" + str + ParameterizedMessage.ERROR_MSG_SEPARATOR + str2 + "/jmxrmi"), new HashMap()).getMBeanServerConnection();
        } catch (Exception e) {
            LOG.error("Failed to connect to MBeanServer {}:{}", HostName, str2, e);
        }
        return mBeanServerConnection;
    }

    public static boolean pingKafkaBroker(String str, int i, int i2) {
        try {
            Socket socket = new Socket();
            Throwable th = null;
            try {
                try {
                    socket.connect(new InetSocketAddress(str, i), i2);
                    if (socket != null) {
                        if (0 != 0) {
                            try {
                                socket.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            socket.close();
                        }
                    }
                    return true;
                } finally {
                }
            } finally {
            }
        } catch (UnknownHostException e) {
            LOG.warn("Ping failure, host: " + str, (Throwable) e);
            return false;
        } catch (IOException e2) {
            LOG.warn("Ping failure IO, host: " + str, (Throwable) e2);
            return false;
        }
    }

    public static boolean canFetchData(String str, int i, String str2, int i2) {
        LOG.info("Fetching data from host {}, topic {}, partition {}", str, str2, Integer.valueOf(i2));
        SimpleConsumer simpleConsumer = new SimpleConsumer(str, i, 10000, ConsumerConfig.SocketBufferSize(), FETCH_CLIENT_NAME);
        try {
            try {
                long offset = getOffset(simpleConsumer, str2, i2, OffsetRequest.EarliestTime());
                long offset2 = getOffset(simpleConsumer, str2, i2, OffsetRequest.LatestTime());
                long j = (offset + offset2) / 2;
                LOG.info("earlyOffset: " + offset + " latestOffset: " + offset2 + " readOffset: " + j);
                for (int i3 = 0; i3 < 3; i3++) {
                    try {
                        FetchResponse fetch = simpleConsumer.fetch(new FetchRequestBuilder().clientId(FETCH_CLIENT_NAME).replicaId(Request.DebuggingConsumerId()).maxWait(1).minBytes(ConsumerConfig.MinFetchBytes()).addFetch(str2, i2, j, FETCH_BUFFER_SIZE).build());
                        if (fetch.hasError()) {
                            LOG.warn("Error fetching Data. ErrorCode: " + fetch.error(str2, i2));
                        } else {
                            ByteBufferMessageSet messageSet = fetch.messageSet(str2, i2);
                            if (messageSet.sizeInBytes() > 0) {
                                LOG.info("Passed. Fetching data from host {}, topic {}, partition {}", str, str2, Integer.valueOf(i2));
                                simpleConsumer.close();
                                return true;
                            }
                            if (offset == offset2) {
                                LOG.warn("Passed. No data in partition.  Fetching data from host {}, topic {}, partition {}", str, str2, Integer.valueOf(i2));
                                simpleConsumer.close();
                                return true;
                            }
                            LOG.warn("host: " + str + " topic: " + str2 + " par: " + i2 + " Not enough bytes: {}", Integer.valueOf(messageSet.sizeInBytes()));
                        }
                    } catch (Exception e) {
                        LOG.warn("For host: " + str + " Unexpected exception", (Throwable) e);
                    }
                    try {
                        Thread.sleep((long) (Math.random() * 3000.0d));
                    } catch (InterruptedException e2) {
                        LOG.warn("Unexpected interruption", (Throwable) e2);
                    }
                }
                simpleConsumer.close();
            } catch (IOException e3) {
                LOG.warn("For host: " + str + " Unexpected exception", (Throwable) e3);
                simpleConsumer.close();
            }
            LOG.warn("Failed Fetching data from host {}, topic {}, parttion {}", str, str2, Integer.valueOf(i2));
            return false;
        } catch (Throwable th) {
            simpleConsumer.close();
            throw th;
        }
    }

    public static long getOffset(SimpleConsumer simpleConsumer, String str, int i, long j) throws IOException {
        String str2 = null;
        RuntimeException runtimeException = null;
        for (int i2 = 0; i2 < 3; i2++) {
            try {
                return simpleConsumer.earliestOrLatestOffset(new TopicAndPartition(str, i), j, Request.DebuggingConsumerId());
            } catch (RuntimeException e) {
                runtimeException = e;
                str2 = "Failed to getting offset for topic: " + str + " partition: " + i + " host: " + simpleConsumer.host();
                LOG.warn(str2, (Throwable) e);
                try {
                    Thread.sleep((long) (Math.random() * 3000.0d));
                } catch (InterruptedException e2) {
                    LOG.warn("Unexpected interruption", (Throwable) e2);
                }
            }
        }
        throw new IOException(str2, runtimeException);
    }

    public static ZkUtils getZkUtils(String str) {
        if (!zkUtilsMap.containsKey(str)) {
            Tuple2<ZkClient, ZkConnection> createZkClientAndConnection = ZkUtils.createZkClientAndConnection(str, 30000, 3000000);
            zkUtilsMap.put(str, new ZkUtils(createZkClientAndConnection.mo5183_1(), createZkClientAndConnection.mo5182_2(), true));
        }
        return zkUtilsMap.get(str);
    }

    public static String getBrokers(String str, SecurityProtocol securityProtocol) {
        Seq<Broker> allBrokersInCluster = getZkUtils(str).getAllBrokersInCluster();
        Broker[] brokerArr = new Broker[allBrokersInCluster.size()];
        allBrokersInCluster.copyToArray(brokerArr);
        return (String) Arrays.stream(brokerArr).map(broker -> {
            return broker.brokerEndPoint(ListenerName.forSecurityProtocol(securityProtocol)).connectionString();
        }).reduce(null, (str2, str3) -> {
            return str2 == null ? str3 : str2 + AnsiRenderer.CODE_LIST_SEPARATOR + str3;
        });
    }

    public static Properties createKafkaProducerProperties(String str, SecurityProtocol securityProtocol) {
        String brokers = getBrokers(str, securityProtocol);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        properties.put("retries", 0);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1638400);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 3554432);
        properties.put("compression.type", "gzip");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        return properties;
    }

    public static Properties createKafkaConsumerProperties(String str, String str2, SecurityProtocol securityProtocol, Map<String, String> map) {
        String brokers = getBrokers(str, securityProtocol);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", str2);
        properties.put("enable.auto.commit", ZooKeeperSaslClient.ENABLE_CLIENT_SASL_DEFAULT);
        properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        if (map != null) {
            for (Map.Entry<String, String> entry : map.entrySet()) {
                properties.put(entry.getKey(), entry.getValue());
            }
        }
        return properties;
    }

    public static BrokerStats deserializeBrokerStats(ConsumerRecord<byte[], byte[]> consumerRecord) {
        try {
            BinaryDecoder binaryDecoder = avroDecoderFactory.binaryDecoder(consumerRecord.value(), (BinaryDecoder) null);
            SpecificDatumReader specificDatumReader = new SpecificDatumReader(BrokerStats.getClassSchema());
            BrokerStats brokerStats = new BrokerStats();
            specificDatumReader.read(brokerStats, binaryDecoder);
            return brokerStats;
        } catch (Exception e) {
            LOG.debug("Fail to decode an message", (Throwable) e);
            return null;
        }
    }

    public static void startOstrichService(String str, int i) {
        new OstrichAdminService(i).startAdminHttpService();
        if (str != null) {
            LOG.info("Starting the OpenTsdb metrics pusher");
            try {
                HostAndPort fromString = HostAndPort.fromString(str);
                new MetricsPusher(fromString.getHostText(), fromString.getPort(), new OpenTsdbMetricConverter("KafkaOperator", HostName), AbstractComponentTracker.LINGERING_TIMEOUT).start();
                LOG.info("OpenTsdb metrics pusher started!");
            } catch (Throwable th) {
                LOG.error("Exception when starting stats pusher: ", th);
            }
        }
    }

    public static MutablePair<Long, Long> getProcNetDevStats() throws Exception {
        Process start = new ProcessBuilder("cat", "/proc/net/dev").start();
        start.waitFor();
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(start.getInputStream()));
        int i = 0;
        long j = 0;
        long j2 = 0;
        while (true) {
            String readLine = bufferedReader.readLine();
            if (readLine == null) {
                bufferedReader.close();
                return new MutablePair<>(Long.valueOf(j), Long.valueOf(j2));
            }
            System.out.println(i + ": " + readLine);
            if (readLine.contains("eth0")) {
                String[] split = readLine.split(" ");
                j = Long.parseLong(split[3]);
                j2 = Long.parseLong(split[41]);
                System.out.println(" inBytes = " + j + "  outBytes = " + j2);
            }
            i++;
        }
    }

    public static MutablePair<Double, Double> getSysNetworkTraffic(long j) throws Exception {
        MutablePair<Long, Long> procNetDevStats = getProcNetDevStats();
        Thread.sleep(j);
        MutablePair<Long, Long> procNetDevStats2 = getProcNetDevStats();
        return new MutablePair<>(Double.valueOf(((procNetDevStats2.getKey().longValue() - procNetDevStats.getKey().longValue()) * 1000.0d) / j), Double.valueOf(((procNetDevStats2.getValue().longValue() - procNetDevStats.getValue().longValue()) * 1000.0d) / j));
    }

    public static <K, V extends Comparable<? super V>> Map<K, V> sortByValue(Map<K, V> map) {
        return (Map) map.entrySet().stream().sorted(Map.Entry.comparingByValue(Collections.reverseOrder())).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }, (comparable, comparable2) -> {
            return comparable;
        }, LinkedHashMap::new));
    }
}
