package org.apache.kafka.streams.tests;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;

/* loaded from: input_file:org/apache/kafka/streams/tests/SmokeTestDriver.class */
public class SmokeTestDriver extends SmokeTestUtil {
    /**
     * Names of every topic this driver works with. Records are produced to "data"; the verifier
     * assigns itself all partitions of all listed topics and keeps one record counter per entry.
     */
    private static final String[] TOPICS = {"data", "echo", "max", "min", "min-suppressed", "min-raw", "dif", "sum", "sws-raw", "sws-suppressed", "cnt", "avg", "tagg"};
    /**
     * Bound on how many consecutive failed verification attempts (made after an empty poll)
     * are tolerated before the verifier gives up.
     */
    private static final int MAX_RECORD_EMPTY_RETRIES = 30;

    /* loaded from: input_file:org/apache/kafka/streams/tests/SmokeTestDriver$NumberDeserializer.class */
    public static class NumberDeserializer implements Deserializer<Number> {
        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Number m85deserialize(String str, byte[] bArr) {
            Number number;
            boolean z = -1;
            switch (str.hashCode()) {
                case -1796731926:
                    if (str.equals("sws-raw")) {
                        z = 5;
                        break;
                    }
                    break;
                case -698734574:
                    if (str.equals("sws-suppressed")) {
                        z = 6;
                        break;
                    }
                    break;
                case 96978:
                    if (str.equals("avg")) {
                        z = 12;
                        break;
                    }
                    break;
                case 98665:
                    if (str.equals("cnt")) {
                        z = 10;
                        break;
                    }
                    break;
                case 99457:
                    if (str.equals("dif")) {
                        z = 8;
                        break;
                    }
                    break;
                case 107876:
                    if (str.equals("max")) {
                        z = 7;
                        break;
                    }
                    break;
                case 108114:
                    if (str.equals("min")) {
                        z = 2;
                        break;
                    }
                    break;
                case 114251:
                    if (str.equals("sum")) {
                        z = 9;
                        break;
                    }
                    break;
                case 3076010:
                    if (str.equals("data")) {
                        z = false;
                        break;
                    }
                    break;
                case 3107365:
                    if (str.equals("echo")) {
                        z = true;
                        break;
                    }
                    break;
                case 3552269:
                    if (str.equals("tagg")) {
                        z = 11;
                        break;
                    }
                    break;
                case 1062754861:
                    if (str.equals("min-raw")) {
                        z = 3;
                        break;
                    }
                    break;
                case 1501878383:
                    if (str.equals("min-suppressed")) {
                        z = 4;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                case true:
                case true:
                case true:
                case true:
                case true:
                case true:
                case true:
                case true:
                    number = (Number) SmokeTestUtil.intSerde.deserializer().deserialize(str, bArr);
                    break;
                case true:
                case true:
                case true:
                    number = (Number) SmokeTestUtil.longSerde.deserializer().deserialize(str, bArr);
                    break;
                case true:
                    number = (Number) SmokeTestUtil.doubleSerde.deserializer().deserialize(str, bArr);
                    break;
                default:
                    throw new RuntimeException("unknown topic: " + str);
            }
            return number;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/tests/SmokeTestDriver$TestCallback.class */
    private static class TestCallback implements Callback {
        private final ProducerRecord<byte[], byte[]> originalRecord;
        private final List<ProducerRecord<byte[], byte[]>> needRetry;

        TestCallback(ProducerRecord<byte[], byte[]> producerRecord, List<ProducerRecord<byte[], byte[]>> list) {
            this.originalRecord = producerRecord;
            this.needRetry = list;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                if (exc instanceof TimeoutException) {
                    this.needRetry.add(this.originalRecord);
                } else {
                    exc.printStackTrace();
                    Exit.exit(1);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/tests/SmokeTestDriver$ValueList.class */
    /**
     * The consecutive integers {@code min..max} in a locally shuffled order, handed out one
     * at a time. The key is the textual range {@code "min-max"}.
     */
    private static class ValueList {
        public final String key;
        private final int[] values;
        private int index;

        /**
         * @param min first value of the range (inclusive)
         * @param max last value of the range (inclusive)
         */
        ValueList(int min, int max) {
            key = min + "-" + max;
            values = new int[max - min + 1];
            Arrays.setAll(values, offset -> min + offset);
            // Shuffle with a window of 10 so values stay roughly, but not exactly, ordered.
            SmokeTestDriver.shuffle(values, 10);
            index = 0;
        }

        /**
         * @return the next unread value, or -1 once every value has been handed out
         */
        int next() {
            return index < values.length ? values[index++] : -1;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/tests/SmokeTestDriver$VerificationResult.class */
    /**
     * Immutable outcome of a verification pass: a pass/fail flag plus the textual report.
     */
    public static class VerificationResult {
        private final boolean passed;
        private final String result;

        /**
         * @param passed whether the verification succeeded
         * @param result report text accompanying the outcome
         */
        VerificationResult(boolean passed, String result) {
            this.passed = passed;
            this.result = result;
        }

        /** @return {@code true} if the verification succeeded */
        public boolean passed() {
            return passed;
        }

        /** @return the report text supplied at construction */
        public String result() {
            return result;
        }
    }

    /**
     * @return a fresh copy of the topic-name array, so callers cannot modify the shared constant
     */
    public static String[] topics() {
        return TOPICS.clone();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Produces records to the "data" topic forever, pausing 2 ms between sends and logging
     * progress every 100 records. This method only returns by throwing.
     *
     * <p>Each key owns a {@link ValueList}; once a list is exhausted its {@code next()} returns
     * -1, and that value keeps being sent for the key.
     *
     * @param kafka            bootstrap servers of the target cluster
     * @param numKeys          number of distinct keys to produce for
     * @param maxRecordsPerKey size of the value range generated for each key
     */
    public static void generatePerpetually(String kafka, int numKeys, int maxRecordsPerKey) {
        final Properties producerProps = generatorProperties(kafka);

        final ValueList[] data = new ValueList[numKeys];
        for (int k = 0; k < numKeys; k++) {
            data[k] = new ValueList(k, k + maxRecordsPerKey - 1);
        }

        final Random rand = new Random();
        int numRecordsProduced = 0;

        // try-with-resources guarantees the producer is closed when the loop is aborted by an exception.
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            while (true) {
                final int index = rand.nextInt(numKeys);
                final String key = data[index].key;
                final int value = data[index].next();

                final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                    "data",
                    stringSerde.serializer().serialize("", key),
                    intSerde.serializer().serialize("", value));
                producer.send(record);

                numRecordsProduced++;
                if (numRecordsProduced % 100 == 0) {
                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
                }
                Utils.sleep(2L);
            }
        }
    }

    /**
     * Produces a bounded data set to the "data" topic and reports what was sent.
     *
     * <p>Keys are picked at random until every key's {@link ValueList} is exhausted. Sends that
     * time out are collected by {@link TestCallback} and re-sent for up to five rounds; if
     * records are still outstanding after the last round the process exits with status 1.
     * Finally one record with key "flush" and a timestamp two days in the future is sent to
     * every partition of "data".
     *
     * @param kafka            bootstrap servers of the target cluster
     * @param numKeys          number of distinct keys to produce for
     * @param maxRecordsPerKey number of values generated for each key
     * @param timeToSpend      target duration for the whole run; it is divided by the record count
     *                         to get the pause per record, with a 2 ms minimum
     * @return unmodifiable map from each key to the set of values produced for it
     */
    public static Map<String, Set<Integer>> generate(String kafka, int numKeys, int maxRecordsPerKey, Duration timeToSpend) {
        final Properties producerProps = generatorProperties(kafka);

        int numRecordsProduced = 0;
        final Map<String, Set<Integer>> allData = new HashMap<>();
        final ValueList[] data = new ValueList[numKeys];
        for (int k = 0; k < numKeys; k++) {
            data[k] = new ValueList(k, k + maxRecordsPerKey - 1);
            allData.put(data[k].key, new HashSet<>());
        }

        final Random rand = new Random();
        int remaining = data.length;
        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();

        // The producer must be closed on every path, including failures while flushing,
        // retrying or sending the trailing marker records.
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            while (remaining > 0) {
                final int index = rand.nextInt(remaining);
                final String key = data[index].key;
                final int value = data[index].next();

                if (value < 0) {
                    // This key is exhausted: overwrite its slot with the last live entry.
                    remaining--;
                    data[index] = data[remaining];
                } else {
                    final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                        "data",
                        stringSerde.serializer().serialize("", key),
                        intSerde.serializer().serialize("", value));
                    producer.send(record, new TestCallback(record, needRetry));

                    numRecordsProduced++;
                    allData.get(key).add(value);
                    if (numRecordsProduced % 100 == 0) {
                        System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
                    }
                    Utils.sleep(Math.max(recordPauseTime, 2L));
                }
            }
            producer.flush();

            // Re-send whatever timed out, collecting new timeouts into a fresh list each round.
            int remainingRetries = 5;
            while (!needRetry.isEmpty()) {
                final List<ProducerRecord<byte[], byte[]>> stillFailing = new ArrayList<>();
                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
                    producer.send(record, new TestCallback(record, stillFailing));
                }
                producer.flush();
                needRetry = stillFailing;

                remainingRetries--;
                if (remainingRetries == 0 && !needRetry.isEmpty()) {
                    System.err.println("Failed to produce all records after multiple retries");
                    Exit.exit(1);
                }
            }

            // Send one far-future "flush" record to every partition of the input topic.
            final long flushTimestamp = System.currentTimeMillis() + Duration.ofDays(2L).toMillis();
            for (final PartitionInfo partition : producer.partitionsFor("data")) {
                final ProducerRecord<byte[], byte[]> flushRecord = new ProducerRecord<>(
                    partition.topic(),
                    partition.partition(),
                    flushTimestamp,
                    stringSerde.serializer().serialize("", "flush"),
                    intSerde.serializer().serialize("", 0));
                producer.send(flushRecord);
            }
        }
        return Collections.unmodifiableMap(allData);
    }

    /**
     * Builds the configuration for the data-generating producer: byte-array key and value
     * serializers, {@code acks=all} and a fixed client id.
     *
     * @param kafka bootstrap servers of the target cluster
     * @return a new, independent {@link Properties} instance
     */
    private static Properties generatorProperties(String kafka) {
        final Properties config = new Properties();
        config.put("bootstrap.servers", kafka);
        config.put("client.id", "SmokeTest");
        config.put("acks", "all");
        // Keys and values are serialized by the caller, so the producer only handles raw bytes.
        final Class<ByteArraySerializer> rawBytes = ByteArraySerializer.class;
        config.put("key.serializer", rawBytes);
        config.put("value.serializer", rawBytes);
        return config;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Shuffles {@code data} in place. Each position is swapped with a randomly chosen position
     * at most {@code windowSize - 1} slots ahead of it (possibly itself).
     *
     * @param data       array to shuffle in place
     * @param windowSize how many slots, counted from the current one, a swap partner may be drawn from
     */
    public static void shuffle(int[] data, int windowSize) {
        final Random rand = new Random();
        final int length = data.length;
        int pos = 0;
        while (pos < length) {
            // Near the end of the array the window shrinks to the slots that are left.
            final int span = Math.min(length - pos, windowSize);
            final int partner = pos + rand.nextInt(span);
            final int held = data[partner];
            data[partner] = data[pos];
            data[pos] = held;
            pos++;
        }
    }

    /**
     * Consumes every smoke-test topic from the beginning and checks the results against the
     * data that was produced.
     *
     * <p>The consumer is polled for up to six minutes. Once a poll comes back empty and at
     * least the expected number of "echo" records has been seen, {@code verifyAll} is run; the
     * loop ends when it passes or when the retry budget is used up. Afterwards a summary is
     * printed to standard output, including whether every generated record was echoed.
     *
     * @param kafka            bootstrap servers of the cluster to read from
     * @param inputs           generated data: key to the set of values produced for it
     * @param maxRecordsPerKey number of records generated per key, used to compute the expected total
     * @return the result of the last {@code verifyAll} run
     */
    public static VerificationResult verify(String kafka, Map<String, Set<Integer>> inputs, int maxRecordsPerKey) {
        final Properties props = new Properties();
        props.put("client.id", "verifier");
        props.put("bootstrap.servers", kafka);
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", NumberDeserializer.class);
        props.put("isolation.level", "read_committed");

        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
        int recordsProcessed = 0;
        final Map<String, AtomicInteger> processed =
            Stream.of(TOPICS).collect(Collectors.toMap(topic -> topic, topic -> new AtomicInteger(0)));
        // topic -> key -> records in arrival order
        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();

        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
        int retry = 0;
        long start;

        // try-with-resources closes the consumer even if polling or verification throws.
        try (KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
            final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6L)) {
                final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(1L));
                if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                    verificationResult = verifyAll(inputs, events);
                    if (verificationResult.passed()) {
                        break;
                    }
                    if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
                        System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
                        break;
                    }
                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying...");
                } else {
                    // Data arrived (or the echo count is still short): reset the retry budget.
                    retry = 0;
                    for (final ConsumerRecord<String, Number> record : records) {
                        final String key = record.key();
                        final String topic = record.topic();
                        processed.get(topic).incrementAndGet();

                        if (topic.equals("echo")) {
                            recordsProcessed++;
                            if (recordsProcessed % 100 == 0) {
                                System.out.println("Echo records processed = " + recordsProcessed);
                            }
                        }

                        events.computeIfAbsent(topic, t -> new HashMap<>())
                              .computeIfAbsent(key, k -> new LinkedList<>())
                              .add(record);
                    }
                    System.out.println(processed);
                }
            }
        }

