package io.confluent.connect.replicator.util;

import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/replicator/util/NewReplicatorAdminClient.class */
public class NewReplicatorAdminClient implements ReplicatorAdminClient {
    private static final Logger log = LoggerFactory.getLogger(ReplicatorAdminClient.class);
    private static final long RETRIABLE_FUTURE_TIMEOUT = 30000;
    private final Time time;
    private final long maxAgeMs;
    private final AdminClient admin;
    private final ScheduledExecutorService metadataFetcher;
    private final AtomicReference<Map<String, TopicMetadata>> topicMetadata;
    private Collection<Node> brokers;
    private long nextBrokerMetadataRefresh;
    private volatile boolean forceTopicMetadataRefresh;
    private Set<String> interestedTopics;
    private ReplicatorAdminClient.TopicMetadataListener metadataListener;
    private ScheduledFuture<?> metadataFuture;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/confluent/connect/replicator/util/NewReplicatorAdminClient$TopicMetadataTask.class */
    public class TopicMetadataTask implements Runnable {
        private final Set<String> taskInterestedTopics;
        private final ReplicatorAdminClient.TopicMetadataListener taskMetadataListener;

        public TopicMetadataTask(Set<String> set, ReplicatorAdminClient.TopicMetadataListener topicMetadataListener) {
            this.taskInterestedTopics = set;
            this.taskMetadataListener = topicMetadataListener;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                refreshTopicMetadata();
                NewReplicatorAdminClient.log.debug("Refreshed topic metadata for interested topics: {}", NewReplicatorAdminClient.this.interestedTopics);
            } catch (InterruptedException | RuntimeException | ExecutionException e) {
                NewReplicatorAdminClient.log.warn(String.format("Failed to refresh topic metadata. Will try again in %dms.", Long.valueOf(NewReplicatorAdminClient.this.maxAgeMs)), e);
            }
        }

