package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;

/* loaded from: input_file:org/apache/kafka/streams/tests/EosTestDriver.class */
public class EosTestDriver extends SmokeTestUtil {
    // Exclusive upper bound for the random integer used as record key by generate().
    private static final int MAX_NUMBER_OF_KEYS = 20000;
    // Timeout budget in milliseconds (600000 ms = 10 minutes) used by the waiting loops in this class.
    private static final long MAX_IDLE_TIME_MS = 600000;
    // Polled by the produce loop in generate(); set to false by the shutdown hook registered there.
    private static volatile boolean isRunning = true;
    // Counted down when generate() finishes; the shutdown hook awaits it before returning.
    private static CountDownLatch terminated = new CountDownLatch(1);
    // Running count of produced records; modified through the synchronized updateNumRecordsProduces().
    private static int numRecordsProduced = 0;

    /**
     * Adjusts the produced-record counter under the class lock.
     *
     * @param i amount to add to {@code numRecordsProduced}; may be negative
     */
    private static synchronized void updateNumRecordsProduces(int i) {
        numRecordsProduced = numRecordsProduced + i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Produces random records to the {@code data} topic until the JVM shutdown hook stops the
     * loop, then prints the acknowledged offsets per partition and the end offset of every
     * {@code data} partition.
     *
     * <p>The shutdown hook registered here clears {@code isRunning} and waits up to five minutes
     * on {@code terminated}, which is counted down when this method finishes, whether it returns
     * normally or throws.
     *
     * @param str value used for the {@code bootstrap.servers} setting of the producer and consumer
     */
    public static void generate(String str) {
        Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> {
            System.out.println("Terminating");
            isRunning = false;
            try {
                if (terminated.await(5L, TimeUnit.MINUTES)) {
                    System.out.println("Terminated");
                } else {
                    System.out.println("Terminated with timeout");
                }
            } catch (InterruptedException e) {
                e.printStackTrace(System.err);
                System.out.println("Terminated with error");
            }
            System.err.flush();
            System.out.flush();
        });

        Properties producerProps = new Properties();
        producerProps.put("client.id", "EosTest");
        producerProps.put("bootstrap.servers", str);
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);
        producerProps.put("enable.idempotence", true);

        // Acknowledged offsets per partition. Filled by the send callback and read only after
        // the producer has been closed below.
        Map<Integer, List<Long>> offsets = new HashMap<>();

        try {
            try (KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps)) {
                Random random = new Random(System.currentTimeMillis());
                while (isRunning) {
                    String key = "" + random.nextInt(MAX_NUMBER_OF_KEYS);
                    int value = random.nextInt(10000);
                    producer.send(new ProducerRecord<>("data", key, value), (metadata, exception) -> {
                        if (exception == null) {
                            // computeIfAbsent stores the new list in the map; getOrDefault would
                            // append to a throw-away list and lose the offset.
                            offsets.computeIfAbsent(metadata.partition(), partition -> new ArrayList<>())
                                .add(metadata.offset());
                            return;
                        }
                        exception.printStackTrace(System.err);
                        System.err.flush();
                        if (exception instanceof TimeoutException) {
                            try {
                                // Presumably the third token of the message is the number of
                                // expired records; subtract it from the produced count.
                                updateNumRecordsProduces(-Integer.parseInt(exception.getMessage().split(" ")[2]));
                            } catch (Exception ignored) {
                                // Best effort: leave the counter untouched when the message does
                                // not have the expected shape.
                            }
                        }
                    });
                    updateNumRecordsProduces(1);
                    if (numRecordsProduced % 1000 == 0) {
                        System.out.println(numRecordsProduced + " records produced");
                        System.out.flush();
                    }
                    Utils.sleep(random.nextInt(10));
                }
            }
            System.out.println("Producer closed: " + numRecordsProduced + " records produced");
            System.out.flush();

