package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.3.0.jar:org/apache/kafka/streams/processor/internals/InternalTopicManager.class */
/**
 * Creates and validates the internal topics of a Kafka Streams application through an
 * {@link AdminClient}.
 *
 * <p>{@link #makeReady(Map)} repeatedly describes the requested topics, creates the ones that are
 * missing and backs off between attempts, until every topic exists with the expected number of
 * partitions or the configured number of admin-client retries is used up.
 */
public class InternalTopicManager {
    private static final String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
    private final long windowChangeLogAdditionalRetention;
    private final short replicationFactor;
    private final AdminClient adminClient;
    private final int retries;
    private final long retryBackOffMs;
    private final Map<String, String> defaultTopicConfigs = new HashMap<>();
    private final Logger log = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())).logger(getClass());

    /**
     * Thin {@link AdminClientConfig} subclass used only to read the effective admin-client
     * settings ({@code retries}, {@code retry.backoff.ms}); the {@code false} flag is forwarded
     * to the parent constructor unchanged.
     */
    private static final class InternalAdminClientConfig extends AdminClientConfig {
        private InternalAdminClientConfig(Map<?, ?> map) {
            super(map, false);
        }
    }

    /**
     * Builds a manager that issues its requests through {@code adminClient}.
     *
     * @param adminClient   client used for describe-topics and create-topics requests
     * @param streamsConfig source of the replication factor, the additional window changelog
     *                      retention, the admin-client retry settings and the topic-level
     *                      default configs (everything under {@link StreamsConfig#TOPIC_PREFIX})
     */
    public InternalTopicManager(AdminClient adminClient, StreamsConfig streamsConfig) {
        this.adminClient = adminClient;
        this.replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
        this.windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG).longValue();

        InternalAdminClientConfig adminConfig = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy"));
        this.retries = adminConfig.getInt("retries").intValue();
        this.retryBackOffMs = adminConfig.getLong("retry.backoff.ms").longValue();

        // The format fragments must be concatenated into ONE format string; passing them as
        // separate arguments would make them log arguments and the values would never be printed.
        this.log.debug("Configs:" + Utils.NL
                + "\t{} = {}" + Utils.NL
                + "\t{} = {}" + Utils.NL
                + "\t{} = {}",
            "retries", this.retries,
            StreamsConfig.REPLICATION_FACTOR_CONFIG, this.replicationFactor,
            StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, this.windowChangeLogAdditionalRetention);

        // Null-valued overrides are skipped; everything else is stored in its string form.
        for (Map.Entry<String, Object> override : streamsConfig.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX).entrySet()) {
            if (override.getValue() != null) {
                this.defaultTopicConfigs.put(override.getKey(), override.getValue().toString());
            }
        }
    }

    /**
     * Ensures that every topic in {@code map} exists with the expected number of partitions,
     * creating the missing ones.
     *
     * <p>Each round validates the topics that are still pending, tries to create those that do
     * not exist, and, if any are still pending, sleeps for the retry back-off before the next
     * round. At most {@code retries + 1} rounds are performed.
     *
     * @param map topic name to topic configuration for all topics that must be ready
     * @throws StreamsException      if an existing topic has an unexpected partition count, if
     *                               topic creation or description fails with an unexpected
     *                               error, or if topics are still pending after all retries
     * @throws IllegalStateException if the thread is interrupted while waiting for a broker
     *                               response
     */
    public void makeReady(Map<String, InternalTopicConfig> map) {
        int remainingRetries = this.retries;
        Set<String> topicsNotReady = new HashSet<>(map.keySet());

        while (!topicsNotReady.isEmpty() && remainingRetries >= 0) {
            topicsNotReady = validateTopics(topicsNotReady, map);

            if (!topicsNotReady.isEmpty()) {
                createTopics(topicsNotReady, map);
            }

            if (!topicsNotReady.isEmpty()) {
                // Report the retries that are actually left, not the configured total.
                this.log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, remainingRetries);
                try {
                    Thread.sleep(this.retryBackOffMs);
                } catch (InterruptedException interrupted) {
                    // Only restore the flag here; the next blocking future.get() observes it
                    // and fails with IllegalStateException.
                    Thread.currentThread().interrupt();
                }
                remainingRetries--;
            }
        }

        if (topicsNotReady.isEmpty()) {
            return;
        }
        String message = String.format("Could not create topics after %d retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.", Integer.valueOf(this.retries));
        this.log.error(message);
        throw new StreamsException(message);
    }

    /**
     * Sends one create-topics request for all of {@code topicsNotReady} and removes from that set
     * every topic whose creation succeeded. Topics rejected with {@link TopicExistsException}
     * stay in the set so the caller retries them; any other failure is fatal.
     */
    private void createTopics(Set<String> topicsNotReady, Map<String, InternalTopicConfig> topicsMap) {
        Set<NewTopic> newTopics = new HashSet<>();
        for (String topicName : topicsNotReady) {
            InternalTopicConfig topicConfig = Utils.notNull(topicsMap.get(topicName));
            Map<String, String> topicProperties = topicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention);
            this.log.debug("Going to create topic {} with {} partitions and config {}.", topicConfig.name(), topicConfig.numberOfPartitions(), topicProperties);
            newTopics.add(new NewTopic(topicConfig.name(), topicConfig.numberOfPartitions(), this.replicationFactor).configs(topicProperties));
        }

        for (Map.Entry<String, KafkaFuture<Void>> result : this.adminClient.createTopics(newTopics).values().entrySet()) {
            String topicName = result.getKey();
            try {
                result.getValue().get();
                topicsNotReady.remove(topicName);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                this.log.error(INTERRUPTED_ERROR_MESSAGE, interrupted);
                throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, interrupted);
            } catch (ExecutionException failed) {
                Throwable cause = failed.getCause();
                if (!(cause instanceof TopicExistsException)) {
                    this.log.error("Unexpected error during topic creation for {}.\nError message was: {}", topicName, cause.toString());
                    throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
                }
                // The topic was not found by the preceding describe call, yet the broker says it
                // exists: leave it pending and let the caller retry after the back-off.
                this.log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\nWill retry to create this topic in {} ms (to let broker finish async delete operation first).\nError message was: {}", topicName, this.retryBackOffMs, cause.toString());
            }
        }
    }

    /**
     * Describes the given topics and returns the partition count of each one that exists.
     *
     * <p>Topics whose description fails with {@link UnknownTopicOrPartitionException} or
     * {@link LeaderNotAvailableException} are treated as "not existing yet" and are simply
     * absent from the result.
     *
     * @param set names of the topics to describe
     * @return topic name to current number of partitions, for the topics that could be described
     * @throws StreamsException      if a description fails with any other error
     * @throws IllegalStateException if the thread is interrupted while waiting for the response
     */
    protected Map<String, Integer> getNumPartitions(Set<String> set) {
        this.log.debug("Trying to check if topics {} have been created with expected number of partitions.", set);
        Map<String, KafkaFuture<TopicDescription>> descriptions = this.adminClient.describeTopics(set).values();

        Map<String, Integer> partitionCounts = new HashMap<>();
        for (Map.Entry<String, KafkaFuture<TopicDescription>> description : descriptions.entrySet()) {
            String topicName = description.getKey();
            try {
                partitionCounts.put(topicName, description.getValue().get().partitions().size());
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                this.log.error(INTERRUPTED_ERROR_MESSAGE, interrupted);
                throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, interrupted);
            } catch (ExecutionException failed) {
                Throwable cause = failed.getCause();
                if (!(cause instanceof UnknownTopicOrPartitionException) && !(cause instanceof LeaderNotAvailableException)) {
                    this.log.error("Unexpected error during topic description for {}.\nError message was: {}", topicName, cause.toString());
                    // NOTE(review): the message says "create" although a describe call failed;
                    // the text is kept as-is in case callers or tests match on it.
                    throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
                }
                this.log.debug("Topic {} is unknown or not found, hence not existed yet.", topicName);
            }
        }
        return partitionCounts;
    }

    /**
     * Checks the topics in {@code topicsToValidate} against the cluster and returns those that do
     * not exist yet.
     *
     * <p>Only the topics being validated are inspected. Topics that were confirmed ready in an
     * earlier round are not described again, so they must not be looked up in the describe
     * result; otherwise they would be reported as missing and be re-created on every round.
     *
     * @throws StreamsException      if an existing topic has a different partition count than
     *                               configured
     * @throws IllegalStateException if a topic to validate has no entry in {@code topicsMap}
     */
    private Set<String> validateTopics(Set<String> topicsToValidate, Map<String, InternalTopicConfig> topicsMap) {
        Map<String, Integer> existingPartitions = getNumPartitions(topicsToValidate);

        Set<String> topicsToCreate = new HashSet<>();
        for (String topicName : topicsToValidate) {
            InternalTopicConfig topicConfig = topicsMap.get(topicName);
            if (topicConfig == null) {
                throw new IllegalStateException("The topics map " + topicsMap.keySet() + " does not contain the topic " + topicName + " trying to validate.");
            }

            Integer actualPartitions = existingPartitions.get(topicName);
            if (actualPartitions == null) {
                topicsToCreate.add(topicName);
                continue;
            }

            int expectedPartitions = topicConfig.numberOfPartitions();
            if (actualPartitions.intValue() != expectedPartitions) {
                String message = String.format("Existing internal topic %s has invalid partitions: expected: %d; actual: %d. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", topicName, Integer.valueOf(expectedPartitions), actualPartitions);
                this.log.error(message);
                throw new StreamsException(message);
            }
        }
        return topicsToCreate;
    }
}