        private void refreshTopicMetadata() throws InterruptedException, ExecutionException {
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : NewReplicatorAdminClient.this.admin.describeTopics(this.taskInterestedTopics).values().entrySet()) {
                try {
                    TopicDescription topicDescription = (TopicDescription) ((KafkaFuture) entry.getValue()).get();
                    int size = topicDescription.partitions().size();
                    if (size > 0) {
                        hashMap.put(topicDescription.name(), new TopicMetadata(topicDescription.name(), size));
                        NewReplicatorAdminClient.log.trace("Updating metadata for topic '{}' with {} partitions", topicDescription.name(), Integer.valueOf(size));
                    }
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                        NewReplicatorAdminClient.log.warn("Unable to describe topic {}, this topic will be ignored until the next refresh. This is regular behavior upon start, but could signify an unreachable topic otherwise", entry.getKey());
                        throw e;
                    }
                    NewReplicatorAdminClient.log.trace("Received UnknownTopicOrPartitionException when attempting to describe topic {}. This is expected to be transient and will be resolved in subsequent metadata updates.", entry.getKey());
                }
            }
            NewReplicatorAdminClient.this.topicMetadata.set(hashMap);
            if (this.taskMetadataListener != null) {
                this.taskMetadataListener.onTopicMetadataRefresh();
            }
        }
    }

    public NewReplicatorAdminClient(Map<String, Object> map, Time time, long j, String str) {
        this(AdminClient.create(map), time, j, str);
    }

    public NewReplicatorAdminClient(AdminClient adminClient, Time time, long j, final String str) {
        this.time = time;
        this.maxAgeMs = j > 0 ? j : Long.MAX_VALUE;
        this.admin = adminClient;
        ThreadFactory threadFactory = new ThreadFactory() { // from class: io.confluent.connect.replicator.util.NewReplicatorAdminClient.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "MetadataFetcher-" + str);
            }
        };
        if (j > 0) {
            this.metadataFetcher = Executors.newSingleThreadScheduledExecutor(threadFactory);
        } else {
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(0, threadFactory);
            scheduledThreadPoolExecutor.setKeepAliveTime(1L, TimeUnit.SECONDS);
            scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true);
            this.metadataFetcher = Executors.unconfigurableScheduledExecutorService(scheduledThreadPoolExecutor);
        }
        this.nextBrokerMetadataRefresh = time.milliseconds();
        this.forceTopicMetadataRefresh = true;
        this.topicMetadata = new AtomicReference<>();
        this.topicMetadata.set(new HashMap());
        this.interestedTopics = Collections.emptySet();
    }

    protected ScheduledExecutorService metadataFetcher() {
        return this.metadataFetcher;
    }

    @Deprecated
    protected NewReplicatorAdminClient() {
        this.time = null;
        this.maxAgeMs = 0L;
        this.admin = null;
        this.metadataFetcher = null;
        this.topicMetadata = null;
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public String clusterId() throws InterruptedException, ExecutionException {
        long milliseconds = this.time.milliseconds() + RETRIABLE_FUTURE_TIMEOUT;
        Callable callable = () -> {
            return (String) this.admin.describeCluster().clusterId().get();
        };
        Future submit = this.metadataFetcher.submit(callable);
        while (true) {
            Future future = submit;
            if (this.time.milliseconds() >= milliseconds) {
                throw new TimeoutException("Failed to retrieve cluster id within timeout of 30000ms.");
            }
            try {
                return (String) future.get();
            } catch (Exception e) {
                if (!(e.getCause() instanceof ExecutionException) || !(e.getCause().getCause() instanceof RetriableException)) {
                    throw e;
                }
                log.warn("Could not fetch Cluster Id, will retry", e);
                submit = this.metadataFetcher.schedule(callable, 2L, TimeUnit.SECONDS);
            }
        }
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public Properties topicConfig(String str) throws InterruptedException, ExecutionException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        Config config = (Config) ((Map) this.admin.describeConfigs(Collections.singleton(configResource)).all().get()).get(configResource);
        Properties properties = new Properties();
        for (ConfigEntry configEntry : config.entries()) {
            if (!configEntry.isDefault()) {
                properties.put(configEntry.name(), configEntry.value());
            }
        }
        return properties;
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public void changeTopicConfig(String str, Properties properties) throws InterruptedException, ExecutionException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        Config config = (Config) ((Map) this.admin.describeConfigs(Collections.singleton(configResource)).all().get()).get(configResource);
        HashMap hashMap = new HashMap();
        for (String str2 : properties.stringPropertyNames()) {
            hashMap.put(str2, properties.getProperty(str2));
        }
        ArrayList arrayList = new ArrayList(properties.size());
        for (ConfigEntry configEntry : config.entries()) {
            String str3 = (String) hashMap.get(configEntry.name());
            if (str3 != null) {
                arrayList.add(new ConfigEntry(configEntry.name(), str3, configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
                hashMap.remove(configEntry.name());
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            arrayList.add(new ConfigEntry((String) entry.getKey(), (String) entry.getValue()));
        }
        this.admin.alterConfigs(Collections.singletonMap(configResource, new Config(arrayList))).all().get();
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public synchronized int aliveBrokers() throws InterruptedException, ExecutionException {
        long milliseconds = this.time.milliseconds();
        if (this.brokers == null || milliseconds >= this.nextBrokerMetadataRefresh) {
            this.brokers = (Collection) this.admin.describeCluster().nodes().get();
            this.nextBrokerMetadataRefresh = this.time.milliseconds() + this.maxAgeMs;
        }
        return this.brokers.size();
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public synchronized void setInterestedTopics(Set<String> set, ReplicatorAdminClient.TopicMetadataListener topicMetadataListener) {
        this.interestedTopics = set;
        this.metadataListener = topicMetadataListener;
        Map<String, TopicMetadata> map = this.topicMetadata.get();
        map.keySet().retainAll(set);
        int size = set.size() - map.size();
        if (size > 0) {
            log.info("Requesting metadata refresh after {} new topics were added", Integer.valueOf(size));
            this.forceTopicMetadataRefresh = true;
        }
    }

    private Future<?> scheduleSingleRefreshMetadata() {
        return this.metadataFetcher.submit(new TopicMetadataTask(this.interestedTopics, this.metadataListener));
    }

    private ScheduledFuture<?> schedulePeriodicRefreshMetadata() {
        this.metadataFuture = this.metadataFetcher.scheduleWithFixedDelay(new TopicMetadataTask(this.interestedTopics, this.metadataListener), 0L, this.maxAgeMs, TimeUnit.MILLISECONDS);
        return this.metadataFuture;
    }

    private synchronized Map<String, TopicMetadata> topicMetadata() {
        while (this.forceTopicMetadataRefresh) {
            if (this.metadataFuture != null) {
                this.metadataFuture.cancel(true);
            }
            try {
                this.forceTopicMetadataRefresh = false;
                scheduleSingleRefreshMetadata().get();
                schedulePeriodicRefreshMetadata();
            } catch (InterruptedException | RuntimeException | ExecutionException e) {
                log.warn("Topic metadata update did not complete.", e);
            }
        }
        return this.topicMetadata.get();
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public synchronized TopicMetadata topicMetadata(String str) {
        return topicMetadata().get(str);
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public synchronized boolean partitionExists(TopicPartition topicPartition) {
        TopicMetadata topicMetadata = topicMetadata().get(topicPartition.topic());
        return topicMetadata != null && topicPartition.partition() < topicMetadata.numPartitions();
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public synchronized boolean topicExists(String str) {
        return topicMetadata().get(str) != null;
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public boolean createTopic(String str, int i, short s, Properties properties) throws InterruptedException, ExecutionException {
        return createTopic(str, Optional.of(Integer.valueOf(i)), Optional.of(Short.valueOf(s)), properties);
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public boolean createTopic(String str, Optional<Integer> optional, Optional<Short> optional2, Properties properties) throws InterruptedException, ExecutionException {
        return createTopic(str, optional, optional2, properties, true);
    }

    private boolean createTopic(String str, Optional<Integer> optional, Optional<Short> optional2, Properties properties, boolean z) throws InterruptedException, ExecutionException {
        log.info("Creating topic {} with {} partitions, replication factor {}, and config {}", new Object[]{str, optional, optional2, properties});
        HashMap hashMap = new HashMap();
        for (String str2 : properties.stringPropertyNames()) {
            hashMap.put(str2, properties.getProperty(str2));
        }
        NewTopic newTopic = new NewTopic(str, optional, optional2);
        newTopic.configs(Collections.unmodifiableMap(hashMap));
        try {
            this.admin.createTopics(Collections.singleton(newTopic)).all().get();
            this.forceTopicMetadataRefresh = true;
            return true;
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof TopicExistsException) {
                log.debug("Failed to create topic {} because it already exists. Ignoring exception", str, e);
                return false;
            }
            if (!(cause instanceof InvalidConfigurationException) || !cause.getMessage().contains("message.downconversion.enable") || !z) {
                throw e;
            }
            log.debug("Target Kafka cluster doesn't recognize topic config '{}'. Removing this config and retrying to create the topic one more time", "message.downconversion.enable", e);
            properties.remove("message.downconversion.enable");
            return createTopic(str, optional, optional2, properties, false);
        }
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient
    public void addPartitions(String str, int i) throws InterruptedException, ExecutionException {
        log.info("Increasing number of partitions to {} for topic {} and requesting metadata refresh", Integer.valueOf(i), str);
        this.admin.createPartitions(Collections.singletonMap(str, NewPartitions.increaseTo(i))).all().get();
        this.forceTopicMetadataRefresh = true;
    }

    @Override // io.confluent.connect.replicator.util.ReplicatorAdminClient, java.lang.AutoCloseable
    public void close() {
        try {
            this.admin.close();
        } finally {
            this.metadataFetcher.shutdownNow();
        }
    }
}