            for (Map.Entry<Integer, List<Long>> entry : offsets.entrySet()) {
                List<Long> partitionOffsets = entry.getValue();
                Collections.sort(partitionOffsets);
                // Every offset except the last one is compared against its index; the last one
                // is reported as the partition's max offset.
                for (int i = 0; i < partitionOffsets.size() - 1; i++) {
                    if (partitionOffsets.get(i).longValue() != i) {
                        System.err.println("Offset for partition " + entry.getKey() + " is not " + i + " as expected but " + partitionOffsets.get(i));
                        System.err.flush();
                    }
                }
                System.out.println("Max offset of partition " + entry.getKey() + " is " + partitionOffsets.get(partitionOffsets.size() - 1));
            }

            Properties consumerProps = new Properties();
            consumerProps.put("client.id", "verifier");
            consumerProps.put("bootstrap.servers", str);
            consumerProps.put("key.deserializer", ByteArrayDeserializer.class);
            consumerProps.put("value.deserializer", ByteArrayDeserializer.class);
            consumerProps.put("isolation.level", IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
                List<TopicPartition> partitions = getAllPartitions(consumer, "data");
                System.out.println("Partitions: " + partitions);
                System.out.flush();
                consumer.assign(partitions);
                consumer.seekToEnd(partitions);
                for (TopicPartition partition : partitions) {
                    System.out.println("End-offset for " + partition + " is " + consumer.position(partition));
                    System.out.flush();
                }
            }
            System.out.flush();
        } finally {
            // Release the shutdown hook regardless of how production ended.
            terminated.countDown();
        }
    }

    /**
     * Verifies the output of the streams application against its input.
     *
     * <p>Steps, in order: wait for all transactions on the output topics to finish, wait for the
     * streams application to be down, fetch its committed offsets, read the input topics up to
     * those offsets, read the output topics up to their end offsets, and run the per-topic
     * verifications. On success {@code ALL-RECORDS-DELIVERED} is printed to stdout. Any
     * {@link Exception} raised along the way is printed to stderr, followed by {@code FAILED} on
     * stdout, and is not rethrown.
     *
     * @param str value used for the {@code bootstrap.servers} setting of all clients
     * @param z   when {@code true}, the {@code repartition}, {@code max} and {@code cnt} topics
     *            are read and verified as well
     */
    public static void verify(String str, boolean z) {
        Properties properties = new Properties();
        properties.put("client.id", "verifier");
        properties.put("bootstrap.servers", str);
        properties.put("key.deserializer", ByteArrayDeserializer.class);
        properties.put("value.deserializer", ByteArrayDeserializer.class);
        properties.put("isolation.level", IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

        try {
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties)) {
                verifyAllTransactionFinished(consumer, str, z);
            }

            Map<TopicPartition, Long> committedOffsets;
            try (Admin admin = Admin.create(properties)) {
                ensureStreamsApplicationDown(admin);
                committedOffsets = getCommittedOffsets(admin, z);
            }

            String[] inputTopics;
            String[] outputTopics;
            if (z) {
                inputTopics = new String[]{"data", "repartition"};
                outputTopics = new String[]{"echo", "min", "sum", "repartition", "max", "cnt"};
            } else {
                inputTopics = new String[]{"data"};
                outputTopics = new String[]{"echo", "min", "sum"};
            }

            // Input is read only up to the committed offsets; later records are ignored.
            Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> inputRecords;
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties)) {
                List<TopicPartition> partitions = getAllPartitions(consumer, inputTopics);
                consumer.assign(partitions);
                consumer.seekToBeginning(partitions);
                inputRecords = getRecords(consumer, committedOffsets, z, true);
            }

            // Output is read up to the current end offsets; anything beyond them is a failure.
            Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> outputRecords;
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties)) {
                List<TopicPartition> partitions = getAllPartitions(consumer, outputTopics);
                consumer.assign(partitions);
                consumer.seekToBeginning(partitions);
                outputRecords = getRecords(consumer, consumer.endOffsets(partitions), z, false);
            }

            verifyReceivedAllRecords(inputRecords.get("data"), outputRecords.get("echo"));
            if (z) {
                verifyReceivedAllRecords(inputRecords.get("data"), outputRecords.get("repartition"));
            }
            verifyMin(inputRecords.get("data"), outputRecords.get("min"));
            verifySum(inputRecords.get("data"), outputRecords.get("sum"));
            if (z) {
                verifyMax(inputRecords.get("repartition"), outputRecords.get("max"));
                verifyCnt(inputRecords.get("repartition"), outputRecords.get("cnt"));
            }

            System.out.println("ALL-RECORDS-DELIVERED");
            System.out.flush();
        } catch (Exception e) {
            e.printStackTrace(System.err);
            System.out.println("FAILED");
        }
    }

    /**
     * Blocks until the consumer group returned by {@code getConsumerGroupDescription} has no
     * members, checking once per second.
     *
     * @param admin client used to look up the consumer group description
     * @throws RuntimeException if the group still has members after {@code MAX_IDLE_TIME_MS}
     */
    private static void ensureStreamsApplicationDown(Admin admin) {
        final long deadline = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
        while (true) {
            final ConsumerGroupDescription description = getConsumerGroupDescription(admin);
            final boolean groupIsEmpty = description.members().isEmpty();
            if (!groupIsEmpty && System.currentTimeMillis() > deadline) {
                throw new RuntimeException("Streams application not down after 600 seconds. Group: " + description);
            }
            // The pause happens on every pass, including the final one that finds the group empty.
            sleep(1000L);
            if (groupIsEmpty) {
                return;
            }
        }
    }

    /**
     * Fetches the committed offsets of consumer group {@code EosTest} and keeps those that belong
     * to the {@code data} topic and, if requested, the {@code repartition} topic.
     *
     * @param admin client used to list the group's offsets (waits at most 10 seconds)
     * @param z     when {@code true}, offsets of the {@code repartition} topic are included
     * @return committed offset per selected partition
     * @throws RuntimeException wrapping any exception raised while fetching or filtering
     */
    private static Map<TopicPartition, Long> getCommittedOffsets(Admin admin, boolean z) {
        try {
            Map<TopicPartition, OffsetAndMetadata> allCommitted =
                admin.listConsumerGroupOffsets("EosTest").partitionsToOffsetAndMetadata().get(10L, TimeUnit.SECONDS);

            Map<TopicPartition, Long> selected = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : allCommitted.entrySet()) {
                TopicPartition partition = committed.getKey();
                String topic = partition.topic();
                boolean wanted = topic.equals("data") || (z && topic.equals("repartition"));
                if (!wanted) {
                    continue;
                }
                selected.put(partition, committed.getValue().offset());
            }
            return selected;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * Polls the consumer until every partition in {@code map} has reached its read end offset,
     * collecting the records below that offset grouped by topic and partition.
     *
     * <p>The idle deadline is pushed out by {@code MAX_IDLE_TIME_MS} for every record received.
     * A partition is paused once the consumer position reaches its read end offset; reading is
     * complete when the number of paused partitions equals the number of entries in {@code map}.
     *
     * @param kafkaConsumer consumer that is already assigned and positioned by the caller
     * @param map           read end offset (exclusive) per partition
     * @param z             forwarded to {@code addRecord} to decide which topics are accepted
     * @param z2            when {@code true}, records at or beyond the read end offset are
     *                      dropped silently; when {@code false} such a record is an error
     * @return the collected records per topic and partition
     * @throws RuntimeException if an unexpected extra record arrives, or if not all partitions
     *                          reached their end offset before the idle deadline
     */
    private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getRecords(KafkaConsumer<byte[], byte[]> kafkaConsumer, Map<TopicPartition, Long> map, boolean z, boolean z2) {
        System.out.println("read end offset: " + map);
        Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> collected = new HashMap<>();
        Map<TopicPartition, Long> maxReceivedOffsets = new HashMap<>();
        Map<TopicPartition, Long> maxConsumerPositions = new HashMap<>();

        long idleDeadline = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
        boolean allPartitionsDone = false;
        while (!allPartitionsDone && System.currentTimeMillis() < idleDeadline) {
            for (ConsumerRecord<byte[], byte[]> record : kafkaConsumer.poll(Duration.ofSeconds(1L))) {
                idleDeadline = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
                TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                maxReceivedOffsets.put(partition, record.offset());
                long readEndOffset = map.get(partition);
                if (record.offset() < readEndOffset) {
                    addRecord(record, collected, z);
                    continue;
                }
                if (!z2) {
                    throw new RuntimeException("FAIL: did receive more records than expected for " + partition + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset());
                }
            }

            for (TopicPartition partition : map.keySet()) {
                long position = kafkaConsumer.position(partition);
                maxConsumerPositions.put(partition, position);
                if (position >= map.get(partition)) {
                    kafkaConsumer.pause(Collections.singletonList(partition));
                }
            }
            allPartitionsDone = kafkaConsumer.paused().size() == map.keySet().size();
        }

        if (!allPartitionsDone) {
            System.err.println("Pause partitions (ie, received all data): " + kafkaConsumer.paused());
            System.err.println("Max received offset per partition: " + maxReceivedOffsets);
            System.err.println("Max consumer position per partition: " + maxConsumerPositions);
            throw new RuntimeException("FAIL: did not receive all records after 600 sec idle time.");
        }
        return collected;
    }

    /**
     * Appends a record to the per-topic, per-partition collection, creating the nested
     * containers on first use.
     *
     * @param consumerRecord record to store
     * @param map            target collection keyed by topic name, then by partition
     * @param z              forwarded to {@code verifyTopic} to decide which topics are accepted
     * @throws RuntimeException if the record's topic is rejected by {@code verifyTopic}
     */
    private static void addRecord(ConsumerRecord<byte[], byte[]> consumerRecord, Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> map, boolean z) {
        final String topic = consumerRecord.topic();
        final TopicPartition partition = new TopicPartition(topic, consumerRecord.partition());
        if (verifyTopic(topic, z)) {
            Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsOfTopic =
                map.computeIfAbsent(topic, ignoredTopic -> new HashMap<>());
            List<ConsumerRecord<byte[], byte[]>> recordsOfPartition =
                recordsOfTopic.computeIfAbsent(partition, ignoredPartition -> new ArrayList<>());
            recordsOfPartition.add(consumerRecord);
            return;
        }
        throw new RuntimeException("FAIL: received data from unexpected topic: " + consumerRecord);
    }

    /**
     * Tells whether a topic name is one this driver expects records from.
     *
     * @param str topic name; {@code null} is never accepted
     * @param z   when {@code true}, {@code repartition}, {@code max} and {@code cnt} are accepted
     *            in addition to {@code data}, {@code echo}, {@code min} and {@code sum}
     * @return {@code true} if the topic is accepted
     */
    private static boolean verifyTopic(String str, boolean z) {
        if ("data".equals(str) || "echo".equals(str) || "min".equals(str) || "sum".equals(str)) {
            return true;
        }
        if (!z) {
            return false;
        }
        return "repartition".equals(str) || "max".equals(str) || "cnt".equals(str);
    }

    /**
     * Checks that the received records are an exact, in-order copy of the expected records,
     * partition by partition. Keys are compared as strings and values as integers.
     *
     * <p>Each received partition is matched against the {@code data} partition with the same
     * partition number.
     *
     * @param map  expected records per input partition
     * @param map2 received records per output partition
     * @throws RuntimeException if the partition counts differ, a received partition has no
     *                          expected counterpart, the record counts of a partition differ, or
     *                          any record's key or value does not match
     */
    private static void verifyReceivedAllRecords(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map2) {
        // The map sizes are partition counts, not record counts.
        if (map.size() != map2.size()) {
            throw new RuntimeException("Result verification failed. Received records for " + map2.size() + " partitions but expected " + map.size());
        }
        StringDeserializer stringDeserializer = new StringDeserializer();
        IntegerDeserializer integerDeserializer = new IntegerDeserializer();
        for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry : map2.entrySet()) {
            TopicPartition inputPartition = new TopicPartition("data", entry.getKey().partition());
            List<ConsumerRecord<byte[], byte[]>> expectedRecords = map.get(inputPartition);
            List<ConsumerRecord<byte[], byte[]>> receivedRecords = entry.getValue();
            if (expectedRecords == null) {
                throw new RuntimeException("Result verification failed: received records for " + entry.getKey() + " but there are no expected records for " + inputPartition);
            }
            // Without this check, missing output records would pass unnoticed and surplus ones
            // would surface as a bare NoSuchElementException from the iterator below.
            if (expectedRecords.size() != receivedRecords.size()) {
                throw new RuntimeException("Result verification failed: expected " + expectedRecords.size() + " records for " + entry.getKey() + " but received " + receivedRecords.size());
            }
            Iterator<ConsumerRecord<byte[], byte[]>> expectedIterator = expectedRecords.iterator();
            for (ConsumerRecord<byte[], byte[]> received : receivedRecords) {
                ConsumerRecord<byte[], byte[]> expected = expectedIterator.next();
                String receivedKey = stringDeserializer.deserialize(received.topic(), received.key());
                int receivedValue = integerDeserializer.deserialize(received.topic(), received.value()).intValue();
                String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
                int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value()).intValue();
                if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
                    throw new RuntimeException("Result verification failed for " + received + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
                }
            }
        }
    }

    /**
     * Checks that each received record carries the running minimum of the input values seen so
     * far for its key. Each received partition is matched against the {@code data} partition
     * with the same partition number, record by record.
     *
     * @param map  input records per {@code data} partition
     * @param map2 received records per output partition
     * @throws RuntimeException if the record counts of a partition differ, or a received key or
     *                          value does not match the expected running minimum
     */
    private static void verifyMin(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map2) {
        StringDeserializer keyDeserializer = new StringDeserializer();
        IntegerDeserializer valueDeserializer = new IntegerDeserializer();
        Map<String, Integer> minPerKey = new HashMap<>();

        for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> outputPartition : map2.entrySet()) {
            TopicPartition inputPartition = new TopicPartition("data", outputPartition.getKey().partition());
            List<ConsumerRecord<byte[], byte[]>> inputRecords = map.get(inputPartition);
            List<ConsumerRecord<byte[], byte[]>> receivedRecords = outputPartition.getValue();
            if (inputRecords.size() != receivedRecords.size()) {
                throw new RuntimeException("Result verification failed: expected " + inputRecords.size() + " records for " + outputPartition.getKey() + " but received " + receivedRecords.size());
            }

            Iterator<ConsumerRecord<byte[], byte[]>> inputIterator = inputRecords.iterator();
            for (ConsumerRecord<byte[], byte[]> received : receivedRecords) {
                ConsumerRecord<byte[], byte[]> input = inputIterator.next();
                String receivedKey = keyDeserializer.deserialize(received.topic(), received.key());
                int receivedValue = valueDeserializer.deserialize(received.topic(), received.value());
                String inputKey = keyDeserializer.deserialize(input.topic(), input.key());
                int inputValue = valueDeserializer.deserialize(input.topic(), input.value());

                // First value for a key is stored as is; later ones fold in via min.
                int expectedMin = minPerKey.merge(inputKey, inputValue, Integer::min);
                if (receivedKey.equals(inputKey) && receivedValue == expectedMin) {
                    continue;
                }
                throw new RuntimeException("Result verification failed for " + received + " expected <" + inputKey + "," + expectedMin + "> but was <" + receivedKey + "," + receivedValue + ">");
            }
        }
    }

    /**
     * Checks that each received record carries the running sum of the input values seen so far
     * for its key. Input values are read as integers, received sums as longs. Each received
     * partition is matched against the {@code data} partition with the same partition number.
     *
     * @param map  input records per {@code data} partition
     * @param map2 received records per output partition
     * @throws RuntimeException if the record counts of a partition differ, or a received key or
     *                          value does not match the expected running sum
     */
    private static void verifySum(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map2) {
        StringDeserializer keyDeserializer = new StringDeserializer();
        IntegerDeserializer inputValueDeserializer = new IntegerDeserializer();
        LongDeserializer sumDeserializer = new LongDeserializer();
        Map<String, Long> sumPerKey = new HashMap<>();

        for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> outputPartition : map2.entrySet()) {
            TopicPartition inputPartition = new TopicPartition("data", outputPartition.getKey().partition());
            List<ConsumerRecord<byte[], byte[]>> inputRecords = map.get(inputPartition);
            List<ConsumerRecord<byte[], byte[]>> receivedRecords = outputPartition.getValue();
            if (inputRecords.size() != receivedRecords.size()) {
                throw new RuntimeException("Result verification failed: expected " + inputRecords.size() + " records for " + outputPartition.getKey() + " but received " + receivedRecords.size());
            }

            Iterator<ConsumerRecord<byte[], byte[]>> inputIterator = inputRecords.iterator();
            for (ConsumerRecord<byte[], byte[]> received : receivedRecords) {
                ConsumerRecord<byte[], byte[]> input = inputIterator.next();
                String receivedKey = keyDeserializer.deserialize(received.topic(), received.key());
                long receivedSum = sumDeserializer.deserialize(received.topic(), received.value());
                String inputKey = keyDeserializer.deserialize(input.topic(), input.key());
                int inputValue = inputValueDeserializer.deserialize(input.topic(), input.value());

                // First value for a key starts the sum; later ones are added to it.
                long expectedSum = sumPerKey.merge(inputKey, (long) inputValue, Long::sum);
                if (receivedKey.equals(inputKey) && receivedSum == expectedSum) {
                    continue;
                }
                throw new RuntimeException("Result verification failed for " + received + " expected <" + inputKey + "," + expectedSum + "> but was <" + receivedKey + "," + receivedSum + ">");
            }
        }
    }

    /**
     * Checks that each received record carries the running maximum of the input values seen so
     * far for its key. Each received partition is matched against the {@code repartition}
     * partition with the same partition number, record by record.
     *
     * @param map  input records per {@code repartition} partition
     * @param map2 received records per output partition
     * @throws RuntimeException if the record counts of a partition differ, or a received key or
     *                          value does not match the expected running maximum
     */
    private static void verifyMax(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map2) {
        StringDeserializer keyDeserializer = new StringDeserializer();
        IntegerDeserializer valueDeserializer = new IntegerDeserializer();
        Map<String, Integer> maxPerKey = new HashMap<>();

        for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> outputPartition : map2.entrySet()) {
            TopicPartition inputPartition = new TopicPartition("repartition", outputPartition.getKey().partition());
            List<ConsumerRecord<byte[], byte[]>> inputRecords = map.get(inputPartition);
            List<ConsumerRecord<byte[], byte[]>> receivedRecords = outputPartition.getValue();
            if (inputRecords.size() != receivedRecords.size()) {
                throw new RuntimeException("Result verification failed: expected " + inputRecords.size() + " records for " + outputPartition.getKey() + " but received " + receivedRecords.size());
            }

            Iterator<ConsumerRecord<byte[], byte[]>> inputIterator = inputRecords.iterator();
            for (ConsumerRecord<byte[], byte[]> received : receivedRecords) {
                ConsumerRecord<byte[], byte[]> input = inputIterator.next();
                String receivedKey = keyDeserializer.deserialize(received.topic(), received.key());
                int receivedValue = valueDeserializer.deserialize(received.topic(), received.value());
                String inputKey = keyDeserializer.deserialize(input.topic(), input.key());
                int inputValue = valueDeserializer.deserialize(input.topic(), input.value());

                // Starting from the first value equals starting from Integer.MIN_VALUE and taking the max.
                int expectedMax = maxPerKey.merge(inputKey, inputValue, Integer::max);
                if (receivedKey.equals(inputKey) && receivedValue == expectedMax) {
                    continue;
                }
                throw new RuntimeException("Result verification failed for " + received + " expected <" + inputKey + "," + expectedMax + "> but was <" + receivedKey + "," + receivedValue + ">");
            }
        }
    }

    /**
     * Checks that each received record carries the running count of input records seen so far
     * for its key. Each received partition is matched against the {@code repartition} partition
     * with the same partition number, record by record.
     *
     * @param map  input records per {@code repartition} partition
     * @param map2 received records per output partition
     * @throws RuntimeException if the record counts of a partition differ, or a received key or
     *                          value does not match the expected running count
     */
    private static void verifyCnt(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map2) {
        StringDeserializer keyDeserializer = new StringDeserializer();
        LongDeserializer countDeserializer = new LongDeserializer();
        Map<String, Long> countPerKey = new HashMap<>();

        for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> outputPartition : map2.entrySet()) {
            TopicPartition inputPartition = new TopicPartition("repartition", outputPartition.getKey().partition());
            List<ConsumerRecord<byte[], byte[]>> inputRecords = map.get(inputPartition);
            List<ConsumerRecord<byte[], byte[]>> receivedRecords = outputPartition.getValue();
            if (inputRecords.size() != receivedRecords.size()) {
                throw new RuntimeException("Result verification failed: expected " + inputRecords.size() + " records for " + outputPartition.getKey() + " but received " + receivedRecords.size());
            }

            Iterator<ConsumerRecord<byte[], byte[]>> inputIterator = inputRecords.iterator();
            for (ConsumerRecord<byte[], byte[]> received : receivedRecords) {
                ConsumerRecord<byte[], byte[]> input = inputIterator.next();
                String receivedKey = keyDeserializer.deserialize(received.topic(), received.key());
                long receivedCount = countDeserializer.deserialize(received.topic(), received.value());
                String inputKey = keyDeserializer.deserialize(input.topic(), input.key());

                // A key's first record counts as 1; each further record adds 1.
                long expectedCount = countPerKey.merge(inputKey, 1L, Long::sum);
                if (receivedKey.equals(inputKey) && receivedCount == expectedCount) {
                    continue;
                }
                throw new RuntimeException("Result verification failed for " + received + " expected <" + inputKey + "," + expectedCount + "> but was <" + receivedKey + "," + receivedCount + ">");
            }
        }
    }

    /**
     * Waits until, for every partition of the verified topics, the position of the supplied consumer
     * after {@code seekToEnd} equals the end offset reported by a second, freshly created consumer.
     *
     * <p>The second consumer is created with client id "consumer-uncommitted" and otherwise default
     * settings. NOTE(review): presumably the supplied consumer only sees committed data, so both
     * values agree once no transaction is open on the partition - confirm against the caller.
     *
     * <p>Partitions are removed from the pending list as soon as they match; the remaining ones are
     * re-checked after a {@code sleep(1000L)} until the list is empty or {@code MAX_IDLE_TIME_MS}
     * has elapsed.
     *
     * @param kafkaConsumer consumer that gets assigned all partitions and whose positions are checked
     * @param str           value used for the "bootstrap.servers" setting of the second consumer
     * @param z             if {@code true} the topics "repartition", "max" and "cnt" are checked in
     *                      addition to "echo", "min" and "sum"
     * @throws IllegalStateException if a position is larger than the reported end offset
     * @throws RuntimeException      if some partitions still do not match when the time limit is reached
     */
    private static void verifyAllTransactionFinished(KafkaConsumer<byte[], byte[]> kafkaConsumer, String str, boolean z) {
        String[] topics = z
            ? new String[]{"echo", "min", "sum", "repartition", "max", "cnt"}
            : new String[]{"echo", "min", "sum"};
        List<TopicPartition> pendingPartitions = getAllPartitions(kafkaConsumer, topics);
        kafkaConsumer.assign(pendingPartitions);
        kafkaConsumer.seekToEnd(pendingPartitions);
        for (TopicPartition topicPartition : pendingPartitions) {
            System.out.println(topicPartition + " at position " + kafkaConsumer.position(topicPartition));
        }

        Properties properties = new Properties();
        properties.put("client.id", "consumer-uncommitted");
        properties.put("bootstrap.servers", str);
        properties.put("key.deserializer", ByteArrayDeserializer.class);
        properties.put("value.deserializer", ByteArrayDeserializer.class);

        long deadlineMs = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
        // try-with-resources closes the second consumer on every exit path and keeps the primary
        // exception if close() itself fails (the close failure is attached as suppressed).
        try (KafkaConsumer<byte[], byte[]> uncommittedConsumer = new KafkaConsumer<>(properties)) {
            while (!pendingPartitions.isEmpty() && System.currentTimeMillis() < deadlineMs) {
                kafkaConsumer.seekToEnd(pendingPartitions);
                Map<TopicPartition, Long> endOffsets = uncommittedConsumer.endOffsets(pendingPartitions);

                // An explicit iterator is required because matching partitions are removed in place.
                Iterator<TopicPartition> it = pendingPartitions.iterator();
                while (it.hasNext()) {
                    TopicPartition partition = it.next();
                    long position = kafkaConsumer.position(partition);
                    long endOffset = endOffsets.get(partition);

                    if (position == endOffset) {
                        it.remove();
                        System.out.println("Removing " + partition + " at position " + position);
                    } else if (position > endOffset) {
                        throw new IllegalStateException("Offset for partition " + partition + " is larger than topic endOffset: " + position + " > " + endOffset);
                    } else {
                        System.out.println("Retry " + partition + " at position " + position);
                    }
                }
                sleep(1000L);
            }
        }

        if (!pendingPartitions.isEmpty()) {
            // Derived from the constant so the message cannot drift from the actual limit.
            throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
        }
    }

    /**
     * Builds one {@link TopicPartition} for every partition that the consumer reports for the
     * given topics, in the order the topics are passed and the partitions are returned.
     *
     * @param kafkaConsumer consumer whose {@code partitionsFor} is queried once per topic
     * @param strArr        names of the topics to look up
     * @return a new list holding the topic partitions of all given topics
     */
    private static List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> kafkaConsumer, String... strArr) {
        final List<TopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < strArr.length; i++) {
            final List<PartitionInfo> topicInfos = kafkaConsumer.partitionsFor(strArr[i]);
            for (final PartitionInfo info : topicInfos) {
                final TopicPartition partition = new TopicPartition(info.topic(), info.partition());
                partitions.add(partition);
            }
        }
        return partitions;
    }

    /**
     * Fetches the description of the consumer group "EosTest", waiting at most 10 seconds for the
     * result.
     *
     * @param admin admin client used to issue the describe request
     * @return the description of the "EosTest" consumer group
     * @throws RuntimeException if the lookup is interrupted, fails, or times out; the original
     *                          exception is attached as the cause
     */
    private static ConsumerGroupDescription getConsumerGroupDescription(Admin admin) {
        try {
            return admin.describeConsumerGroups(Collections.singleton("EosTest"))
                .describedGroups()
                .get("EosTest")
                .get(10L, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
            throw new RuntimeException("Unexpected Exception getting group description", e);
        }
    }
}
