/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.smoketest;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.smoketest.SmokeTestClient;
import org.apache.kafka.streams.smoketest.SmokeTestUtil;

public class SmokeTestDriver
extends SmokeTestUtil {
    public static void main(String[] args) throws Exception {
        String kafka = "localhost:9092";
        String zookeeper = "localhost:2181";
        File stateDir = SmokeTestDriver.createDir("/tmp/kafka-streams-smoketest");
        int numKeys = 10;
        int maxRecordsPerKey = 500;
        Thread driver = new Thread(){

            @Override
            public void run() {
                try {
                    Map<String, Set<Integer>> allData = SmokeTestDriver.generate("localhost:9092", 10, 500);
                    SmokeTestDriver.verify("localhost:9092", allData, 500);
                }
                catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        SmokeTestClient streams1 = new SmokeTestClient(SmokeTestDriver.createDir(stateDir, "1"), "localhost:9092", "localhost:2181");
        SmokeTestClient streams2 = new SmokeTestClient(SmokeTestDriver.createDir(stateDir, "2"), "localhost:9092", "localhost:2181");
        SmokeTestClient streams3 = new SmokeTestClient(SmokeTestDriver.createDir(stateDir, "3"), "localhost:9092", "localhost:2181");
        SmokeTestClient streams4 = new SmokeTestClient(SmokeTestDriver.createDir(stateDir, "4"), "localhost:9092", "localhost:2181");
        System.out.println("starting the driver");
        driver.start();
        System.out.println("starting the first and second client");
        streams1.start();
        streams2.start();
        SmokeTestDriver.sleep(10000L);
        System.out.println("starting the third client");
        streams3.start();
        System.out.println("closing the first client");
        streams1.close();
        System.out.println("closed the first client");
        SmokeTestDriver.sleep(10000L);
        System.out.println("starting the forth client");
        streams4.start();
        driver.join();
        System.out.println("driver stopped");
        streams2.close();
        streams3.close();
        streams4.close();
        System.out.println("shutdown");
    }

    public static Map<String, Set<Integer>> generate(String kafka, int numKeys, int maxRecordsPerKey) throws Exception {
        Properties props = new Properties();
        props.put("client.id", "SmokeTest");
        props.put("bootstrap.servers", kafka);
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        KafkaProducer producer = new KafkaProducer(props);
        int numRecordsProduced = 0;
        HashMap allData = new HashMap();
        ValueList[] data = new ValueList[numKeys];
        for (int i = 0; i < numKeys; ++i) {
            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
            allData.put(data[i].key, new HashSet());
        }
        Random rand = new Random();
        int remaining = data.length;
        while (remaining > 0) {
            int index = rand.nextInt(remaining);
            String key = data[index].key;
            int value = data[index].next();
            if (value < 0) {
                data[index] = data[--remaining];
                value = Integer.MAX_VALUE;
            }
            ProducerRecord record = new ProducerRecord("data", (Object)stringSerde.serializer().serialize("", (Object)key), (Object)intSerde.serializer().serialize("", (Object)value));
            producer.send(record);
            if (value == Integer.MAX_VALUE) continue;
            ((Set)allData.get(key)).add(value);
            if (++numRecordsProduced % 100 == 0) {
                System.out.println(numRecordsProduced + " records produced");
            }
            Thread.sleep(10L);
        }
        producer.close();
        return Collections.unmodifiableMap(allData);
    }

    private static void shuffle(int[] data, int windowSize) {
        Random rand = new Random();
        for (int i = 0; i < data.length; ++i) {
            int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
            int tmp = data[i];
            data[i] = data[j];
            data[j] = tmp;
        }
    }

    /*
     * Unable to fully structure code
     */
    public static void verify(String kafka, Map<String, Set<Integer>> allData, int maxRecordsPerKey) {
        props = new Properties();
        props.put("client.id", "verifier");
        props.put("bootstrap.servers", kafka);
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        consumer = new KafkaConsumer(props);
        partitions = SmokeTestDriver.getAllPartitions(consumer, new String[]{"echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"});
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);
        recordsGenerated = allData.size() * maxRecordsPerKey;
        recordsProcessed = 0;
        max = new HashMap<String, Integer>();
        min = new HashMap<String, Integer>();
        dif = new HashMap<String, Integer>();
        sum = new HashMap<String, Long>();
        cnt = new HashMap<String, Long>();
        avg = new HashMap<String, Double>();
        wcnt = new HashMap<String, Long>();
        tagg = new HashMap<String, Long>();
        keys = new HashSet<String>();
        received = new HashMap<String, HashSet<E>>();
        for (String key : allData.keySet()) {
            keys.add(key);
            received.put(key, new HashSet<E>());
        }
        retryCount = 0;
        maxRetry = 360;
        block23: while (true) {
            if ((records = consumer.poll(500L)).isEmpty()) {
                if (++retryCount <= maxRetry) continue;
                break;
            }
            retryCount = 0;
            i$ = records.iterator();
            block24: while (true) {
                if (i$.hasNext()) ** break;
                continue block23;
                record = (ConsumerRecord)i$.next();
                key = (String)SmokeTestDriver.stringSerde.deserializer().deserialize("", (byte[])record.key());
                var24_28 = record.topic();
                var25_29 = -1;
                switch (var24_28.hashCode()) {
                    case 3107365: {
                        if (!var24_28.equals("echo")) break;
                        var25_29 = 0;
                        break;
                    }
                    case 108114: {
                        if (!var24_28.equals("min")) break;
                        var25_29 = 1;
                        break;
                    }
                    case 107876: {
                        if (!var24_28.equals("max")) break;
                        var25_29 = 2;
                        break;
                    }
                    case 99457: {
                        if (!var24_28.equals("dif")) break;
                        var25_29 = 3;
                        break;
                    }
                    case 114251: {
                        if (!var24_28.equals("sum")) break;
                        var25_29 = 4;
                        break;
                    }
                    case 98665: {
                        if (!var24_28.equals("cnt")) break;
                        var25_29 = 5;
                        break;
                    }
                    case 96978: {
                        if (!var24_28.equals("avg")) break;
                        var25_29 = 6;
                        break;
                    }
                    case 3643794: {
                        if (!var24_28.equals("wcnt")) break;
                        var25_29 = 7;
                        break;
                    }
                    case 3552269: {
                        if (!var24_28.equals("tagg")) break;
                        var25_29 = 8;
                    }
                }
                switch (var25_29) {
                    case 0: {
                        value = (Integer)SmokeTestDriver.intSerde.deserializer().deserialize("", (byte[])record.value());
                        if (value != null && value == 0x7FFFFFFF) {
                            keys.remove(key);
                            if (!keys.isEmpty()) continue block24;
                            maxRetry = 120;
                            continue block24;
                        }
                        ++recordsProcessed;
                        ((Set)received.get(key)).add(value);
                        continue block24;
                    }
                    case 1: {
                        min.put(key, (Integer)SmokeTestDriver.intSerde.deserializer().deserialize("", (byte[])record.value()));
                        continue block24;
                    }
                    case 2: {
                        max.put(key, (Integer)SmokeTestDriver.intSerde.deserializer().deserialize("", (byte[])record.value()));
                        continue block24;
                    }
                    case 3: {
                        dif.put(key, (Integer)SmokeTestDriver.intSerde.deserializer().deserialize("", (byte[])record.value()));
                        continue block24;
                    }
                    case 4: {
                        sum.put(key, (Long)SmokeTestDriver.longSerde.deserializer().deserialize("", (byte[])record.value()));
                        continue block24;
                    }
                    case 5: {
                        cnt.put(key, (Long)SmokeTestDriver.longSerde.deserializer().deserialize("", (byte[])record.value()));
                        continue block24;
                    }
                    case 6: {
                        avg.put(key, (Double)SmokeTestDriver.doubleSerde.deserializer().deserialize("", (byte[])record.value()));
                        continue block24;
                    }
                    case 7: {
                        wcnt.put(key, (Long)SmokeTestDriver.longSerde.deserializer().deserialize("", (byte[])record.value()));
                        continue block24;
                    }
                    case 8: {
                        tagg.put(key, (Long)SmokeTestDriver.longSerde.deserializer().deserialize("", (byte[])record.value()));
                        continue block24;
                    }
                }
                System.out.println("unknown topic: " + record.topic());
            }
            break;
        }
        consumer.close();
        System.out.println("-------------------");
        System.out.println("Result Verification");
        System.out.println("-------------------");
        System.out.println("recordGenerated=" + recordsGenerated);
        System.out.println("recordProcessed=" + recordsProcessed);
        if (recordsProcessed > recordsGenerated) {
            System.out.println("PROCESSED-MORE-THAN-GENERATED");
        } else if (recordsProcessed < recordsGenerated) {
            System.out.println("PROCESSED-LESS-THAN-GENERATED");
        }
        success = allData.equals(received);
        if (success) {
            System.out.println("ALL-RECORDS-DELIVERED");
        } else {
            missedCount = 0;
            for (Map.Entry<String, Set<Integer>> entry : allData.entrySet()) {
                missedCount += ((Set)received.get(entry.getKey())).size();
            }
            System.out.println("missedRecords=" + missedCount);
        }
        success &= SmokeTestDriver.verifyMin(min, allData);
        success &= SmokeTestDriver.verifyMax(max, allData);
        success &= SmokeTestDriver.verifyDif(dif, allData);
        success &= SmokeTestDriver.verifySum(sum, allData);
        success &= SmokeTestDriver.verifyCnt(cnt, allData);
        success &= SmokeTestDriver.verifyAvg(avg, allData);
        success &= SmokeTestDriver.verifyWCnt(wcnt, allData);
        System.out.println((success &= SmokeTestDriver.verifyTAgg(tagg, allData)) != false ? "SUCCESS" : "FAILURE");
    }

    private static boolean verifyMin(Map<String, Integer> map, Map<String, Set<Integer>> allData) {
        boolean success = true;
        if (map.isEmpty()) {
            System.out.println("min is empty");
            success = false;
        } else {
            System.out.println("verifying min");
            if (map.size() != allData.size()) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
                success = false;
            }
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                int expected = SmokeTestDriver.getMin(entry.getKey());
                if (expected == entry.getValue()) continue;
                System.out.println("fail: key=" + entry.getKey() + " min=" + entry.getValue() + " expected=" + expected);
                success = false;
            }
        }
        return success;
    }

    private static boolean verifyMax(Map<String, Integer> map, Map<String, Set<Integer>> allData) {
        boolean success = true;
        if (map.isEmpty()) {
            System.out.println("max is empty");
            success = false;
        } else {
            System.out.println("verifying max");
            if (map.size() != allData.size()) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
                success = false;
            }
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                int expected = SmokeTestDriver.getMax(entry.getKey());
                if (expected == entry.getValue()) continue;
                System.out.println("fail: key=" + entry.getKey() + " max=" + entry.getValue() + " expected=" + expected);
                success = false;
            }
        }
        return success;
    }

    private static boolean verifyDif(Map<String, Integer> map, Map<String, Set<Integer>> allData) {
        boolean success = true;
        if (map.isEmpty()) {
            System.out.println("dif is empty");
            success = false;
        } else {
            System.out.println("verifying dif");
            if (map.size() != allData.size()) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
                success = false;
            }
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                int min = SmokeTestDriver.getMin(entry.getKey());
                int max = SmokeTestDriver.getMax(entry.getKey());
                int expected = max - min;
                if (entry.getValue() != null && expected == entry.getValue()) continue;
                System.out.println("fail: key=" + entry.getKey() + " dif=" + entry.getValue() + " expected=" + expected);
                success = false;
            }
        }
        return success;
    }

    private static boolean verifyCnt(Map<String, Long> map, Map<String, Set<Integer>> allData) {
        boolean success = true;
        if (map.isEmpty()) {
            System.out.println("cnt is empty");
            success = false;
        } else {
            System.out.println("verifying cnt");
            if (map.size() != allData.size()) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
                success = false;
            }
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                int min = SmokeTestDriver.getMin(entry.getKey());
                int max = SmokeTestDriver.getMax(entry.getKey());
                long expected = (long)(max - min) + 1L;
                if (expected == entry.getValue()) continue;
                System.out.println("fail: key=" + entry.getKey() + " cnt=" + entry.getValue() + " expected=" + expected);
                success = false;
            }
        }
        return success;
    }

    private static boolean verifySum(Map<String, Long> map, Map<String, Set<Integer>> allData) {
        boolean success = true;
        if (map.isEmpty()) {
            System.out.println("sum is empty");
            success = false;
        } else {
            System.out.println("verifying sum");
            if (map.size() != allData.size()) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
                success = false;
            }
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                int max;
                int min = SmokeTestDriver.getMin(entry.getKey());
                long expected = ((long)min + (long)(max = SmokeTestDriver.getMax(entry.getKey()))) * ((long)(max - min) + 1L) / 2L;
                if (expected == entry.getValue()) continue;
                System.out.println("fail: key=" + entry.getKey() + " sum=" + entry.getValue() + " expected=" + expected);
                success = false;
            }
        }
        return success;
    }

    private static boolean verifyAvg(Map<String, Double> map, Map<String, Set<Integer>> allData) {
        boolean success = true;
        if (map.isEmpty()) {
            System.out.println("avg is empty");
            success = false;
        } else {
            System.out.println("verifying avg");
            if (map.size() != allData.size()) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
                success = false;
            }
            for (Map.Entry<String, Double> entry : map.entrySet()) {
                int min = SmokeTestDriver.getMin(entry.getKey());
                int max = SmokeTestDriver.getMax(entry.getKey());
                double expected = (double)((long)min + (long)max) / 2.0;
                if (entry.getValue() != null && expected == entry.getValue()) continue;
                System.out.println("fail: key=" + entry.getKey() + " avg=" + entry.getValue() + " expected=" + expected);
                success = false;
            }
        }
        return success;
    }

    private static boolean verifyWCnt(Map<String, Long> map, Map<String, Set<Integer>> allData) {
        boolean success = true;
        if (map.isEmpty()) {
            System.out.println("wcnt is empty");
            success = false;
        } else {
            System.out.println("verifying wcnt");
            int expectedSize = 0;
            for (Set<Integer> set : allData.values()) {
                int maxValue = Collections.max(set);
                int minValue = Collections.min(set);
                expectedSize += maxValue / 100 + 1;
                expectedSize -= minValue / 100;
            }
            if (map.size() != expectedSize) {
                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + expectedSize);
                success = false;
            }
            for (Map.Entry entry : map.entrySet()) {
                long minTime = (long)SmokeTestDriver.getMinFromWKey((String)entry.getKey()) + 946080000000L;
                long maxTime = (long)SmokeTestDriver.getMaxFromWKey((String)entry.getKey()) + 946080000000L;
                long winTime = SmokeTestDriver.getStartFromWKey((String)entry.getKey());
                long expected = 100L;
                if (minTime > winTime) {
                    expected -= minTime - winTime;
                }
                if (maxTime < winTime + 100L - 1L) {
                    expected -= winTime + 100L - 1L - maxTime;
                }
                if (expected == (Long)entry.getValue()) continue;
                System.out.println("fail: key=" + (String)entry.getKey() + " wcnt=" + entry.getValue() + " expected=" + expected);
                success = false;
            }
        }
        return success;
    }

    private static boolean verifyTAgg(Map<String, Long> map, Map<String, Set<Integer>> allData) {
        boolean success = true;
        if (map.isEmpty()) {
            System.out.println("tagg is empty");
            success = false;
        } else {
            System.out.println("verifying tagg");
            HashMap<String, Long> expected = new HashMap<String, Long>();
            for (String string : allData.keySet()) {
                int min = SmokeTestDriver.getMin(string);
                int max = SmokeTestDriver.getMax(string);
                String cnt = Long.toString((long)(max - min) + 1L);
                if (expected.containsKey(cnt)) {
                    expected.put(cnt, (Long)expected.get(cnt) + 1L);
                    continue;
                }
                expected.put(cnt, 1L);
            }
            for (Map.Entry entry : map.entrySet()) {
                String key = (String)entry.getKey();
                Long expectedCount = (Long)expected.remove(key);
                if (expectedCount == null) {
                    expectedCount = 0L;
                }
                if (entry.getValue() == expectedCount) continue;
                System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key));
                success = false;
            }
            for (Map.Entry entry : expected.entrySet()) {
                System.out.println("fail: missingKey=" + (String)entry.getKey() + " expected=" + entry.getValue());
            }
        }
        return success;
    }

    private static int getMin(String key) {
        return Integer.parseInt(key.split("-")[0]);
    }

    private static int getMax(String key) {
        return Integer.parseInt(key.split("-")[1]);
    }

    private static int getMinFromWKey(String key) {
        return SmokeTestDriver.getMin(key.split("@")[0]);
    }

    private static int getMaxFromWKey(String key) {
        return SmokeTestDriver.getMax(key.split("@")[0]);
    }

    private static long getStartFromWKey(String key) {
        return Long.parseLong(key.split("@")[1]);
    }

    private static List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String ... topics) {
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        for (String topic : topics) {
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        return partitions;
    }

    private static class ValueList {
        public final String key;
        private final int[] values;
        private int index;

        ValueList(int min, int max) {
            this.key = min + "-" + max;
            this.values = new int[max - min + 1];
            for (int i = 0; i < this.values.length; ++i) {
                this.values[i] = min + i;
            }
            SmokeTestDriver.shuffle(this.values, 10);
            this.index = 0;
        }

        int next() {
            return this.index < this.values.length ? this.values[this.index++] : -1;
        }
    }
}