        System.out.println("Verification time=" + (System.currentTimeMillis() - start));
        System.out.println("-------------------");
        System.out.println("Result Verification");
        System.out.println("-------------------");
        System.out.println("recordGenerated=" + recordsGenerated);
        System.out.println("recordProcessed=" + recordsProcessed);

        if (recordsProcessed > recordsGenerated) {
            System.out.println("PROCESSED-MORE-THAN-GENERATED");
        } else if (recordsProcessed < recordsGenerated) {
            System.out.println("PROCESSED-LESS-THAN-GENERATED");
        }

        // Distinct values seen on "echo" per key; empty when nothing was echoed at all.
        final Map<String, Set<Number>> received = new HashMap<>();
        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry
                : events.getOrDefault("echo", Collections.emptyMap()).entrySet()) {
            final Set<Number> values = new HashSet<>();
            for (final ConsumerRecord<String, Number> record : entry.getValue()) {
                values.add(record.value());
            }
            received.put(entry.getKey(), values);
        }

        final boolean allDelivered = inputs.equals(received);
        if (allDelivered) {
            System.out.println("ALL-RECORDS-DELIVERED");
        } else {
            // Count generated values that never showed up on "echo".
            int missedCount = 0;
            for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
                final Set<Number> seen = received.getOrDefault(entry.getKey(), Collections.emptySet());
                for (final Integer expected : entry.getValue()) {
                    if (!seen.contains(expected)) {
                        missedCount++;
                    }
                }
            }
            System.out.println("missedRecords=" + missedCount);
        }

        // Give it one more try if it is not already passing.
        if (!verificationResult.passed()) {
            verificationResult = verifyAll(inputs, events);
        }
        final boolean success = allDelivered & verificationResult.passed();

        System.out.println(verificationResult.result());
        System.out.println(success ? "SUCCESS" : "FAILURE");
        // NOTE(review): the returned result reflects verifyAll only; the delivery check above
        // affects the printed SUCCESS/FAILURE line but not the returned passed() flag.
        return verificationResult;
    }

    /**
     * Runs every per-topic check and collects their textual output into one report.
     *
     * <p>All checks are always executed, even after one has failed, so the report covers every
     * topic. The report is written and read back as UTF-8.
     *
     * @param inputs generated data: key to the set of values produced for it
     * @param events consumed records, grouped by topic and then by key
     * @return result whose flag is {@code true} only if every check passed, with the combined report
     */
    private static VerificationResult verifyAll(Map<String, Set<Integer>> inputs,
                                                Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events) {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        boolean pass;
        // Encode with UTF-8 explicitly so it matches the decoding below regardless of platform default.
        try (PrintStream resultStream = new PrintStream(buffer, false, StandardCharsets.UTF_8.name())) {
            // "&=" does not short-circuit: each check runs and contributes to the report.
            pass = verifyTAgg(resultStream, inputs, events.get("tagg"));
            pass &= verifySuppressed(resultStream, "min-suppressed", events);
            pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
                // Drop the first and last character, then everything from '@' on, to get the plain key.
                final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
                return getMin(unwindowedKey);
            });
            pass &= verifySuppressed(resultStream, "sws-suppressed", events);
            pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin);
            pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax);
            pass &= verify(resultStream, "dif", inputs, events,
                key -> Integer.valueOf(getMax(key).intValue() - getMin(key).intValue()));
            pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum);
            pass &= verify(resultStream, "cnt", inputs, events,
                key -> Long.valueOf((getMax(key).intValue() - getMin(key).intValue()) + 1));
            pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg);
        } catch (java.io.UnsupportedEncodingException e) {
            // Every Java platform is required to support UTF-8, so this indicates a broken runtime.
            throw new IllegalStateException("UTF-8 is not supported by this JVM", e);
        }
        return new VerificationResult(pass, new String(buffer.toByteArray(), StandardCharsets.UTF_8));
    }

    /**
     * Checks one result topic: it must contain exactly one entry per input key, and the latest
     * record for each key must equal the value computed from that key.
     *
     * @param resultStream     sink for progress and failure details
     * @param topic            name of the result topic to check
     * @param inputData        generated data; only its key count and key set are used here
     * @param events           consumed records, grouped by topic and then by key
     * @param keyToExpectation computes the expected final value from a result key
     * @return {@code true} if the topic has results for the expected number of keys and every
     *         latest value matches; {@code false} on the first problem found
     */
    private static boolean verify(PrintStream resultStream,
                                  String topic,
                                  Map<String, Set<Integer>> inputData,
                                  Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
                                  Function<String, Number> keyToExpectation) {
        // Only used for diagnostics, so a missing "data" topic must not abort the check.
        final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents =
            events.getOrDefault("data", Collections.emptyMap());
        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents =
            events.getOrDefault(topic, Collections.emptyMap());

        if (outputEvents.isEmpty()) {
            resultStream.println(topic + " is empty");
            return false;
        }

        resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());

        if (outputEvents.size() != inputData.size()) {
            resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
                                outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
            return false;
        }

        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
            final String key = entry.getKey();
            final Number expected = keyToExpectation.apply(key);
            final Number actual = entry.getValue().getLast().value();
            if (!expected.equals(actual)) {
                // Result keys need not exist in the input topic (e.g. windowed keys); report an
                // empty input list instead of failing with a NullPointerException.
                List<ConsumerRecord<String, Number>> inputsForKey = observedInputEvents.get(key);
                if (inputsForKey == null) {
                    inputsForKey = Collections.emptyList();
                }
                resultStream.printf("%s fail: key=%s actual=%s expected=%s%n\t inputEvents=%n%s%n\toutputEvents=%n%s%n",
                                    topic,
                                    key,
                                    actual,
                                    expected,
                                    indent("\t\t", inputsForKey),
                                    indent("\t\t", entry.getValue()));
                return false;
            }
        }
        return true;
    }

    /**
     * Checks that a suppressed topic holds exactly one record per key.
     *
     * <p>On the first key with a different record count, a diagnostic is written that lists the
     * offending records, the records for the same key in the matching "-raw" topic, and the
     * input records for the key with its window decoration stripped.
     *
     * @param resultStream sink for progress and failure details
     * @param topic        name of the suppressed topic, expected to end in "-suppressed"
     * @param events       consumed records, grouped by topic and then by key
     * @return {@code true} if every key has exactly one record (also when the topic has none)
     */
    private static boolean verifySuppressed(PrintStream resultStream,
                                            String topic,
                                            Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events) {
        resultStream.println("verifying suppressed " + topic);
        final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents =
            events.getOrDefault(topic, Collections.emptyMap());
        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
            if (entry.getValue().size() != 1) {
                final String rawTopic = topic.replace("-suppressed", "-raw");
                final String key = entry.getKey();
                // Presumably keys look like "[key@window]"; strip the brackets and the window part.
                final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
                resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n\traw results:%n%s%n\tinput data:%n%s%n",
                                    key,
                                    indent("\t\t", entry.getValue()),
                                    indent("\t\t", recordsOrEmpty(events, rawTopic, key)),
                                    indent("\t\t", recordsOrEmpty(events, "data", unwindowedKey)));
                return false;
            }
        }
        return true;
    }

    /** Returns the records for {@code key} in {@code topic}, or an empty list if either is absent. */
    private static List<ConsumerRecord<String, Number>> recordsOrEmpty(
            Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
            String topic,
            String key) {
        final Map<String, LinkedList<ConsumerRecord<String, Number>>> byKey = events.get(topic);
        if (byKey == null) {
            return Collections.emptyList();
        }
        final List<ConsumerRecord<String, Number>> records = byKey.get(key);
        return records == null ? Collections.<ConsumerRecord<String, Number>>emptyList() : records;
    }

    /**
     * Renders each record on its own line, preceded by {@code prefix}.
     *
     * @param prefix text placed at the start of every line
     * @param list   records to render, in iteration order
     * @return the concatenated lines, each terminated by a newline; empty if there are no records
     */
    private static String indent(String prefix, Iterable<ConsumerRecord<String, Number>> list) {
        final StringBuilder rendered = new StringBuilder();
        for (final ConsumerRecord<String, Number> record : list) {
            rendered.append(prefix);
            rendered.append(record);
            rendered.append('\n');
        }
        return rendered.toString();
    }

    /**
     * Computes the sum of all integers in the range encoded by a {@code "min-max"} key.
     *
     * @param key range key of the form {@code "min-max"}
     * @return {@code min + (min + 1) + ... + max}, computed in {@code long} arithmetic so that
     *         large ranges do not overflow
     */
    private static Long getSum(String key) {
        final int min = getMin(key).intValue();
        final int max = getMax(key).intValue();
        // Arithmetic series: (first + last) * count / 2.
        return ((long) min + max) * (max - min + 1L) / 2L;
    }

    /**
     * Computes the midpoint of the range encoded by a {@code "min-max"} key.
     *
     * @param key range key of the form {@code "min-max"}
     * @return {@code (min + max) / 2} as a double
     */
    private static Double getAvg(String key) {
        final int min = getMin(key).intValue();
        final int max = getMax(key).intValue();
        final int total = min + max;
        return total / 2.0d;
    }

    /**
     * Checks the "tagg" results: for every result key, the latest value must equal the number
     * of input keys whose range size ({@code max - min + 1}), rendered as text, is that key.
     * Result keys with no matching input key are expected to carry 0.
     *
     * @param resultStream sink for progress and failure details
     * @param allData      generated data; only its keys are used
     * @param taggEvents   consumed "tagg" records grouped by key, or {@code null} if none were seen
     * @return {@code true} if every result key carries the expected count
     */
    private static boolean verifyTAgg(PrintStream resultStream,
                                      Map<String, Set<Integer>> allData,
                                      Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents) {
        if (taggEvents == null) {
            resultStream.println("tagg is missing");
            return false;
        }
        if (taggEvents.isEmpty()) {
            resultStream.println("tagg is empty");
            return false;
        }
        resultStream.println("verifying tagg");

        // Tally how many input keys share each range size.
        final Map<String, Long> expectedCounts = new HashMap<>();
        for (final String inputKey : allData.keySet()) {
            final int rangeSize = (getMax(inputKey).intValue() - getMin(inputKey).intValue()) + 1;
            expectedCounts.merge(Long.toString(rangeSize), 1L, Long::sum);
        }

        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
            final String key = entry.getKey();
            final LinkedList<ConsumerRecord<String, Number>> outputs = entry.getValue();
            final Long tallied = expectedCounts.remove(key);
            final long expected = tallied == null ? 0L : tallied;

            final long actual = outputs.getLast().value().longValue();
            if (actual != expected) {
                resultStream.println("fail: key=" + key + " tagg=" + outputs + " expected=" + expected);
                resultStream.println("\t outputEvents: " + outputs);
                return false;
            }
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Extracts the lower bound from a {@code "min-max"} key.
     *
     * @param key range key; the text before the first '-' must parse as an int
     * @return the parsed lower bound
     */
    public static Number getMin(String key) {
        final String[] bounds = key.split("-");
        return Integer.valueOf(bounds[0]);
    }

    /**
     * Extracts the upper bound from a {@code "min-max"} key.
     *
     * @param key range key; the text between the first and second '-' must parse as an int
     * @return the parsed upper bound
     */
    private static Number getMax(String key) {
        final String[] bounds = key.split("-");
        return Integer.valueOf(bounds[1]);
    }

    /**
     * Looks up the partitions of the given topics and returns them as one flat list, in
     * topic order.
     *
     * @param consumer consumer used to query partition metadata
     * @param topics   names of the topics to look up
     * @return a new mutable list with one {@link TopicPartition} per reported partition
     */
    private static List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
        final List<TopicPartition> partitions = new ArrayList<>();
        for (final String topic : topics) {
            consumer.partitionsFor(topic)
                    .forEach(info -> partitions.add(new TopicPartition(info.topic(), info.partition())));
        }
        return partitions;
    }
}
