package org.apache.kafka.streams.integration.utils;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.api.Request;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:org/apache/kafka/streams/integration/utils/IntegrationTestUtils.class */
public class IntegrationTestUtils {
    /** Default wait time, in milliseconds, used by the helpers in this class when the caller supplies no timeout. */
    public static final long DEFAULT_TIMEOUT = 60000;
    /** Logger for this utility class. */
    private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestUtils.class);

    /* loaded from: input_file:org/apache/kafka/streams/integration/utils/IntegrationTestUtils$ConsumerGroupInactiveCondition.class */
    private static class ConsumerGroupInactiveCondition implements TestCondition {
        private final Admin adminClient;
        private final String applicationId;

        private ConsumerGroupInactiveCondition(Admin admin, String str) {
            this.adminClient = admin;
            this.applicationId = str;
        }

        public boolean conditionMet() {
            return IntegrationTestUtils.isEmptyConsumerGroup(this.adminClient, this.applicationId);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/utils/IntegrationTestUtils$StableAssignmentListener.class */
    public static class StableAssignmentListener implements AssignorConfiguration.AssignmentListener {
        final AtomicInteger numStableAssignments = new AtomicInteger(0);
        int nextExpectedNumStableAssignments;

        public void onAssignmentComplete(boolean z) {
            if (z) {
                this.numStableAssignments.incrementAndGet();
            }
        }

        public int numStableAssignments() {
            return this.numStableAssignments.get();
        }

        public void prepareForRebalance() {
            this.nextExpectedNumStableAssignments = this.numStableAssignments.get() + 1;
        }

        public void waitForNextStableAssignment(long j) throws InterruptedException {
            TestUtils.waitForCondition(() -> {
                return numStableAssignments() >= this.nextExpectedNumStableAssignments;
            }, j, () -> {
                return "Client did not reach " + this.nextExpectedNumStableAssignments + " stable assignments on time, numStableAssignments was " + numStableAssignments();
            });
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/utils/IntegrationTestUtils$StateListenerStub.class */
    public static class StateListenerStub implements StreamThread.StateListener {
        boolean toPendingShutdownSeen = false;

        public void onChange(Thread thread, ThreadStateTransitionValidator threadStateTransitionValidator, ThreadStateTransitionValidator threadStateTransitionValidator2) {
            if (threadStateTransitionValidator == StreamThread.State.PENDING_SHUTDOWN) {
                this.toPendingShutdownSeen = true;
            }
        }

        public boolean transitToPendingShutdownSeen() {
            return this.toPendingShutdownSeen;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/utils/IntegrationTestUtils$TrackingStateRestoreListener.class */
    public static class TrackingStateRestoreListener implements StateRestoreListener {
        public final Map<TopicPartition, AtomicLong> changelogToStartOffset = new ConcurrentHashMap();
        public final Map<TopicPartition, AtomicLong> changelogToEndOffset = new ConcurrentHashMap();
        public final Map<TopicPartition, AtomicLong> changelogToTotalNumRestored = new ConcurrentHashMap();

        public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
            this.changelogToStartOffset.put(topicPartition, new AtomicLong(j));
            this.changelogToEndOffset.put(topicPartition, new AtomicLong(j2));
            this.changelogToTotalNumRestored.put(topicPartition, new AtomicLong(0L));
        }

        public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
            this.changelogToTotalNumRestored.get(topicPartition).addAndGet(j2);
        }

        public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
        }

        public boolean allStartOffsetsAtZero() {
            Iterator<AtomicLong> it = this.changelogToStartOffset.values().iterator();
            while (it.hasNext()) {
                if (it.next().get() != 0) {
                    return false;
                }
            }
            return true;
        }

        public long totalNumRestored() {
            long j = 0;
            Iterator<AtomicLong> it = this.changelogToTotalNumRestored.values().iterator();
            while (it.hasNext()) {
                j += it.next().get();
            }
            return j;
        }
    }

    /**
     * Builds a name from the class's simple name followed by the test method name, with each of the
     * characters {@code . [ ] space =} replaced by an underscore.
     *
     * @param cls      class whose simple name forms the prefix
     * @param testName rule supplying the current test method name
     * @return the sanitized concatenation
     */
    public static String safeUniqueTestName(Class<?> cls, TestName testName) {
        String sanitized = cls.getSimpleName() + testName.getMethodName();
        for (final char unsafe : new char[] {'.', '[', ']', ' ', '='}) {
            sanitized = sanitized.replace(unsafe, '_');
        }
        return sanitized;
    }

    /**
     * Deletes the directory named by the {@code state.dir} property, but only when it lies inside
     * {@code TestUtils.IO_TMP_DIR}. Does nothing when the property is absent.
     *
     * <p>The containment check is done on absolute, normalized paths and compares whole path
     * components, so a sibling such as {@code /tmp-other} is not mistaken for a child of {@code /tmp}
     * and {@code ..} segments cannot escape the temp directory.
     *
     * @param properties configuration that may contain {@code state.dir}
     * @throws IOException if deleting the directory fails
     */
    public static void purgeLocalStreamsState(Properties properties) throws IOException {
        final String stateDirProperty = properties.getProperty("state.dir");
        if (stateDirProperty == null) {
            return;
        }
        final java.nio.file.Path tmpRoot = TestUtils.IO_TMP_DIR.toPath().toAbsolutePath().normalize();
        final java.nio.file.Path stateDir = Paths.get(stateDirProperty).toAbsolutePath().normalize();
        // Safety net: never delete anything outside the temp directory.
        if (stateDir.startsWith(tmpRoot)) {
            Utils.delete(stateDir.toFile());
        }
    }

    /**
     * Applies {@link #purgeLocalStreamsState(Properties)} to every configuration in iteration order,
     * stopping at the first failure.
     *
     * @param collection configurations whose state directories should be purged
     * @throws IOException propagated from the single-configuration overload
     */
    public static void purgeLocalStreamsState(Collection<Properties> collection) throws IOException {
        for (final Properties streamsConfig : collection) {
            purgeLocalStreamsState(streamsConfig);
        }
    }

    /**
     * Same as {@link #cleanStateBeforeTest(EmbeddedKafkaCluster, int, String...)} with {@code 1} as the
     * integer argument.
     *
     * @param embeddedKafkaCluster cluster to reset
     * @param strArr               topics to create after the reset
     */
    public static void cleanStateBeforeTest(EmbeddedKafkaCluster embeddedKafkaCluster, String... strArr) {
        final int defaultPartitionArg = 1;
        cleanStateBeforeTest(embeddedKafkaCluster, defaultPartitionArg, strArr);
    }

    /**
     * Deletes all topics on the cluster (waiting up to {@link #DEFAULT_TIMEOUT}) and then creates each
     * requested topic via {@code createTopic(name, i, 1)}.
     *
     * @param embeddedKafkaCluster cluster to reset
     * @param i                    second argument passed to {@code createTopic} for every topic
     * @param strArr               names of the topics to create
     * @throws RuntimeException wrapping an {@link InterruptedException} raised while waiting
     */
    public static void cleanStateBeforeTest(EmbeddedKafkaCluster embeddedKafkaCluster, int i, String... strArr) {
        try {
            embeddedKafkaCluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
            for (int index = 0; index < strArr.length; index++) {
                final String topicName = strArr[index];
                embeddedKafkaCluster.createTopic(topicName, i, 1);
            }
        } catch (InterruptedException interrupted) {
            throw new RuntimeException(interrupted);
        }
    }

    /**
     * Best-effort teardown: calls {@code cleanUp()} on the streams instance and then deletes all
     * topics. An {@link InterruptedException} or {@link RuntimeException} is logged at WARN and
     * otherwise ignored.
     *
     * @param embeddedKafkaCluster cluster whose topics are deleted
     * @param kafkaStreams         streams instance to clean up
     */
    public static void quietlyCleanStateAfterTest(EmbeddedKafkaCluster embeddedKafkaCluster, KafkaStreams kafkaStreams) {
        try {
            kafkaStreams.cleanUp();
            embeddedKafkaCluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
        } catch (InterruptedException interrupted) {
            LOG.warn("Ignoring failure to clean test state", interrupted);
        } catch (RuntimeException failure) {
            LOG.warn("Ignoring failure to clean test state", failure);
        }
    }

    /**
     * Produces the given key-value pairs without transactions; delegates to the overload that takes
     * the transaction flag, passing {@code false}.
     *
     * @param str        destination topic
     * @param collection records to send
     * @param properties producer configuration
     * @param time       clock handed to the delegate
     */
    public static <K, V> void produceKeyValuesSynchronously(String str, Collection<KeyValue<K, V>> collection, Properties properties, Time time) {
        final boolean transactional = false;
        produceKeyValuesSynchronously(str, collection, properties, time, transactional);
    }

    /**
     * Produces the given key-value pairs with the supplied headers and without transactions;
     * delegates to the six-argument overload, passing {@code false} as the transaction flag.
     *
     * @param str        destination topic
     * @param collection records to send
     * @param properties producer configuration
     * @param headers    headers attached to every record
     * @param time       clock handed to the delegate
     */
    public static <K, V> void produceKeyValuesSynchronously(String str, Collection<KeyValue<K, V>> collection, Properties properties, Headers headers, Time time) {
        final boolean transactional = false;
        produceKeyValuesSynchronously(str, collection, properties, headers, time, transactional);
    }

    /**
     * Produces the given key-value pairs without headers; delegates to the six-argument overload,
     * passing {@code null} for the headers.
     *
     * @param str        destination topic
     * @param collection records to send
     * @param properties producer configuration
     * @param time       clock handed to the delegate
     * @param z          transaction flag handed to the delegate
     */
    public static <K, V> void produceKeyValuesSynchronously(String str, Collection<KeyValue<K, V>> collection, Properties properties, Time time, boolean z) {
        final Headers noHeaders = null;
        produceKeyValuesSynchronously(str, collection, properties, noHeaders, time, z);
    }

    /**
     * Sends every key-value pair to the topic using a producer created from {@code properties}.
     * Each record is stamped with {@code time.milliseconds()} and followed by {@code time.sleep(1L)}.
     * When {@code z} is {@code true} all sends happen inside a single transaction that is committed
     * at the end.
     *
     * <p>The producer is managed by try-with-resources so it is closed on every exit path, including
     * a failure in {@code send} or {@code commitTransaction}.
     *
     * @param str        destination topic
     * @param collection records to send
     * @param properties producer configuration
     * @param headers    headers attached to every record; may be {@code null}
     * @param time       clock used for record timestamps and for the 1 ms pause between sends
     * @param z          whether to wrap the sends in a transaction
     */
    public static <K, V> void produceKeyValuesSynchronously(String str, Collection<KeyValue<K, V>> collection, Properties properties, Headers headers, Time time, boolean z) {
        try (KafkaProducer<K, V> producer = new KafkaProducer<>(properties)) {
            if (z) {
                producer.initTransactions();
                producer.beginTransaction();
            }
            for (final KeyValue<K, V> keyValue : collection) {
                producer.send(new ProducerRecord<K, V>(str, (Integer) null, time.milliseconds(), keyValue.key, keyValue.value, headers));
                time.sleep(1L);
            }
            if (z) {
                producer.commitTransaction();
            }
        }
    }

    /**
     * Produces the given key-value pairs with a fixed timestamp and without transactions; delegates
     * to the overload that takes the transaction flag, passing {@code false}.
     *
     * @param str        destination topic
     * @param collection records to send
     * @param properties producer configuration
     * @param l          timestamp applied to every record
     */
    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String str, Collection<KeyValue<K, V>> collection, Properties properties, Long l) {
        final boolean transactional = false;
        produceKeyValuesSynchronouslyWithTimestamp(str, collection, properties, l, transactional);
    }

    /**
     * Produces the given key-value pairs with a fixed timestamp and no headers; delegates to the
     * six-argument overload, passing {@code null} for the headers.
     *
     * @param str        destination topic
     * @param collection records to send
     * @param properties producer configuration
     * @param l          timestamp applied to every record
     * @param z          transaction flag handed to the delegate
     */
    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String str, Collection<KeyValue<K, V>> collection, Properties properties, Long l, boolean z) {
        final Headers noHeaders = null;
        produceKeyValuesSynchronouslyWithTimestamp(str, collection, properties, noHeaders, l, z);
    }

    /**
     * Sends every key-value pair to the topic with the same timestamp {@code l}, using a producer
     * created from {@code properties}. When {@code z} is {@code true} all sends happen inside a
     * single transaction that is committed at the end.
     *
     * <p>The producer is managed by try-with-resources so it is closed on every exit path, including
     * a failure in {@code send} or {@code commitTransaction}.
     *
     * @param str        destination topic
     * @param collection records to send
     * @param properties producer configuration
     * @param headers    headers attached to every record; may be {@code null}
     * @param l          timestamp applied to every record
     * @param z          whether to wrap the sends in a transaction
     */
    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String str, Collection<KeyValue<K, V>> collection, Properties properties, Headers headers, Long l, boolean z) {
        try (KafkaProducer<K, V> producer = new KafkaProducer<>(properties)) {
            if (z) {
                producer.initTransactions();
                producer.beginTransaction();
            }
            for (final KeyValue<K, V> keyValue : collection) {
                producer.send(new ProducerRecord<K, V>(str, (Integer) null, l, keyValue.key, keyValue.value, headers));
            }
            if (z) {
                producer.commitTransaction();
            }
        }
    }

    /**
     * Sends every record to the topic with its own timestamp and then waits for each send's future.
     * With {@code z} set the sends are wrapped in one transaction that is committed before the
     * futures are awaited; otherwise the producer is flushed instead.
     *
     * <p>The producer is managed by try-with-resources so it is closed on every exit path, including
     * a failure in {@code send}, {@code commitTransaction}, {@code flush} or while awaiting a future.
     *
     * @param properties producer configuration
     * @param z          whether to wrap the sends in a transaction
     * @param str        destination topic
     * @param optional   target partition; an empty optional passes {@code null} as the partition
     * @param list       records (key, value, timestamp) to send
     * @throws RuntimeException wrapping an {@link InterruptedException} or {@link ExecutionException}
     *                          raised while awaiting a send
     */
    public static <V, K> void produceSynchronously(Properties properties, boolean z, String str, Optional<Integer> optional, List<KeyValueTimestamp<K, V>> list) {
        try (KafkaProducer<K, V> producer = new KafkaProducer<>(properties)) {
            if (z) {
                producer.initTransactions();
                producer.beginTransaction();
            }
            final List<Future<?>> pendingSends = new ArrayList<>();
            for (final KeyValueTimestamp<K, V> record : list) {
                pendingSends.add(producer.send(
                    new ProducerRecord<K, V>(str, optional.orElse(null), record.timestamp(), record.key(), record.value(), (Headers) null)));
            }
            if (z) {
                producer.commitTransaction();
            } else {
                producer.flush();
            }
            for (final Future<?> pending : pendingSends) {
                try {
                    pending.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * For every key-value pair: begins a transaction, sends the record with timestamp {@code l},
     * waits for the send to complete, and then aborts the transaction. The producer is closed when
     * the method exits, normally or exceptionally.
     *
     * @param str        destination topic
     * @param collection records to send and abort
     * @param properties producer configuration
     * @param l          timestamp applied to every record
     * @throws Exception any failure raised by the producer or while awaiting a send
     */
    public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(String str, Collection<KeyValue<K, V>> collection, Properties properties, Long l) throws Exception {
        try (KafkaProducer<K, V> producer = new KafkaProducer<>(properties)) {
            producer.initTransactions();
            for (final KeyValue<K, V> keyValue : collection) {
                producer.beginTransaction();
                producer.send(new ProducerRecord<K, V>(str, (Integer) null, l, keyValue.key, keyValue.value)).get();
                producer.abortTransaction();
            }
        }
    }

    /**
     * Produces the given values without transactions; delegates to the overload that takes the
     * transaction flag, passing {@code false}.
     *
     * @param str        destination topic
     * @param collection values to send
     * @param properties producer configuration
     * @param time       clock handed to the delegate
     */
    public static <V> void produceValuesSynchronously(String str, Collection<V> collection, Properties properties, Time time) {
        final boolean transactional = false;
        produceValuesSynchronously(str, collection, properties, time, transactional);
    }

    /**
     * Wraps every value in a {@code KeyValue} with a {@code null} key, preserving iteration order,
     * and hands the result to {@code produceKeyValuesSynchronously}.
     *
     * @param str        destination topic
     * @param collection values to send
     * @param properties producer configuration
     * @param time       clock handed to the delegate
     * @param z          transaction flag handed to the delegate
     */
    public static <V> void produceValuesSynchronously(String str, Collection<V> collection, Properties properties, Time time, boolean z) {
        final List<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
        for (final V value : collection) {
            keyedRecords.add(new KeyValue<Object, V>(null, value));
        }
        produceKeyValuesSynchronously(str, keyedRecords, properties, time, z);
    }

    /**
     * Repeatedly scans the instance's metrics until at least {@code i} {@code records-lag} metrics
     * whose {@code client-id} tag does not end in {@code restore-consumer} exist and their values
     * sum to zero. The loop polls without pausing and always performs at least one scan.
     *
     * @param kafkaStreams instance whose metrics are inspected
     * @param i            minimum number of matching lag metrics required
     * @param j            maximum time to keep polling, in milliseconds
     * @throws RuntimeException if the condition is not met before the time budget is used up
     */
    public static void waitForCompletion(KafkaStreams kafkaStreams, int i, long j) {
        final long startedAtMs = System.currentTimeMillis();
        while (true) {
            int lagMetricCount = 0;
            double totalLag = 0.0d;
            for (final Metric metric : kafkaStreams.metrics().values()) {
                final boolean isLagMetric = metric.metricName().name().equals("records-lag");
                if (!isLagMetric || ((String) metric.metricName().tags().get("client-id")).endsWith("restore-consumer")) {
                    continue;
                }
                lagMetricCount++;
                totalLag += ((Number) metric.metricValue()).doubleValue();
            }
            if (lagMetricCount >= i && totalLag == 0.0d) {
                return;
            }
            if (System.currentTimeMillis() - startedAtMs >= j) {
                throw new RuntimeException(String.format("Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]", lagMetricCount, i, totalLag));
            }
        }
    }

    /**
     * Repeatedly scans the instance's metrics until at least {@code i} {@code records-lag} metrics
     * whose {@code client-id} tag ends in {@code restore-consumer} exist and their values sum to
     * zero. The loop polls without pausing and always performs at least one scan.
     *
     * @param kafkaStreams instance whose metrics are inspected
     * @param i            minimum number of matching lag metrics required
     * @param j            maximum time to keep polling, in milliseconds
     * @throws RuntimeException if the condition is not met before the time budget is used up
     */
    public static void waitForStandbyCompletion(KafkaStreams kafkaStreams, int i, long j) {
        final long startedAtMs = System.currentTimeMillis();
        while (true) {
            int lagMetricCount = 0;
            double totalLag = 0.0d;
            for (final Metric metric : kafkaStreams.metrics().values()) {
                final boolean isLagMetric = metric.metricName().name().equals("records-lag");
                if (!isLagMetric || !((String) metric.metricName().tags().get("client-id")).endsWith("restore-consumer")) {
                    continue;
                }
                lagMetricCount++;
                totalLag += ((Number) metric.metricValue()).doubleValue();
            }
            if (lagMetricCount >= i && totalLag == 0.0d) {
                return;
            }
            if (System.currentTimeMillis() - startedAtMs >= j) {
                throw new RuntimeException(String.format("Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]", lagMetricCount, i, totalLag));
            }
        }
    }

    /**
     * Same as the four-argument overload, waiting up to {@link #DEFAULT_TIMEOUT}.
     *
     * @param properties consumer configuration
     * @param str        topic to read from
     * @param i          minimum number of records to wait for
     * @return the records accumulated by the delegate
     * @throws Exception propagated from the delegate
     */
    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(Properties properties, String str, int i) throws Exception {
        final long maxWaitMs = DEFAULT_TIMEOUT;
        return waitUntilMinRecordsReceived(properties, str, i, maxWaitMs);
    }

    /**
     * Reads consumer records from the topic, retrying until at least {@code i} records have been
     * accumulated or {@code j} elapses. Records from every attempt are appended to the same list.
     * The consumer is closed when the method exits.
     *
     * @param properties consumer configuration
     * @param str        topic to read from
     * @param i          minimum number of records to wait for
     * @param j          retry budget and per-read wait time, in milliseconds
     * @return all records accumulated across attempts
     * @throws Exception if the retry helper gives up or the consumer fails
     */
    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(Properties properties, String str, int i, long j) throws Exception {
        final List<ConsumerRecord<K, V>> received = new ArrayList<>();
        final String failureReason = String.format("Did not receive all %d records from topic %s within %d ms", i, str, j);
        try (KafkaConsumer<K, V> consumer = createConsumer(properties)) {
            TestUtils.retryOnExceptionWithTimeout(j, () -> {
                received.addAll(readRecords(str, consumer, j, i));
                MatcherAssert.assertThat(failureReason, received.size(), Matchers.is(Matchers.greaterThanOrEqualTo(i)));
            });
        }
        return received;
    }

    /**
     * Same as the four-argument overload, waiting up to {@link #DEFAULT_TIMEOUT}.
     *
     * @param properties consumer configuration
     * @param str        topic to read from
     * @param i          minimum number of records to wait for
     * @return the key-value pairs accumulated by the delegate
     * @throws Exception propagated from the delegate
     */
    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties properties, String str, int i) throws Exception {
        final long maxWaitMs = DEFAULT_TIMEOUT;
        return waitUntilMinKeyValueRecordsReceived(properties, str, i, maxWaitMs);
    }

    /**
     * Reads key-value pairs from the topic, retrying until at least {@code i} have been accumulated
     * or {@code j} elapses. Pairs from every attempt are appended to the same list. The consumer is
     * closed when the method exits.
     *
     * @param properties consumer configuration
     * @param str        topic to read from
     * @param i          minimum number of records to wait for
     * @param j          retry budget and per-read wait time, in milliseconds
     * @return all key-value pairs accumulated across attempts
     * @throws Exception if the retry helper gives up or the consumer fails
     */
    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties properties, String str, int i, long j) throws Exception {
        final List<KeyValue<K, V>> received = new ArrayList<>();
        final String failureReason = String.format("Did not receive all %d records from topic %s within %d ms", i, str, j);
        try (KafkaConsumer<K, V> consumer = createConsumer(properties)) {
            TestUtils.retryOnExceptionWithTimeout(j, () -> {
                received.addAll(readKeyValues(str, consumer, j, i));
                MatcherAssert.assertThat(failureReason, received.size(), Matchers.is(Matchers.greaterThanOrEqualTo(i)));
            });
        }
        return received;
    }

    /**
     * Reads key-value-timestamp triples from the topic, retrying until at least {@code i} have been
     * accumulated or {@code j} elapses. Triples from every attempt are appended to the same list.
     * The consumer is closed when the method exits.
     *
     * @param properties consumer configuration
     * @param str        topic to read from
     * @param i          minimum number of records to wait for
     * @param j          retry budget and per-read wait time, in milliseconds
     * @return all triples accumulated across attempts
     * @throws Exception if the retry helper gives up or the consumer fails
     */
    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilMinKeyValueWithTimestampRecordsReceived(Properties properties, String str, int i, long j) throws Exception {
        final List<KeyValueTimestamp<K, V>> received = new ArrayList<>();
        final String failureReason = String.format("Did not receive all %d records from topic %s within %d ms", i, str, j);
        try (KafkaConsumer<K, V> consumer = createConsumer(properties)) {
            TestUtils.retryOnExceptionWithTimeout(j, () -> {
                received.addAll(readKeyValuesWithTimestamp(str, consumer, j, i));
                MatcherAssert.assertThat(failureReason, received.size(), Matchers.is(Matchers.greaterThanOrEqualTo(i)));
            });
        }
        return received;
    }

    /**
     * Same as the four-argument overload, waiting up to {@link #DEFAULT_TIMEOUT}.
     *
     * @param properties consumer configuration
     * @param str        topic to read from
     * @param list       expected key-value pairs
     * @return the key-value pairs accumulated by the delegate
     * @throws Exception propagated from the delegate
     */
    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(Properties properties, String str, List<KeyValue<K, V>> list) throws Exception {
        final long maxWaitMs = DEFAULT_TIMEOUT;
        return waitUntilFinalKeyValueRecordsReceived(properties, str, list, maxWaitMs);
    }

    /**
     * Timestamp-aware variant: delegates to the private five-argument implementation with
     * {@link #DEFAULT_TIMEOUT} and the timestamp flag set to {@code true}.
     *
     * @param properties consumer configuration
     * @param str        topic to read from
     * @param list       expected key-value-timestamp triples
     * @return the triples accumulated by the delegate
     * @throws Exception propagated from the delegate
     */
    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(Properties properties, String str, List<KeyValueTimestamp<K, V>> list) throws Exception {
        final boolean withTimestamp = true;
        return waitUntilFinalKeyValueRecordsReceived(properties, str, list, DEFAULT_TIMEOUT, withTimestamp);
    }

    /**
     * Delegates to the private five-argument implementation with the timestamp flag set to
     * {@code false}.
     *
     * @param properties consumer configuration
     * @param str        topic to read from
     * @param list       expected key-value pairs
     * @param j          maximum wait handed to the delegate
     * @return the key-value pairs accumulated by the delegate
     * @throws Exception propagated from the delegate
     */
    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(Properties properties, String str, List<KeyValue<K, V>> list, long j) throws Exception {
        final boolean withTimestamp = false;
        return waitUntilFinalKeyValueRecordsReceived(properties, str, list, j, withTimestamp);
    }

    /**
     * Keeps reading from the topic until the received records that also occur in {@code list},
     * grouped by key in arrival order, equal {@code list} grouped by key in its own order.
     *
     * @param properties consumer configuration passed to {@code createConsumer}
     * @param str        topic to read from
     * @param list       expected records; elements are treated as {@code KeyValueTimestamp} when
     *                   {@code z} is {@code true} and as {@code KeyValue} otherwise
     * @param j          wait time passed both to each read and to {@code TestUtils.waitForCondition}
     * @param z          selects the timestamp-aware reader and key accessor
     * @return every record accumulated across reads, including ones not contained in {@code list}
     * @throws Exception whatever the condition helper or the consumer throws
     */
    private static <K, V, T> List<T> waitUntilFinalKeyValueRecordsReceived(Properties properties, String str, List<T> list, long j, boolean z) throws Exception {
        ArrayList arrayList = new ArrayList();
        KafkaConsumer createConsumer = createConsumer(properties);
        // NOTE(review): the nested try/catch below looks like a decompiled try-with-resources; `th` is never
        // assigned, so only the plain close() branches can run.
        Throwable th = null;
        try {
            try {
                TestUtils.waitForCondition(() -> {
                    // Append this attempt's records to everything read so far.
                    arrayList.addAll(z ? readKeyValuesWithTimestamp(str, createConsumer, j, list.size()) : readKeyValues(str, createConsumer, j, list.size()));
                    Stream stream = arrayList.stream();
                    // Result is discarded; the call only fails fast if `list` is null.
                    list.getClass();
                    // Drop every received record that is not one of the expected records.
                    List list2 = (List) stream.filter(list::contains).collect(Collectors.toList());
                    // Group the remaining received records per key, preserving order within a key.
                    HashMap hashMap = new HashMap();
                    for (Object obj : list2) {
                        ((List) hashMap.computeIfAbsent(z ? ((KeyValueTimestamp) obj).key() : ((KeyValue) obj).key, obj2 -> {
                            return new ArrayList();
                        })).add(obj);
                    }
                    // Group the expected records the same way.
                    HashMap hashMap2 = new HashMap();
                    for (Object obj3 : list) {
                        ((List) hashMap2.computeIfAbsent(z ? ((KeyValueTimestamp) obj3).key() : ((KeyValue) obj3).key, obj4 -> {
                            return new ArrayList();
                        })).add(obj3);
                    }
                    // Met only when both groupings have the same keys and the same per-key sequences.
                    return hashMap.equals(hashMap2);
                }, j, "Did not receive all " + list + " records from topic " + str);
                if (createConsumer != null) {
                    if (0 != 0) {
                        try {
                            createConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumer.close();
                    }
                }
                return arrayList;
            } finally {
            }
        } catch (Throwable th3) {
            // Failure path: close the consumer, then rethrow the original failure.
            if (createConsumer != null) {
                if (th != null) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createConsumer.close();
                }
            }
            throw th3;
        }
    }

    /**
     * Same as the four-argument overload, waiting up to {@link #DEFAULT_TIMEOUT}.
     *
     * @param properties consumer configuration
     * @param str        topic to read from
     * @param i          minimum number of values to wait for
     * @return the values accumulated by the delegate
     * @throws Exception propagated from the delegate
     */
    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties properties, String str, int i) throws Exception {
        final long maxWaitMs = DEFAULT_TIMEOUT;
        return waitUntilMinValuesRecordsReceived(properties, str, i, maxWaitMs);
    }

    /**
     * Reads record values from the topic, retrying until at least {@code i} have been accumulated or
     * {@code j} elapses. Values from every attempt are appended to the same list. The consumer is
     * closed when the method exits.
     *
     * @param properties consumer configuration
     * @param str        topic to read from
     * @param i          minimum number of values to wait for
     * @param j          retry budget and per-read wait time, in milliseconds
     * @return all values accumulated across attempts
     * @throws Exception if the retry helper gives up or the consumer fails
     */
    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties properties, String str, int i, long j) throws Exception {
        final List<V> received = new ArrayList<>();
        final String failureReason = String.format("Did not receive all %d records from topic %s within %d ms", i, str, j);
        try (KafkaConsumer<Object, V> consumer = createConsumer(properties)) {
            TestUtils.retryOnExceptionWithTimeout(j, () -> {
                received.addAll(readValues(str, consumer, j, i));
                MatcherAssert.assertThat(failureReason, received.size(), Matchers.is(Matchers.greaterThanOrEqualTo(i)));
            });
        }
        return received;
    }

    /**
     * Waits, partition by partition, for metadata to be propagated to all given brokers. All
     * partitions share one deadline of {@code j} milliseconds from the start of the call; each
     * partition is given whatever time is left.
     *
     * @param list  brokers whose metadata caches are checked
     * @param list2 topic partitions to wait for
     * @param j     overall time budget in milliseconds
     * @throws InterruptedException propagated from the per-partition wait
     * @throws AssertionError       if the budget is exhausted before a partition's wait can start
     */
    public static void waitForTopicPartitions(List<KafkaServer> list, List<TopicPartition> list2, long j) throws InterruptedException {
        final long deadlineMs = System.currentTimeMillis() + j;
        for (final TopicPartition partition : list2) {
            final long remainingMs = deadlineMs - System.currentTimeMillis();
            if (remainingMs > 0) {
                waitUntilMetadataIsPropagated(list, partition.topic(), partition.partition(), remainingMs);
            } else {
                throw new AssertionError("timed out while waiting for partitions to become available. Timeout=" + j);
            }
        }
    }

    /**
     * Retries until every broker's metadata cache has partition info for the given topic partition
     * with a leader id accepted by {@code Request.isValidBrokerId}, or until {@code j} elapses.
     *
     * @param list brokers to inspect
     * @param str  topic name
     * @param i    partition number
     * @param j    retry budget in milliseconds
     * @throws InterruptedException propagated from the retry helper
     */
    private static void waitUntilMetadataIsPropagated(List<KafkaServer> list, String str, int i, long j) throws InterruptedException {
        final String baseReason = String.format("Metadata for topic=%s partition=%d was not propagated to all brokers within %d ms. ", str, i, j);
        TestUtils.retryOnExceptionWithTimeout(j, () -> {
            final List<KafkaServer> missingPartitionInfo = new ArrayList<>();
            final List<KafkaServer> invalidLeader = new ArrayList<>();
            for (final KafkaServer broker : list) {
                final Option<?> partitionInfo = broker.dataPlaneRequestProcessor().metadataCache().getPartitionInfo(str, i);
                if (partitionInfo.isEmpty()) {
                    missingPartitionInfo.add(broker);
                    continue;
                }
                final int leaderId = ((UpdateMetadataRequestData.UpdateMetadataPartitionState) partitionInfo.get()).leader();
                if (!Request.isValidBrokerId(leaderId)) {
                    invalidLeader.add(broker);
                }
            }
            final String reason = baseReason + ". Brokers without partition info: " + missingPartitionInfo + ". Brokers with invalid broker id for partition leader: " + invalidLeader;
            final boolean fullyPropagated = missingPartitionInfo.isEmpty() && invalidLeader.isEmpty();
            MatcherAssert.assertThat(reason, fullyPropagated);
        });
    }

    /**
     * Starts all given Kafka Streams instances and blocks until every one of them has
     * reported {@code RUNNING}, or fails the test once the timeout has elapsed.
     *
     * <p>A tracking state listener is installed on each instance before it is started.
     * If an instance already has a listener (looked up via {@code getStateListener}),
     * both are combined in a {@code CompositeStateListener} so the existing one is kept.
     *
     * @param list     the instances to start
     * @param duration maximum time to wait for all instances to be {@code RUNNING}
     * @throws Exception if waiting is interrupted or listener lookup fails
     */
    public static void startApplicationAndWaitUntilRunning(List<KafkaStreams> list, Duration duration) throws Exception {
        final ReentrantLock stateLock = new ReentrantLock();
        final Condition stateUpdate = stateLock.newCondition();
        final Map<KafkaStreams, KafkaStreams.State> states = new HashMap<>();
        for (final KafkaStreams streams : list) {
            states.put(streams, streams.state());
            final KafkaStreams.StateListener existingListener = getStateListener(streams);
            final KafkaStreams.StateListener trackingListener = (newState, oldState) -> {
                stateLock.lock();
                try {
                    states.put(streams, newState);
                    // Only wake the waiter when this transition could have completed the set.
                    if (newState == KafkaStreams.State.RUNNING
                            && states.values().stream().allMatch(s -> s == KafkaStreams.State.RUNNING)) {
                        stateUpdate.signalAll();
                    }
                } finally {
                    stateLock.unlock();
                }
            };
            streams.setStateListener(existingListener != null
                ? new CompositeStateListener(existingListener, trackingListener)
                : trackingListener);
        }

        for (final KafkaStreams streams : list) {
            streams.start();
        }

        final long deadlineMs = System.currentTimeMillis() + duration.toMillis();
        // The lock must be held across the whole wait loop: Condition.await requires the
        // caller to own the lock, and the state map is only read consistently under it.
        // Releasing it once per iteration would make the second pass throw
        // IllegalMonitorStateException.
        stateLock.lock();
        try {
            while (true) {
                final Map<KafkaStreams, KafkaStreams.State> nonRunning = new HashMap<>();
                for (final Map.Entry<KafkaStreams, KafkaStreams.State> entry : states.entrySet()) {
                    if (entry.getValue() != KafkaStreams.State.RUNNING) {
                        nonRunning.put(entry.getKey(), entry.getValue());
                    }
                }
                if (nonRunning.isEmpty()) {
                    return;
                }

                final long remainingMs = deadlineMs - System.currentTimeMillis();
                if (remainingMs <= 0) {
                    Assert.fail("Application did not reach a RUNNING state for all streams instances. Non-running instances: " + nonRunning);
                }
                // await may return spuriously or on an unrelated signal; the loop re-checks.
                stateUpdate.await(remainingMs, TimeUnit.MILLISECONDS);
            }
        } finally {
            stateLock.unlock();
        }
    }

    /**
     * Asserts, via {@code TestUtils.retryOnExceptionWithTimeout}, that every given
     * instance is in the expected state.
     *
     * <p>Each attempt first snapshots the state of all instances and then collects the
     * ones whose state differs from {@code state}; the assertion passes when none differ.
     *
     * @param list     the instances to check
     * @param state    the state every instance is expected to be in
     * @param duration timeout handed to the retry helper
     * @throws Exception propagated from the retry helper
     */
    public static void waitForApplicationState(List<KafkaStreams> list, KafkaStreams.State state, Duration duration) throws Exception {
        final long timeoutMs = duration.toMillis();
        TestUtils.retryOnExceptionWithTimeout(timeoutMs, () -> {
            final Map<KafkaStreams, KafkaStreams.State> observed = list.stream()
                .collect(Collectors.toMap(streams -> streams, KafkaStreams::state));
            final Map<KafkaStreams, KafkaStreams.State> mismatched = observed.entrySet().stream()
                .filter(entry -> entry.getValue() != state)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            final String reason = String.format(
                "Expected all streams instances in %s to be %s within %d ms, but the following were not: %s",
                list, state, timeoutMs, mismatched);
            MatcherAssert.assertThat(reason, mismatched.isEmpty());
        });
    }

    /**
     * Waits, via {@code TestUtils.waitForCondition}, on a {@code ConsumerGroupInactiveCondition}
     * built for the given consumer group.
     *
     * @param admin admin client handed to the condition
     * @param str   consumer group id
     * @param j     maximum wait in milliseconds
     * @throws Exception propagated from {@code TestUtils.waitForCondition}
     */
    public static void waitForEmptyConsumerGroup(Admin admin, String str, long j) throws Exception {
        final ConsumerGroupInactiveCondition groupInactive = new ConsumerGroupInactiveCondition(admin, str);
        final String failureMessage = "Test consumer group " + str + " still active even after waiting " + j + " ms.";
        TestUtils.waitForCondition(groupInactive, j, failureMessage);
    }

    /**
     * Reports whether the given consumer group currently has no members.
     *
     * <p>This is a best-effort check: if the group description cannot be obtained,
     * {@code false} is returned rather than an exception being thrown.
     *
     * @param admin admin client used to describe the group
     * @param str   consumer group id
     * @return {@code true} if the described group has no members; {@code false} if it has
     *         members or the description could not be retrieved
     */
    public static boolean isEmptyConsumerGroup(Admin admin, String str) {
        try {
            final KafkaFuture<ConsumerGroupDescription> description =
                admin.describeConsumerGroups(Collections.singletonList(str)).describedGroups().get(str);
            return description.get().members().isEmpty();
        } catch (InterruptedException e) {
            // Keep the best-effort "false" answer, but do not lose the interrupt: restore the
            // flag so callers that block or poll afterwards can still observe it.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }

    /**
     * Reads the current value of the {@code stateListener} field of the given instance
     * through reflection.
     *
     * @param kafkaStreams the instance to inspect
     * @return the value of the field, cast to {@code KafkaStreams.StateListener}
     * @throws RuntimeException if the field is not declared on the instance's runtime class
     *                          or cannot be accessed
     */
    private static KafkaStreams.StateListener getStateListener(KafkaStreams kafkaStreams) {
        final Object listener;
        try {
            final Field listenerField = kafkaStreams.getClass().getDeclaredField("stateListener");
            listenerField.setAccessible(true);
            listener = listenerField.get(kafkaStreams);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Failed to get StateListener through reflection", e);
        }
        return (KafkaStreams.StateListener) listener;
    }

    /**
     * Verifies that the records obtained from {@code waitUntilMinRecordsReceived} match the
     * expected key/value/timestamp triples, position by position.
     *
     * @param properties consumer configuration passed to {@code waitUntilMinRecordsReceived}
     * @param str        topic name
     * @param list       expected records, in order
     * @throws AssertionError   if the record count differs or any record does not match
     * @throws RuntimeException wrapping any {@link Exception} raised while reading or comparing
     */
    public static <K, V> void verifyKeyValueTimestamps(Properties properties, String str, List<KeyValueTimestamp<K, V>> list) {
        try {
            final List<ConsumerRecord<K, V>> received = waitUntilMinRecordsReceived(properties, str, list.size());
            if (received.size() != list.size()) {
                throw new AssertionError(printRecords(received) + " != " + list);
            }
            // Sizes are equal at this point, so both sequences can be walked in lock-step.
            final Iterator<ConsumerRecord<K, V>> actual = received.iterator();
            for (final KeyValueTimestamp<K, V> expected : list) {
                final ConsumerRecord<K, V> record = actual.next();
                try {
                    compareKeyValueTimestamp(record, expected.key(), expected.value(), expected.timestamp());
                } catch (AssertionError mismatch) {
                    // Re-throw with the full picture of both sequences, keeping the detail as cause.
                    throw new AssertionError(printRecords(received) + " != " + list, mismatch);
                }
            }
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Verifies that the records obtained from {@code waitUntilMinRecordsReceived} match the
     * expected key/value/timestamp triples, ignoring order.
     *
     * @param properties consumer configuration passed to {@code waitUntilMinRecordsReceived}
     * @param str        topic name
     * @param set        expected records
     * @throws AssertionError   if the record count differs or the sets are not equal
     * @throws RuntimeException wrapping any {@link Exception} raised while reading or comparing
     */
    public static void verifyKeyValueTimestamps(Properties properties, String str, Set<KeyValueTimestamp<String, Long>> set) {
        try {
            final List<ConsumerRecord<String, Long>> received = waitUntilMinRecordsReceived(properties, str, set.size());
            if (received.size() != set.size()) {
                throw new AssertionError(printRecords(received) + " != " + set);
            }
            final Set<KeyValueTimestamp<String, Long>> actual = received.stream()
                .map(record -> new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp()))
                .collect(Collectors.toSet());
            MatcherAssert.assertThat(actual, Matchers.equalTo(set));
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Checks a single record against an expected key, value and timestamp.
     *
     * <p>Keys and values are compared null-safely with {@code equals}, called on the
     * record's key/value; timestamps are compared with {@code ==}.
     *
     * @param consumerRecord the record to check; must not be {@code null}
     * @param k              expected key, may be {@code null}
     * @param v              expected value, may be {@code null}
     * @param j              expected timestamp
     * @throws NullPointerException if {@code consumerRecord} is {@code null}
     * @throws AssertionError       if key, value or timestamp differ
     */
    private static <K, V> void compareKeyValueTimestamp(ConsumerRecord<K, V> consumerRecord, K k, V v, long j) {
        Objects.requireNonNull(consumerRecord);
        final K actualKey = consumerRecord.key();
        final V actualValue = consumerRecord.value();
        final long actualTimestamp = consumerRecord.timestamp();
        final String mismatch = "Expected <" + k + ", " + v + "> with timestamp=" + j + " but was <" + actualKey + ", " + actualValue + "> with timestamp=" + actualTimestamp;

        // Short-circuit in key -> value -> timestamp order so equals is only consulted
        // for the value once the key has already matched.
        final boolean matches =
            (actualKey == null ? k == null : actualKey.equals(k))
                && (actualValue == null ? v == null : actualValue.equals(v))
                && actualTimestamp == j;
        if (!matches) {
            throw new AssertionError(mismatch);
        }
    }

    /**
     * Renders the records as a bracketed block with one indented record per line.
     *
     * @param list the records to render
     * @return {@code "[\n"}, then each record's {@code toString()} prefixed by two spaces
     *         and followed by a newline, then {@code "]"}
     */
    private static <K, V> String printRecords(List<ConsumerRecord<K, V>> list) {
        final StringBuilder rendered = new StringBuilder("[\n");
        for (final ConsumerRecord<K, V> record : list) {
            rendered.append("  ");
            rendered.append(record.toString());
            rendered.append("\n");
        }
        return rendered.append("]").toString();
    }

    /**
     * Reads key/value pairs through {@code readKeyValues} and keeps only the values.
     *
     * @param str      topic name
     * @param consumer consumer to read with
     * @param j        wait time in milliseconds, forwarded unchanged
     * @param i        maximum number of messages, forwarded unchanged
     * @return the values of the pairs read, in the order they were returned
     */
    private static <V> List<V> readValues(String str, Consumer<Object, V> consumer, long j, int i) {
        final List<KeyValue<Object, V>> pairs = readKeyValues(str, consumer, j, i);
        final List<V> values = new ArrayList<>();
        for (final KeyValue<Object, V> pair : pairs) {
            values.add(pair.value);
        }
        return values;
    }

    /**
     * Reads records through {@code readRecords} and converts each to a {@code KeyValue}.
     *
     * @param str      topic name
     * @param consumer consumer to read with
     * @param j        wait time in milliseconds, forwarded unchanged
     * @param i        maximum number of messages, forwarded unchanged
     * @return one key/value pair per record read, in the order they were returned
     */
    private static <K, V> List<KeyValue<K, V>> readKeyValues(String str, Consumer<K, V> consumer, long j, int i) {
        final List<KeyValue<K, V>> pairs = new ArrayList<>();
        final List<ConsumerRecord<K, V>> records = readRecords(str, consumer, j, i);
        records.forEach(record -> pairs.add(new KeyValue<>(record.key(), record.value())));
        return pairs;
    }

    /**
     * Reads records through {@code readRecords} and converts each to a
     * {@code KeyValueTimestamp} carrying the record's key, value and timestamp.
     *
     * @param str      topic name
     * @param consumer consumer to read with
     * @param j        wait time in milliseconds, forwarded unchanged
     * @param i        maximum number of messages, forwarded unchanged
     * @return one entry per record read, in the order they were returned
     */
    private static <K, V> List<KeyValueTimestamp<K, V>> readKeyValuesWithTimestamp(String str, Consumer<K, V> consumer, long j, int i) {
        final List<ConsumerRecord<K, V>> records = readRecords(str, consumer, j, i);
        final List<KeyValueTimestamp<K, V>> converted = new ArrayList<>();
        final Iterator<ConsumerRecord<K, V>> remaining = records.iterator();
        while (remaining.hasNext()) {
            final ConsumerRecord<K, V> record = remaining.next();
            converted.add(new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp()));
        }
        return converted;
    }

    /**
     * Subscribes the consumer to the topic and polls in 100 ms steps until either the
     * poll budget is used up or {@code continueConsuming} says enough records were read.
     *
     * <p>The budget is counted as 100 ms per poll call, not measured wall-clock time.
     *
     * @param str      topic to subscribe to
     * @param consumer consumer to poll
     * @param j        poll budget in milliseconds
     * @param i        maximum number of records; values {@code <= 0} mean no limit
     * @return all records returned by the polls, in arrival order
     */
    private static <K, V> List<ConsumerRecord<K, V>> readRecords(String str, Consumer<K, V> consumer, long j, int i) {
        final long pollIntervalMs = 100L;
        consumer.subscribe(Collections.singletonList(str));
        final List<ConsumerRecord<K, V>> records = new ArrayList<>();
        // Accumulate in a long: the budget is a long, and an int counter would wrap to a
        // negative value for budgets above Integer.MAX_VALUE, so the time bound would
        // never be reached.
        long polledMs = 0L;
        while (polledMs < j && continueConsuming(records.size(), i)) {
            polledMs += pollIntervalMs;
            for (final ConsumerRecord<K, V> record : consumer.poll(Duration.ofMillis(pollIntervalMs))) {
                records.add(record);
            }
        }
        return records;
    }

    /**
     * Decides whether more records should be consumed.
     *
     * @param i  number of records consumed so far
     * @param i2 maximum number of records; values {@code <= 0} mean no limit
     * @return {@code true} if there is no limit or the limit has not been reached yet
     */
    private static boolean continueConsuming(int i, int i2) {
        if (i2 <= 0) {
            return true;
        }
        return i < i2;
    }

    /**
     * Creates a {@code KafkaConsumer} from a copy of the given configuration, with
     * {@code auto.offset.reset} set to {@code earliest} and {@code enable.auto.commit}
     * set to {@code true}. The caller's {@code Properties} object is not modified.
     *
     * @param properties base consumer configuration
     * @return a new consumer built from the adjusted copy
     */
    private static <K, V> KafkaConsumer<K, V> createConsumer(Properties properties) {
        final Properties effectiveConfig = new Properties();
        effectiveConfig.putAll(properties);
        // Applied after the copy so these two settings always win over the caller's values.
        effectiveConfig.setProperty("auto.offset.reset", "earliest");
        effectiveConfig.setProperty("enable.auto.commit", "true");
        final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(effectiveConfig);
        return consumer;
    }

    /**
     * Builds a {@code KafkaStreams} instance from the builder's topology, optionally calls
     * {@code cleanUp()} on it, and starts it. The method does not wait for any state.
     *
     * @param properties     streams configuration
     * @param streamsBuilder builder whose topology is used
     * @param z              whether to call {@code cleanUp()} before starting
     * @return the started instance
     */
    public static KafkaStreams getStartedStreams(Properties properties, StreamsBuilder streamsBuilder, boolean z) {
        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
        final boolean cleanFirst = z;
        if (cleanFirst) {
            streams.cleanUp();
        }
        streams.start();
        return streams;
    }

    /**
     * Builds and starts a {@code KafkaStreams} instance and waits up to
     * {@code DEFAULT_TIMEOUT} milliseconds for it to report {@code RUNNING}.
     *
     * <p>Note that this installs its own state listener on the new instance.
     *
     * @param properties     streams configuration
     * @param streamsBuilder builder whose topology is used
     * @param z              whether to call {@code cleanUp()} before starting
     * @return the instance, once it has reported {@code RUNNING}
     * @throws RuntimeException if the instance does not report {@code RUNNING} within the
     *                          timeout, or if the waiting thread is interrupted
     */
    public static KafkaStreams getRunningStreams(Properties properties, StreamsBuilder streamsBuilder, boolean z) {
        final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        if (z) {
            kafkaStreams.cleanUp();
        }
        final CountDownLatch runningLatch = new CountDownLatch(1);
        kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING) {
                runningLatch.countDown();
            }
        });
        kafkaStreams.start();

        final boolean reachedRunning;
        try {
            // await returns false on timeout; that result must not be dropped, otherwise a
            // never-running instance would be handed back as if it were running.
            reachedRunning = runningLatch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // The caller never receives the instance on this path, so shut it down here
            // (zero timeout: initiate the shutdown without blocking).
            kafkaStreams.close(Duration.ZERO);
            throw new RuntimeException("Streams didn't start in time.", e);
        }
        if (!reachedRunning) {
            kafkaStreams.close(Duration.ZERO);
            throw new RuntimeException("Streams didn't start in time.");
        }
        return kafkaStreams;
    }

    /**
     * Looks up a store by name and type using {@code DEFAULT_TIMEOUT} as the wait time.
     *
     * @param str                store name
     * @param kafkaStreams       instance to query
     * @param queryableStoreType type of the store
     * @return the store returned by the delegate overload
     * @throws Exception propagated from the delegate overload
     */
    public static <S> S getStore(String str, KafkaStreams kafkaStreams, QueryableStoreType<S> queryableStoreType) throws Exception {
        final S store = getStore(DEFAULT_TIMEOUT, str, kafkaStreams, queryableStoreType);
        return store;
    }

    /**
     * Looks up a store by name and type using {@code DEFAULT_TIMEOUT} as the wait time,
     * optionally with stale stores enabled.
     *
     * @param str                store name
     * @param kafkaStreams       instance to query
     * @param z                  whether stale stores should be enabled for the query
     * @param queryableStoreType type of the store
     * @return the store returned by the delegate overload
     * @throws Exception propagated from the delegate overload
     */
    public static <S> S getStore(String str, KafkaStreams kafkaStreams, boolean z, QueryableStoreType<S> queryableStoreType) throws Exception {
        final S store = getStore(DEFAULT_TIMEOUT, str, kafkaStreams, z, queryableStoreType);
        return store;
    }

    /**
     * Looks up a store by name and type with the given wait time and stale stores disabled.
     *
     * @param j                  wait time in milliseconds
     * @param str                store name
     * @param kafkaStreams       instance to query
     * @param queryableStoreType type of the store
     * @return the store returned by the delegate overload
     * @throws Exception propagated from the delegate overload
     */
    public static <S> S getStore(long j, String str, KafkaStreams kafkaStreams, QueryableStoreType<S> queryableStoreType) throws Exception {
        final boolean enableStaleStores = false;
        return getStore(j, str, kafkaStreams, enableStaleStores, queryableStoreType);
    }

    /**
     * Builds {@code StoreQueryParameters} from the store name and type, enabling stale
     * stores when requested, and delegates to the parameter-based overload.
     *
     * @param j                  wait time in milliseconds
     * @param str                store name
     * @param kafkaStreams       instance to query
     * @param z                  whether stale stores should be enabled for the query
     * @param queryableStoreType type of the store
     * @return the store returned by the delegate overload
     * @throws Exception propagated from the delegate overload
     */
    public static <S> S getStore(long j, String str, KafkaStreams kafkaStreams, boolean z, QueryableStoreType<S> queryableStoreType) throws Exception {
        final StoreQueryParameters<S> parameters;
        if (z) {
            parameters = StoreQueryParameters.fromNameAndType(str, queryableStoreType).enableStaleStores();
        } else {
            parameters = StoreQueryParameters.fromNameAndType(str, queryableStoreType);
        }
        return getStore(j, kafkaStreams, parameters);
    }

    /**
     * Looks up a store for the given query parameters using {@code DEFAULT_TIMEOUT} as the
     * wait time.
     *
     * @param kafkaStreams         instance to query
     * @param storeQueryParameters parameters describing the store to fetch
     * @return the store returned by the delegate overload
     * @throws Exception propagated from the delegate overload
     */
    public static <S> S getStore(KafkaStreams kafkaStreams, StoreQueryParameters<S> storeQueryParameters) throws Exception {
        final S store = getStore(DEFAULT_TIMEOUT, kafkaStreams, storeQueryParameters);
        return store;
    }

    /**
     * Repeatedly calls {@code kafkaStreams.store(...)} until it succeeds or the wait time
     * has elapsed, sleeping up to 100 ms between attempts.
     *
     * @param j                    wait time in milliseconds
     * @param kafkaStreams         instance to query
     * @param storeQueryParameters parameters describing the store to fetch
     * @return the store, as soon as a lookup succeeds
     * @throws InvalidStateStoreException the last such failure, rethrown unchanged once the
     *                                    wait time has elapsed
     * @throws AssertionError             wrapping any other exception still occurring once
     *                                    the wait time has elapsed
     * @throws Exception                  if the sleep between attempts is interrupted
     */
    public static <S> S getStore(long j, KafkaStreams kafkaStreams, StoreQueryParameters<S> storeQueryParameters) throws Exception {
        final long deadlineMs = System.currentTimeMillis() + j;
        while (true) {
            try {
                return kafkaStreams.store(storeQueryParameters);
            } catch (InvalidStateStoreException e) {
                // The specific handler has to come before the generic one: placed after
                // catch (Exception) it is unreachable, and this failure would be wrapped
                // in an AssertionError instead of being rethrown as itself.
                if (System.currentTimeMillis() > deadlineMs) {
                    throw e;
                }
            } catch (Exception e) {
                if (System.currentTimeMillis() > deadlineMs) {
                    throw new AssertionError(e);
                }
            }
            Thread.sleep(Math.min(100L, j));
        }
    }
}
