package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManager.class */
public class InternalTopicManager {
    /** Suffix appended to exception messages that report a violated internal invariant. */
    private static final String BUG_ERROR_MESSAGE = "This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
    /** Message logged and thrown when waiting on an admin-client future is interrupted. */
    private static final String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
    /** Clock used to compute retry deadlines and remaining time. */
    private final Time time;
    /** Admin client used to describe, create and delete topics and to describe topic configs. */
    private final Admin adminClient;
    /** Replication factor passed to every {@link NewTopic}; read from {@code StreamsConfig.REPLICATION_FACTOR_CONFIG}. */
    private final short replicationFactor;
    /** Additional retention handed to {@code InternalTopicConfig#getProperties} when resolving topic properties. */
    private final long windowChangeLogAdditionalRetention;
    /** Pause in milliseconds between retries; read from {@code retry.backoff.ms}. */
    private final long retryBackOffMs;
    /** Retry budget in milliseconds; the constructor sets it to half of the main consumer's {@code max.poll.interval.ms}. */
    private final long retryTimeoutMs;
    /** Topic-level defaults taken from the user configs carrying {@code StreamsConfig.TOPIC_PREFIX}; values are stored as strings. */
    private final Map<String, String> defaultTopicConfigs = new HashMap();
    /** Logger whose prefix contains the name of the thread that constructed this instance. */
    private final Logger log = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())).logger(getClass());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManager$ValidationResult.class */
    /**
     * Collects the findings of an internal-topic validation run: the names of topics
     * that do not exist and, per existing topic, human-readable descriptions of every
     * detected misconfiguration.
     */
    public static class ValidationResult {
        /** Names of the topics that were reported as missing. */
        private final Set<String> missingTopics = new HashSet<>();
        /** Misconfiguration messages, keyed by topic name, in the order they were recorded. */
        private final Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();

        ValidationResult() {
        }

        /**
         * Records a topic as missing.
         *
         * @param str name of the missing topic
         */
        public void addMissingTopic(String str) {
            missingTopics.add(str);
        }

        /**
         * @return a read-only view of the topics recorded as missing
         */
        public Set<String> missingTopics() {
            return Collections.unmodifiableSet(missingTopics);
        }

        /**
         * Appends a misconfiguration message to the list kept for the given topic.
         *
         * @param str  name of the misconfigured topic
         * @param str2 description of the misconfiguration
         */
        public void addMisconfiguration(String str, String str2) {
            List<String> messages = misconfigurationsForTopics.get(str);
            if (messages == null) {
                // First finding for this topic: start a new list.
                messages = new ArrayList<>();
                misconfigurationsForTopics.put(str, messages);
            }
            messages.add(str2);
        }

        /**
         * @return a read-only view of the recorded misconfigurations, keyed by topic name
         */
        public Map<String, List<String>> misconfigurationsForTopics() {
            return Collections.unmodifiableMap(misconfigurationsForTopics);
        }
    }

    /**
     * Creates a manager that talks to the brokers through the given admin client.
     *
     * <p>The replication factor, the additional changelog retention and the retry
     * back-off are read from {@code streamsConfig}. The retry timeout is half of the
     * {@code max.poll.interval.ms} of the main consumer configuration derived from
     * {@code streamsConfig}. Every non-null user config carrying
     * {@code StreamsConfig.TOPIC_PREFIX} is kept (as a string) as a default topic config.
     *
     * @param time          clock used for deadlines
     * @param admin         admin client used for all topic operations
     * @param streamsConfig Streams configuration to read the settings from
     */
    public InternalTopicManager(Time time, Admin admin, StreamsConfig streamsConfig) {
        this.time = time;
        this.adminClient = admin;

        this.replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
        this.windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
        this.retryBackOffMs = streamsConfig.getLong("retry.backoff.ms");

        // The consumer config is only built to read max.poll.interval.ms; the deserializers
        // are filled in before handing the map to QuietConsumerConfig.
        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("dummy", "dummy", -1);
        consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        final int maxPollIntervalMs = new ClientUtils.QuietConsumerConfig(consumerConfigs).getInt("max.poll.interval.ms");
        this.retryTimeoutMs = maxPollIntervalMs / 2;

        this.log.debug(
            "Configs:" + Utils.NL + "\t{} = {}" + Utils.NL + "\t{} = {}",
            StreamsConfig.REPLICATION_FACTOR_CONFIG,
            this.replicationFactor,
            StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
            this.windowChangeLogAdditionalRetention
        );

        streamsConfig.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX).forEach((configName, configValue) -> {
            if (configValue != null) {
                this.defaultTopicConfigs.put(configName, configValue.toString());
            }
        });
    }

    /**
     * Validates the given internal topics against the brokers.
     *
     * <p>For every topic both its description (partition count) and its broker-side
     * config (cleanup policy and retention) are fetched and checked. Topics that do not
     * exist and detected misconfigurations are collected in the returned result rather
     * than thrown. Lookups that fail because a leader is unavailable or the request
     * timed out are retried after a back-off until the retry timeout elapses.
     *
     * @param map expected internal topic configurations, keyed by topic name
     * @return the missing topics and misconfigurations that were found
     * @throws TimeoutException if the validation cannot be completed before the deadline
     * @throws StreamsException if a lookup fails with an unexpected error
     */
    public ValidationResult validate(Map<String, InternalTopicConfig> map) {
        this.log.info("Starting to validate internal topics {}.", map.keySet());

        final long deadline = this.time.milliseconds() + this.retryTimeoutMs;
        final ValidationResult validationResult = new ValidationResult();
        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(map.keySet());
        final Set<String> topicConfigsStillToValidate = new HashSet<>(map.keySet());

        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
            if (!topicDescriptionsStillToValidate.isEmpty()) {
                descriptionsForTopic = this.adminClient.describeTopics(topicDescriptionsStillToValidate).topicNameValues();
            }

            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
            if (!topicConfigsStillToValidate.isEmpty()) {
                final Set<ConfigResource> configResources = topicConfigsStillToValidate.stream()
                    .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
                    .collect(Collectors.toSet());
                configsForTopic = this.adminClient.describeConfigs(configResources).values().entrySet().stream()
                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
            }

            // Poll the outstanding futures until every one of them has been consumed.
            // The loop must end once both maps are drained; otherwise the back-off below
            // and the outer retry would never be reached.
            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
                if (!descriptionsForTopic.isEmpty()) {
                    doValidateTopic(
                        validationResult,
                        descriptionsForTopic,
                        map,
                        topicDescriptionsStillToValidate,
                        (expectedConfig, description) -> validatePartitionCount(validationResult, expectedConfig, description)
                    );
                }
                if (!configsForTopic.isEmpty()) {
                    doValidateTopic(
                        validationResult,
                        configsForTopic,
                        map,
                        topicConfigsStillToValidate,
                        (expectedConfig, brokerConfig) -> validateCleanupPolicy(validationResult, expectedConfig, brokerConfig)
                    );
                }

                maybeThrowTimeoutException(
                    Arrays.asList(topicDescriptionsStillToValidate, topicConfigsStillToValidate),
                    deadline,
                    String.format("Could not validate internal topics within %d milliseconds. This can happen if the Kafka cluster is temporarily not available.", this.retryTimeoutMs)
                );

                // Some futures are still pending: wait briefly before polling again.
                if (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
                    Utils.sleep(100L);
                }
            }

            // Back off before re-issuing requests for topics that hit a retriable error.
            maybeSleep(
                Arrays.asList(topicDescriptionsStillToValidate, topicConfigsStillToValidate),
                deadline,
                "validated"
            );
        }

        this.log.info("Completed validation of internal topics {}.", map.keySet());
        return validationResult;
    }

    /**
     * Consumes every completed future among the topics that are still to be validated.
     *
     * <p>A successfully completed future is handed to {@code biConsumer} together with
     * the expected topic config and the topic is removed from {@code set}. A missing
     * topic is recorded in {@code validationResult} and removed from {@code set} as well.
     * When the leader is unavailable or the request timed out the topic stays in
     * {@code set} so that the caller can retry it. In all cases the consumed future is
     * removed from {@code map}.
     *
     * @param validationResult collector for missing topics
     * @param map              outstanding futures, keyed by topic name
     * @param map2             expected topic configs, keyed by topic name
     * @param set              topics still to validate
     * @param biConsumer       validation applied to the expected config and the fetched value
     * @param <V>              type of the value delivered by the futures
     * @throws IllegalStateException if {@code map} lacks a future for a topic in {@code set}
     * @throws StreamsException      if a future failed with an unexpected error
     */
    private <V> void doValidateTopic(ValidationResult validationResult, Map<String, KafkaFuture<V>> map, Map<String, InternalTopicConfig> map2, Set<String> set, BiConsumer<InternalTopicConfig, V> biConsumer) {
        // Iterate over a snapshot because topics are removed from the set while looping.
        for (final String topicName : new HashSet<>(set)) {
            if (!map.containsKey(topicName)) {
                throw new IllegalStateException("Description results do not contain topics to validate. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).");
            }
            final KafkaFuture<V> future = map.get(topicName);
            if (!future.isDone()) {
                continue;
            }
            try {
                biConsumer.accept(map2.get(topicName), future.get());
                set.remove(topicName);
            } catch (final InterruptedException interruptedException) {
                throw new InterruptException(interruptedException);
            } catch (final ExecutionException executionException) {
                final Throwable cause = executionException.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    this.log.info("Internal topic {} is missing", topicName);
                    validationResult.addMissingTopic(topicName);
                    set.remove(topicName);
                } else if (cause instanceof LeaderNotAvailableException) {
                    this.log.info("The leader of internal topic {} is not available.", topicName);
                } else if (cause instanceof TimeoutException) {
                    this.log.info("Retrieving data for internal topic {} timed out.", topicName);
                } else {
                    this.log.error("Unexpected error during internal topic validation: ", cause);
                    throw new StreamsException(String.format("Could not validate internal topic %s for the following reason: ", topicName), cause);
                }
            } finally {
                // The future has been consumed whatever the outcome.
                map.remove(topicName);
            }
        }
    }

    /**
     * Records a misconfiguration if the number of partitions of the existing topic
     * differs from the number required by the internal topic config.
     *
     * @param validationResult    collector for the finding
     * @param internalTopicConfig expected topic config
     * @param topicDescription    description of the existing topic
     * @throws IllegalStateException if the expected config specifies no partition count
     */
    private void validatePartitionCount(ValidationResult validationResult, InternalTopicConfig internalTopicConfig, TopicDescription topicDescription) {
        final String topicName = internalTopicConfig.name();
        final Optional<Integer> requiredPartitions = internalTopicConfig.numberOfPartitions();
        if (!requiredPartitions.isPresent()) {
            throw new IllegalStateException("No partition count is specified for internal topic " + topicName + ". " + BUG_ERROR_MESSAGE);
        }
        final int expected = requiredPartitions.get();
        final int actual = topicDescription.partitions().size();
        if (actual == expected) {
            return;
        }
        validationResult.addMisconfiguration(topicName, "Internal topic " + topicName + " requires " + expected + " partitions, but the existing topic on the broker has " + actual + " partitions.");
    }

    /**
     * Dispatches the cleanup-policy validation according to the kind of internal topic
     * (unwindowed changelog, windowed changelog or repartition topic).
     *
     * @param validationResult    collector for findings
     * @param internalTopicConfig expected topic config
     * @param config              broker-side config of the existing topic
     * @throws IllegalStateException if the topic config is of none of the known kinds
     */
    private void validateCleanupPolicy(ValidationResult validationResult, InternalTopicConfig internalTopicConfig, Config config) {
        if (internalTopicConfig instanceof UnwindowedChangelogTopicConfig) {
            validateCleanupPolicyForUnwindowedChangelogs(validationResult, internalTopicConfig, config);
        } else if (internalTopicConfig instanceof WindowedChangelogTopicConfig) {
            validateCleanupPolicyForWindowedChangelogs(validationResult, internalTopicConfig, config);
        } else if (internalTopicConfig instanceof RepartitionTopicConfig) {
            validateCleanupPolicyForRepartitionTopic(validationResult, internalTopicConfig, config);
        } else {
            throw new IllegalStateException("Internal topic " + internalTopicConfig.name() + " has unknown type.");
        }
    }

    /**
     * Records a misconfiguration if the broker-side cleanup policy of an unwindowed
     * changelog topic contains the delete policy.
     *
     * @param validationResult    collector for the finding
     * @param internalTopicConfig expected topic config
     * @param config              broker-side config of the existing topic
     */
    private void validateCleanupPolicyForUnwindowedChangelogs(ValidationResult validationResult, InternalTopicConfig internalTopicConfig, Config config) {
        final String topicName = internalTopicConfig.name();
        final String cleanupPolicy = getBrokerSideConfigValue(config, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
        if (!cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
            return;
        }
        validationResult.addMisconfiguration(topicName, "Cleanup policy (cleanup.policy) of existing internal topic " + topicName + " should not contain \"" + TopicConfig.CLEANUP_POLICY_DELETE + "\".");
    }

    /**
     * Checks the retention settings of a windowed changelog topic whose broker-side
     * cleanup policy contains the delete policy: the retention time must not be smaller
     * than the one resolved from the expected config, and the retention bytes config
     * must have no value. Nothing is checked when the delete policy is absent.
     *
     * @param validationResult    collector for findings
     * @param internalTopicConfig expected topic config
     * @param config              broker-side config of the existing topic
     */
    private void validateCleanupPolicyForWindowedChangelogs(ValidationResult validationResult, InternalTopicConfig internalTopicConfig, Config config) {
        final String topicName = internalTopicConfig.name();
        final String cleanupPolicy = getBrokerSideConfigValue(config, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
        if (!cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
            return;
        }

        final long brokerSideRetentionMs = Long.parseLong(getBrokerSideConfigValue(config, TopicConfig.RETENTION_MS_CONFIG, topicName));
        final Map<String, String> expectedProperties = internalTopicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention);
        final long requiredRetentionMs = Long.parseLong(expectedProperties.get(TopicConfig.RETENTION_MS_CONFIG));
        if (brokerSideRetentionMs < requiredRetentionMs) {
            validationResult.addMisconfiguration(topicName, "Retention time (retention.ms) of existing internal topic " + topicName + " is " + brokerSideRetentionMs + " but should be " + requiredRetentionMs + " or larger.");
        }

        final String brokerSideRetentionBytes = getBrokerSideConfigValue(config, TopicConfig.RETENTION_BYTES_CONFIG, topicName);
        if (brokerSideRetentionBytes != null) {
            validationResult.addMisconfiguration(topicName, "Retention byte (retention.bytes) of existing internal topic " + topicName + " is set but it should be unset.");
        }
    }

    /**
     * Checks the cleanup settings of a repartition topic. A cleanup policy containing
     * the compact policy is reported and nothing else is checked. Otherwise, if the
     * policy contains the delete policy, the retention time must be -1 and the
     * retention bytes config must have no value.
     *
     * @param validationResult    collector for findings
     * @param internalTopicConfig expected topic config
     * @param config              broker-side config of the existing topic
     */
    private void validateCleanupPolicyForRepartitionTopic(ValidationResult validationResult, InternalTopicConfig internalTopicConfig, Config config) {
        final String topicName = internalTopicConfig.name();
        final String cleanupPolicy = getBrokerSideConfigValue(config, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);

        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
            validationResult.addMisconfiguration(topicName, "Cleanup policy (cleanup.policy) of existing internal topic " + topicName + " should not contain \"" + TopicConfig.CLEANUP_POLICY_COMPACT + "\".");
            return;
        }
        if (!cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
            return;
        }

        final long brokerSideRetentionMs = Long.parseLong(getBrokerSideConfigValue(config, TopicConfig.RETENTION_MS_CONFIG, topicName));
        if (brokerSideRetentionMs != -1) {
            validationResult.addMisconfiguration(topicName, "Retention time (retention.ms) of existing internal topic " + topicName + " is " + brokerSideRetentionMs + " but should be -1.");
        }

        final String brokerSideRetentionBytes = getBrokerSideConfigValue(config, TopicConfig.RETENTION_BYTES_CONFIG, topicName);
        if (brokerSideRetentionBytes != null) {
            validationResult.addMisconfiguration(topicName, "Retention byte (retention.bytes) of existing internal topic " + topicName + " is set but it should be unset.");
        }
    }

    /**
     * Looks up the value of a config entry in the broker-side topic config.
     *
     * @param config broker-side config of the topic
     * @param str    name of the config to look up
     * @param str2   name of the topic; only used in the error message
     * @return the value of the entry as returned by {@code ConfigEntry#value()}
     * @throws IllegalStateException if the config contains no entry with the given name
     */
    private String getBrokerSideConfigValue(Config config, String str, String str2) {
        final ConfigEntry entry = config.get(str);
        if (entry != null) {
            return entry.value();
        }
        throw new IllegalStateException("The config " + str + " for topic " + str2 + " could not be retrieved from the brokers. " + BUG_ERROR_MESSAGE);
    }

    /**
     * Makes sure the given internal topics exist, creating those that are not found.
     *
     * <p>Each round first validates the remaining topics via {@code validateTopics}.
     * Topics reported as not ready are created unless their lookup failed only
     * temporarily. Topics that could not be made ready are retried after a back-off
     * until the retry timeout elapses.
     *
     * @param map expected internal topic configurations, keyed by topic name
     * @return the names of all topics that were reported as not ready in any round
     * @throws TimeoutException      if topics are still not ready when the deadline has passed
     * @throws StreamsException      if a topic creation fails with an unexpected error
     * @throws IllegalStateException if the thread is interrupted while waiting for a result
     */
    public Set<String> makeReady(Map<String, InternalTopicConfig> map) {
        this.log.debug("Starting to validate internal topics {} in partition assignor.", map);
        final long deadline = this.time.milliseconds() + this.retryTimeoutMs;

        Set<String> topicsNotReady = new HashSet<>(map.keySet());
        final Set<String> newlyCreatedTopics = new HashSet<>();

        while (!topicsNotReady.isEmpty()) {
            final Set<String> tempUnknownTopics = new HashSet<>();
            topicsNotReady = validateTopics(topicsNotReady, map, tempUnknownTopics);
            newlyCreatedTopics.addAll(topicsNotReady);

            if (!topicsNotReady.isEmpty()) {
                final Set<NewTopic> newTopics = new HashSet<>();
                for (final String topicName : topicsNotReady) {
                    // Topics whose lookup failed only temporarily are re-validated, not created.
                    if (tempUnknownTopics.contains(topicName)) {
                        continue;
                    }
                    final InternalTopicConfig topicConfig = Objects.requireNonNull(map.get(topicName));
                    final Map<String, String> topicProperties = topicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention);
                    this.log.debug("Going to create topic {} with {} partitions and config {}.", topicConfig.name(), topicConfig.numberOfPartitions(), topicProperties);
                    newTopics.add(new NewTopic(topicConfig.name(), topicConfig.numberOfPartitions(), Optional.of(this.replicationFactor)).configs(topicProperties));
                }

                if (!newTopics.isEmpty()) {
                    final Map<String, KafkaFuture<Void>> creationFutures = this.adminClient.createTopics(newTopics).values();
                    for (final Map.Entry<String, KafkaFuture<Void>> creation : creationFutures.entrySet()) {
                        final String topicName = creation.getKey();
                        try {
                            creation.getValue().get();
                            topicsNotReady.remove(topicName);
                        } catch (final InterruptedException interruptedException) {
                            Thread.currentThread().interrupt();
                            this.log.error(INTERRUPTED_ERROR_MESSAGE, interruptedException);
                            throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, interruptedException);
                        } catch (final ExecutionException executionException) {
                            final Throwable cause = executionException.getCause();
                            if (cause instanceof TopicExistsException) {
                                // The topic stays in topicsNotReady and is retried in the next round.
                                this.log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\nWill retry to create this topic in {} ms (to let broker finish async delete operation first).\nError message was: {}", topicName, this.retryBackOffMs, cause.toString());
                            } else {
                                this.log.error("Unexpected error during topic creation for {}.\nError message was: {}", topicName, cause.toString());
                                if (!(cause instanceof UnsupportedVersionException)) {
                                    throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
                                }
                                final String errorMessage = cause.getMessage();
                                if (errorMessage != null
                                    && errorMessage.startsWith("Creating topics with default partitions/replication factor are only supported in CreateTopicRequest version 4+")) {
                                    throw new StreamsException(String.format("Could not create topic %s, because brokers don't support configuration replication.factor=-1. You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error.", topicName));
                                }
                            }
                        } catch (final TimeoutException timeoutException) {
                            this.log.error("Creating topic {} timed out.\nError message was: {}", topicName, timeoutException.toString());
                        }
                    }
                }
            }

            if (!topicsNotReady.isEmpty()) {
                final long now = this.time.milliseconds();
                if (now >= deadline) {
                    final String timeoutError = String.format("Could not create topics within %d milliseconds. This can happen if the Kafka cluster is temporarily not available.", this.retryTimeoutMs);
                    this.log.error(timeoutError);
                    throw new TimeoutException(timeoutError);
                }
                this.log.info("Topics {} could not be made ready. Will retry in {} milliseconds. Remaining time in milliseconds: {}", topicsNotReady, this.retryBackOffMs, deadline - now);
                Utils.sleep(this.retryBackOffMs);
            }
        }

        this.log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);
        return newlyCreatedTopics;
    }

    /**
     * Describes the given topics and returns the partition count of each topic whose
     * description could be fetched.
     *
     * <p>Topics that are unknown to the brokers are left out of the result. Topics
     * whose leader is unavailable or whose description timed out are left out as well
     * and are added to {@code set2}.
     *
     * @param set  names of the topics to describe
     * @param set2 output set receiving the topics whose lookup failed only temporarily
     * @return partition counts keyed by topic name
     * @throws StreamsException      if a description fails with an unexpected error
     * @throws IllegalStateException if the thread is interrupted while waiting for a result
     */
    protected Map<String, Integer> getNumPartitions(Set<String> set, Set<String> set2) {
        this.log.debug("Trying to check if topics {} have been created with expected number of partitions.", set);

        final Map<String, KafkaFuture<TopicDescription>> descriptionFutures = this.adminClient.describeTopics(set).topicNameValues();
        final Map<String, Integer> partitionCounts = new HashMap<>();

        for (final Map.Entry<String, KafkaFuture<TopicDescription>> description : descriptionFutures.entrySet()) {
            final String topicName = description.getKey();
            try {
                final TopicDescription topicDescription = description.getValue().get();
                partitionCounts.put(topicName, topicDescription.partitions().size());
            } catch (final InterruptedException interruptedException) {
                Thread.currentThread().interrupt();
                this.log.error(INTERRUPTED_ERROR_MESSAGE, interruptedException);
                throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, interruptedException);
            } catch (final ExecutionException executionException) {
                final Throwable cause = executionException.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    this.log.debug("Topic {} is unknown or not found, hence not existed yet.\nError message was: {}", topicName, cause.toString());
                } else if (cause instanceof LeaderNotAvailableException) {
                    set2.add(topicName);
                    this.log.debug("The leader of topic {} is not available.\nError message was: {}", topicName, cause.toString());
                } else {
                    this.log.error("Unexpected error during topic description for {}.\nError message was: {}", topicName, cause.toString());
                    throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
                }
            } catch (final TimeoutException timeoutException) {
                set2.add(topicName);
                this.log.debug("Describing topic {} (to get number of partitions) timed out.\nError message was: {}", topicName, timeoutException.toString());
            }
        }

        return partitionCounts;
    }

    /**
     * Compares the partition counts of the given topics with the expected ones.
     *
     * @param set  names of the topics to validate
     * @param map  expected topic configs; must contain every topic in {@code set}
     * @param set2 output set passed on to {@code getNumPartitions} for topics whose lookup failed temporarily
     * @return the topics for which no partition count could be obtained
     * @throws IllegalStateException if {@code map} does not contain all topics to validate
     * @throws StreamsException      if an expected partition count is undefined or differs from the existing one
     */
    private Set<String> validateTopics(Set<String> set, Map<String, InternalTopicConfig> map, Set<String> set2) {
        if (!map.keySet().containsAll(set)) {
            throw new IllegalStateException("The topics map " + map.keySet() + " does not contain all the topics " + set + " trying to validate.");
        }

        final Map<String, Integer> existingPartitionCounts = getNumPartitions(set, set2);
        final Set<String> topicsToCreate = new HashSet<>();

        for (final String topicName : set) {
            final Optional<Integer> expectedPartitions = map.get(topicName).numberOfPartitions();
            if (!expectedPartitions.isPresent()) {
                this.log.error("Found undefined number of partitions for topic {}", topicName);
                throw new StreamsException("Topic " + topicName + " number of partitions not defined");
            }

            if (!existingPartitionCounts.containsKey(topicName)) {
                topicsToCreate.add(topicName);
                continue;
            }

            final Integer actualPartitions = existingPartitionCounts.get(topicName);
            if (!actualPartitions.equals(expectedPartitions.get())) {
                final String mismatchError = String.format("Existing internal topic %s has invalid partitions: expected: %d; actual: %d. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", topicName, expectedPartitions.get(), existingPartitionCounts.get(topicName));
                this.log.error(mismatchError);
                throw new StreamsException(mismatchError);
            }
        }

        return topicsToCreate;
    }

    /**
     * Creates all given internal topics.
     *
     * <p>Creation requests are re-issued, after a back-off, for topics that could not be
     * created yet. Result handling, including timeout handling and the clean-up of
     * already created topics on failure, is delegated to
     * {@code processCreateTopicResults}.
     *
     * @param map internal topic configurations to create, keyed by topic name
     */
    public void setup(Map<String, InternalTopicConfig> map) {
        this.log.info("Starting to setup internal topics {}.", map.keySet());
        final long deadline = this.time.milliseconds() + this.retryTimeoutMs;

        // Resolve the properties of every topic once, up front.
        final Map<String, Map<String, String>> propertiesByTopic = map.values().stream()
            .collect(Collectors.toMap(
                InternalTopicConfig::name,
                topicConfig -> topicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention)
            ));

        final Set<String> createdTopics = new HashSet<>();
        final Set<String> topicsStillToCreate = new HashSet<>(map.keySet());

        while (!topicsStillToCreate.isEmpty()) {
            final Set<NewTopic> newTopics = topicsStillToCreate.stream()
                .map(topicName -> new NewTopic(topicName, map.get(topicName).numberOfPartitions(), Optional.of(this.replicationFactor))
                    .configs(propertiesByTopic.get(topicName)))
                .collect(Collectors.toSet());

            this.log.info("Going to create internal topics: " + newTopics);
            final CreateTopicsResult createTopicsResult = this.adminClient.createTopics(newTopics);
            processCreateTopicResults(createTopicsResult, topicsStillToCreate, createdTopics, deadline);

            maybeSleep(Collections.singletonList(topicsStillToCreate), deadline, "created");
        }

        this.log.info("Completed setup of internal topics {}.", map.keySet());
    }

    /**
     * Polls the futures of a create-topics request until all of them are consumed.
     *
     * <p>A successfully created topic is moved from {@code set} to {@code set2}. A topic
     * that already exists or whose creation timed out stays in {@code set} so that the
     * caller can retry it; the error is remembered for the timeout message. Any other
     * error cleans up the topics created so far and is rethrown.
     *
     * @param createTopicsResult result of the create-topics request
     * @param set                topics still to create
     * @param set2               topics created so far
     * @param j                  deadline in milliseconds, compared against {@code time.milliseconds()}
     * @throws IllegalStateException if the result lacks a future for a topic in {@code set}
     * @throws StreamsException      if a creation fails with an unexpected error
     * @throws TimeoutException      if the deadline passes while topics are still to be created
     */
    private void processCreateTopicResults(CreateTopicsResult createTopicsResult, Set<String> set, Set<String> set2, long j) {
        final Map<String, Throwable> lastErrorsSeenForTopic = new HashMap<>();
        final Map<String, KafkaFuture<Void>> creationFutures = createTopicsResult.values();

        while (!creationFutures.isEmpty()) {
            // Iterate over a snapshot because topics are removed from the set while looping.
            for (final String topicName : new HashSet<>(set)) {
                if (!creationFutures.containsKey(topicName)) {
                    cleanUpCreatedTopics(set2);
                    throw new IllegalStateException("Create topic results do not contain internal topic " + topicName + " to setup. " + BUG_ERROR_MESSAGE);
                }
                final KafkaFuture<Void> creationFuture = creationFutures.get(topicName);
                if (!creationFuture.isDone()) {
                    continue;
                }
                try {
                    creationFuture.get();
                    set2.add(topicName);
                    set.remove(topicName);
                } catch (final ExecutionException executionException) {
                    final Throwable cause = executionException.getCause();
                    if (cause instanceof TopicExistsException) {
                        lastErrorsSeenForTopic.put(topicName, cause);
                        this.log.info("Internal topic {} already exists. Topic is probably marked for deletion. Will retry to create this topic later (to let broker complete async delete operation first)", topicName);
                    } else if (cause instanceof TimeoutException) {
                        lastErrorsSeenForTopic.put(topicName, cause);
                        this.log.info("Creating internal topic {} timed out.", topicName);
                    } else {
                        cleanUpCreatedTopics(set2);
                        this.log.error("Unexpected error during creation of internal topic: ", cause);
                        throw new StreamsException(String.format("Could not create internal topic %s for the following reason: ", topicName), cause);
                    }
                } catch (final InterruptedException interruptedException) {
                    throw new InterruptException(interruptedException);
                } finally {
                    // The future has been consumed whatever the outcome.
                    creationFutures.remove(topicName);
                }
            }

            maybeThrowTimeoutExceptionDuringSetup(set, set2, lastErrorsSeenForTopic, j);

            // Some futures are still pending: wait briefly before polling again.
            if (!creationFutures.isEmpty()) {
                Utils.sleep(100L);
            }
        }
    }

    /**
     * Deletes the given internal topics, retrying until all deletions succeeded or the
     * retry timeout elapsed.
     *
     * <p>A topic is only considered cleaned up once its delete future completes
     * successfully. If the deletion reports that the topic is missing, that its leader
     * is unavailable, or that the request timed out, the topic is retried after a
     * back-off.
     *
     * @param set names of the topics to delete; the set itself is not modified
     * @throws IllegalStateException if the delete result lacks a future for a topic to clean up
     * @throws StreamsException      if a deletion fails with an unexpected error
     * @throws TimeoutException      if the deadline passes while topics are still to be cleaned up
     */
    private void cleanUpCreatedTopics(Set<String> set) {
        this.log.info("Starting to clean up internal topics {}.", set);

        final long deadline = this.time.milliseconds() + this.retryTimeoutMs;
        final Set<String> topicsStillToCleanUp = new HashSet<>(set);

        while (!topicsStillToCleanUp.isEmpty()) {
            this.log.info("Going to cleanup internal topics: " + topicsStillToCleanUp);
            final Map<String, KafkaFuture<Void>> deletionFutures = this.adminClient.deleteTopics(topicsStillToCleanUp).topicNameValues();

            while (!deletionFutures.isEmpty()) {
                // Iterate over a snapshot because topics are removed from the set while looping.
                for (final String topicName : new HashSet<>(topicsStillToCleanUp)) {
                    if (!deletionFutures.containsKey(topicName)) {
                        throw new IllegalStateException("Delete topic results do not contain internal topic " + topicName + " to clean up. " + BUG_ERROR_MESSAGE);
                    }
                    final KafkaFuture<Void> deletionFuture = deletionFutures.get(topicName);
                    if (!deletionFuture.isDone()) {
                        continue;
                    }
                    try {
                        deletionFuture.get();
                        topicsStillToCleanUp.remove(topicName);
                    } catch (final ExecutionException executionException) {
                        final Throwable cause = executionException.getCause();
                        if (cause instanceof UnknownTopicOrPartitionException) {
                            this.log.info("Internal topic {} to clean up is missing", topicName);
                        } else if (cause instanceof LeaderNotAvailableException) {
                            this.log.info("The leader of internal topic {} to clean up is not available.", topicName);
                        } else if (cause instanceof TimeoutException) {
                            this.log.info("Cleaning up internal topic {} timed out.", topicName);
                        } else {
                            this.log.error("Unexpected error during cleanup of internal topics: ", cause);
                            throw new StreamsException(String.format("Could not clean up internal topics %s, because during the cleanup of topic %s the following error occurred: ", topicsStillToCleanUp, topicName), cause);
                        }
                    } catch (final InterruptedException interruptedException) {
                        throw new InterruptException(interruptedException);
                    } finally {
                        // The future has been consumed whatever the outcome.
                        deletionFutures.remove(topicName);
                    }
                }

                maybeThrowTimeoutException(
                    Collections.singletonList(topicsStillToCleanUp),
                    deadline,
                    String.format("Could not cleanup internal topics within %d milliseconds. This can happen if the Kafka cluster is temporarily not available or the broker did not complete topic creation before the cleanup. The following internal topics could not be cleaned up: %s", this.retryTimeoutMs, topicsStillToCleanUp)
                );

                // Some futures are still pending: wait briefly before polling again.
                if (!deletionFutures.isEmpty()) {
                    Utils.sleep(100L);
                }
            }

            // The last argument fills the "could not be {}" slot of the retry log line,
            // so it has to name the clean-up operation.
            maybeSleep(Collections.singletonList(topicsStillToCleanUp), deadline, "cleaned up");
        }

        this.log.info("Completed cleanup of internal topics {}.", set);
    }

    /**
     * Throws a timeout exception if at least one of the given sets still contains
     * topics and the deadline has been reached.
     *
     * @param list sets of topics that are still to be processed
     * @param j    deadline in milliseconds, compared against {@code time.milliseconds()}
     * @param str  message that is logged and used for the exception
     * @throws TimeoutException if topics are outstanding and the deadline has passed
     */
    private void maybeThrowTimeoutException(List<Set<String>> list, long j, String str) {
        final boolean topicsOutstanding = list.stream().anyMatch(topics -> !topics.isEmpty());
        // The clock is only consulted when something is still outstanding.
        if (topicsOutstanding && this.time.milliseconds() >= j) {
            this.log.error(str);
            throw new TimeoutException(str);
        }
    }

    /**
     * Fails the setup with a timeout if topics are still waiting to be created when
     * the deadline has been reached. The topics created so far are cleaned up before
     * the exception is thrown.
     *
     * @param set  topics still to create
     * @param set2 topics created so far; cleaned up on timeout
     * @param map  last error seen per topic, included in the exception message
     * @param j    deadline in milliseconds, compared against {@code time.milliseconds()}
     * @throws TimeoutException if topics are outstanding and the deadline has passed
     */
    private void maybeThrowTimeoutExceptionDuringSetup(Set<String> set, Set<String> set2, Map<String, Throwable> map, long j) {
        // What matters is whether any topic is still outstanding, not whether the
        // individual topic names are non-empty strings.
        if (set.isEmpty() || this.time.milliseconds() < j) {
            return;
        }
        cleanUpCreatedTopics(set2);
        final String timeoutError = String.format("Could not create internal topics within %d milliseconds. This can happen if the Kafka cluster is temporarily not available or a topic is marked for deletion and the broker did not complete its deletion within the timeout. The last errors seen per topic are: %s", this.retryTimeoutMs, map);
        this.log.error(timeoutError);
        throw new TimeoutException(timeoutError);
    }

    /**
     * Sleeps for the retry back-off if at least one of the given sets still contains
     * topics, after logging the outstanding topics and the time left until the deadline.
     *
     * @param list sets of topics that are still to be processed
     * @param j    deadline in milliseconds, used to report the remaining time
     * @param str  past participle describing the operation, inserted into the log line
     */
    private void maybeSleep(List<Set<String>> list, long j, String str) {
        final boolean topicsOutstanding = list.stream().anyMatch(topics -> !topics.isEmpty());
        if (!topicsOutstanding) {
            return;
        }
        final Set<String> outstandingTopics = list.stream().flatMap(Set::stream).collect(Collectors.toSet());
        this.log.info(
            "Internal topics {} could not be {}. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
            outstandingTopics,
            str,
            this.retryBackOffMs,
            j - this.time.milliseconds()
        );
        Utils.sleep(this.retryBackOffMs);
    }
}
