package io.confluent.auditlogapi.kafka;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import io.confluent.auditlogapi.entities.AuditLogConfigSpec;
import io.confluent.rbacapi.utils.ConfluentAdminClientFactory;
import io.confluent.security.audit.telemetry.exporter.TopicSpec;
import io.confluent.security.authorizer.utils.ThreadUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/auditlogapi/kafka/DestinationTopicManager.class */
public class DestinationTopicManager implements AutoCloseable, Reconfigurable {
    private static final int MAX_TOPIC_RETENTION_CACHE_SIZE = 1000000;
    private static final int MIN_AGE_BEFORE_TOPIC_METADATA_REFRESH_MS = 1000;
    private static final int MAX_AGE_FOR_CACHED_TOPIC_METADATA_MS = 10000;
    private static final int MAX_CLIENT_IDLE_MS = 60000;
    private static final Logger log = LoggerFactory.getLogger(DestinationTopicManager.class);
    private final Admin internalAdminClient;
    private final LoadingCache<String, OptionalLong> topicRetentionMillisCache;
    private Admin adminClient;
    private boolean shutDown = false;
    private boolean enabled = false;
    private ImmutableMap<String, Long> fileConfiguredTopicRetentionMillis = null;
    private ImmutableMap<String, Long> apiConfiguredTopicRetentionMillis = null;
    private DestinationTopicManagerConfig destinationConfig = null;
    private AdminClientConfig adminClientConfig = null;
    private short replicationFactor = Short.parseShort("3");
    private int partitionCount = 12;
    private final ThreadPoolExecutor readExecutor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(MAX_TOPIC_RETENTION_CACHE_SIZE), ThreadUtils.createThreadFactory("audit-log-destination-topic-config-cache-%d", true), new ThreadPoolExecutor.AbortPolicy());
    private final ThreadPoolExecutor writeExecutor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new ArrayBlockingQueue(1), ThreadUtils.createThreadFactory("audit-log-destination-topic-config-manager-%d", true), new ThreadPoolExecutor.DiscardPolicy());

    public DestinationTopicManager(ConfluentAdmin confluentAdmin) {
        this.adminClient = null;
        this.internalAdminClient = confluentAdmin;
        this.adminClient = confluentAdmin;
        this.writeExecutor.allowCoreThreadTimeOut(true);
        this.topicRetentionMillisCache = CacheBuilder.newBuilder().maximumSize(1000000L).refreshAfterWrite(1000L, TimeUnit.MILLISECONDS).expireAfterWrite(ConfluentAdminClientFactory.ADMIN_CLIENT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS).build(new CacheLoader<String, OptionalLong>() { // from class: io.confluent.auditlogapi.kafka.DestinationTopicManager.1
            public OptionalLong load(String str) {
                return loadAll(ImmutableList.of(str)).get(str);
            }

            public ListenableFuture<OptionalLong> reload(String str, OptionalLong optionalLong) {
                Set singleton = Collections.singleton(str);
                Runnable create = ListenableFutureTask.create(() -> {
                    return (OptionalLong) DestinationTopicManager.this.readTopicRetentionMillis(singleton).get(str);
                });
                DestinationTopicManager.this.readExecutor.submit(create);
                return create;
            }

            public Map<String, OptionalLong> loadAll(Iterable<? extends String> iterable) {
                return DestinationTopicManager.this.readTopicRetentionMillis(ImmutableList.copyOf(iterable));
            }
        });
    }

    public Map<String, Long> getTopicRetentionMillis(Collection<String> collection) {
        try {
            HashMap hashMap = new HashMap();
            this.topicRetentionMillisCache.getAll(collection).forEach((str, optionalLong) -> {
                if (optionalLong.isPresent()) {
                    hashMap.put(str, Long.valueOf(optionalLong.getAsLong()));
                } else {
                    hashMap.put(str, null);
                }
            });
            return hashMap;
        } catch (ExecutionException e) {
            log.warn("Failure while getting topic retention times.");
            return ImmutableMap.of();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<String, OptionalLong> readTopicRetentionMillis(Collection<String> collection) {
        synchronized (this) {
            if (this.shutDown || !this.enabled) {
                return Collections.emptyMap();
            }
            Admin admin = this.adminClient;
            int requestTimeoutMillis = DestinationTopicManagerConfig.requestTimeoutMillis(this.destinationConfig);
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            collection.forEach(str -> {
            });
            DescribeConfigsResult describeConfigs = admin.describeConfigs((List) collection.stream().map(str2 -> {
                return new ConfigResource(ConfigResource.Type.TOPIC, str2);
            }).collect(Collectors.toList()));
            long currentTimeMillis = System.currentTimeMillis() + requestTimeoutMillis;
            Iterator it = describeConfigs.values().entrySet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Map.Entry entry = (Map.Entry) it.next();
                ConfigResource configResource = (ConfigResource) entry.getKey();
                KafkaFuture kafkaFuture = (KafkaFuture) entry.getValue();
                long currentTimeMillis2 = System.currentTimeMillis();
                if (currentTimeMillis2 > currentTimeMillis) {
                    log.warn("Timed out looking up topic configurations");
                    break;
                }
                try {
                    concurrentHashMap.put(configResource.name(), retentionMillisOf((Config) kafkaFuture.get(currentTimeMillis - currentTimeMillis2, TimeUnit.MILLISECONDS)));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.warn("Interrupted while looking up topic retention times.");
                } catch (ExecutionException e2) {
                    if (!(e2.getCause() instanceof UnknownTopicOrPartitionException)) {
                        log.warn("Failure while looking up topic retention times.", e2);
                        break;
                    }
                    concurrentHashMap.put(configResource.name(), OptionalLong.empty());
                } catch (TimeoutException e3) {
                    log.warn("Timed out while looking up topic retention times.", e3);
                }
            }
            return concurrentHashMap;
        }
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        this.shutDown = true;
        setAdminClient(this.internalAdminClient);
        this.readExecutor.shutdown();
        this.writeExecutor.shutdown();
    }

    public CompletionStage<AuditLogConfigSpec> update(AuditLogConfigSpec auditLogConfigSpec) {
        CompletableFuture completableFuture = new CompletableFuture();
        configure(auditLogConfigSpec);
        CompletableFuture.runAsync(() -> {
            updateDestinationTopics(completableFuture, auditLogConfigSpec);
        }, this.writeExecutor);
        return completableFuture;
    }

    private synchronized void configure(AuditLogConfigSpec auditLogConfigSpec) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        auditLogConfigSpec.getDestinations().getTopics().forEach((str, auditLogConfigDestinationConfig) -> {
            builder.put(str, auditLogConfigDestinationConfig.getRetentionMs());
        });
        this.apiConfiguredTopicRetentionMillis = builder.build();
    }

    public synchronized void configure(Map<String, ?> map) {
        CompletableFuture completableFuture = new CompletableFuture();
        if (!isAuditLogEnabledConfig(map)) {
            this.enabled = false;
            this.adminClient = this.internalAdminClient;
            this.fileConfiguredTopicRetentionMillis = null;
            return;
        }
        if (map.containsKey("confluent.security.event.logger.exporter.kafka.topic.replicas")) {
            this.replicationFactor = Short.parseShort((String) map.get("confluent.security.event.logger.exporter.kafka.topic.replicas"));
        }
        if (map.containsKey("confluent.security.event.logger.exporter.kafka.topic.partitions")) {
            this.partitionCount = Integer.parseInt((String) map.get("confluent.security.event.logger.exporter.kafka.topic.partitions"));
        }
        this.destinationConfig = DestinationTopicManagerConfig.build(map);
        AdminClientConfig adminClientConfig = this.destinationConfig.adminClientConfig();
        if (adminClientConfig == null) {
            setAdminClient(this.internalAdminClient);
        } else if (isAdminClientConfigChanged(adminClientConfig)) {
            setAdminClient(ConfluentAdminClientFactory.createAdmin(adminClientConfig));
            this.adminClientConfig = adminClientConfig;
        }
        this.fileConfiguredTopicRetentionMillis = this.destinationConfig.topicRetentionMillis();
        this.enabled = true;
        CompletableFuture.runAsync(() -> {
            updateDestinationTopics(completableFuture, null);
        }, this.writeExecutor);
    }

    public Set<String> reconfigurableConfigs() {
        return DestinationTopicManagerConfig.acceptedConfigNames();
    }

    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
        if (isAuditLogEnabledConfig(map)) {
            DestinationTopicManagerConfig build = DestinationTopicManagerConfig.build(map);
            build.adminClientConfig();
            build.buildTopicSpecFor("validation-test", 999999999L);
        }
    }

    public synchronized void reconfigure(Map<String, ?> map) {
        configure(map);
    }

    private synchronized void setAdminClient(Admin admin) {
        if (this.adminClient != admin) {
            if (this.adminClient != null && this.adminClient != this.internalAdminClient) {
                Utils.closeQuietly(this.adminClient, "DestinationTopicManager.adminClient");
            }
            this.adminClient = admin;
        }
    }

    private static boolean isAuditLogEnabledConfig(Map<String, ?> map) {
        Object obj = map.get("confluent.security.event.logger.enable");
        return obj == null ? "true".equals("true") : "true".equals(obj.toString());
    }

    private void updateDestinationTopics(CompletableFuture<AuditLogConfigSpec> completableFuture, AuditLogConfigSpec auditLogConfigSpec) {
        synchronized (this) {
            if (this.shutDown || !this.enabled || this.destinationConfig == null) {
                completableFuture.cancel(true);
                return;
            }
            DestinationTopicManagerConfig destinationTopicManagerConfig = this.destinationConfig;
            Admin admin = this.adminClient;
            ImmutableMap<String, Long> immutableMap = (this.apiConfiguredTopicRetentionMillis == null || this.apiConfiguredTopicRetentionMillis.isEmpty()) ? this.fileConfiguredTopicRetentionMillis : this.apiConfiguredTopicRetentionMillis;
            try {
                HashMap hashMap = new HashMap();
                if (immutableMap != null) {
                    immutableMap.forEach((str, l) -> {
                        hashMap.put(str, destinationTopicManagerConfig.buildTopicSpecFor(str, l.longValue()));
                    });
                }
                if (hashMap.isEmpty()) {
                    log.warn("No destination topics configured!");
                }
                int requestTimeoutMillis = DestinationTopicManagerConfig.requestTimeoutMillis(destinationTopicManagerConfig);
                DescribeConfigsResult describeConfigs = admin.describeConfigs((List) hashMap.keySet().stream().map(str2 -> {
                    return new ConfigResource(ConfigResource.Type.TOPIC, str2);
                }).collect(Collectors.toList()));
                HashMap hashMap2 = new HashMap();
                HashMap hashMap3 = new HashMap();
                long currentTimeMillis = System.currentTimeMillis() + requestTimeoutMillis;
                for (Map.Entry entry : describeConfigs.values().entrySet()) {
                    String name = ((ConfigResource) entry.getKey()).name();
                    KafkaFuture kafkaFuture = (KafkaFuture) entry.getValue();
                    long currentTimeMillis2 = System.currentTimeMillis();
                    if (currentTimeMillis2 > currentTimeMillis) {
                        throw new TimeoutException("Timed out while looking up topic configurations.");
                    }
                    try {
                        Config config = (Config) kafkaFuture.get(currentTimeMillis - currentTimeMillis2, TimeUnit.MILLISECONDS);
                        HashMap hashMap4 = new HashMap(((TopicSpec) hashMap.get(name)).config());
                        for (ConfigEntry configEntry : config.entries()) {
                            if (configEntry.isReadOnly()) {
                                hashMap4.remove(configEntry.name());
                            } else if (Objects.equals(configEntry.value(), hashMap4.get(configEntry.name()))) {
                                hashMap4.remove(configEntry.name());
                            }
                        }
                        if (!hashMap4.isEmpty()) {
                            hashMap3.put(name, hashMap4);
                        }
                        this.topicRetentionMillisCache.put(name, retentionMillisOf(config));
                    } catch (ExecutionException e) {
                        if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                            log.error("Failure while checking on status of destination topics.", e);
                            completableFuture.completeExceptionally(e);
                            return;
                        } else {
                            hashMap2.put(name, hashMap.get(name));
                            this.topicRetentionMillisCache.put(name, OptionalLong.empty());
                        }
                    }
                }
                if (!hashMap2.isEmpty()) {
                    log.debug("{} event log topics need to be created: {}", Integer.valueOf(hashMap2.size()), hashMap2.keySet());
                    for (Map.Entry entry2 : admin.createTopics((Collection) hashMap2.entrySet().stream().map(entry3 -> {
                        return new NewTopic((String) entry3.getKey(), this.partitionCount, this.replicationFactor).configs(((TopicSpec) entry3.getValue()).config());
                    }).collect(Collectors.toList()), new CreateTopicsOptions().timeoutMs(Integer.valueOf(requestTimeoutMillis))).values().entrySet()) {
                        try {
                            ((KafkaFuture) entry2.getValue()).get();
                            String str3 = (String) entry2.getKey();
                            String str4 = (String) ((TopicSpec) hashMap2.get(str3)).config().get("retention.ms");
                            if (str4 == null) {
                                this.topicRetentionMillisCache.put(str3, OptionalLong.empty());
                            } else {
                                this.topicRetentionMillisCache.put(str3, OptionalLong.of(Long.parseLong(str4)));
                            }
                        } catch (ExecutionException e2) {
                            if (!(e2.getCause() instanceof TopicExistsException)) {
                                log.error("Unexpected exception creating destination topic {}", entry2.getKey(), e2);
                                completableFuture.completeExceptionally(e2);
                                return;
                            }
                            CompletableFuture.runAsync(() -> {
                                updateDestinationTopics(completableFuture, auditLogConfigSpec);
                            }, this.writeExecutor);
                        }
                    }
                }
                if (!hashMap3.isEmpty()) {
                    for (Map.Entry entry4 : admin.incrementalAlterConfigs((Map) hashMap3.entrySet().stream().collect(Collectors.toMap(entry5 -> {
                        return new ConfigResource(ConfigResource.Type.TOPIC, (String) entry5.getKey());
                    }, entry6 -> {
                        return (List) ((Map) entry6.getValue()).entrySet().stream().map(entry6 -> {
                            return new AlterConfigOp(new ConfigEntry((String) entry6.getKey(), (String) entry6.getValue()), AlterConfigOp.OpType.SET);
                        }).collect(Collectors.toList());
                    })), new AlterConfigsOptions().timeoutMs(Integer.valueOf(requestTimeoutMillis))).values().entrySet()) {
                        String name2 = ((ConfigResource) entry4.getKey()).name();
                        try {
                            ((KafkaFuture) entry4.getValue()).get();
                            String str5 = (String) ((Map) hashMap3.get(name2)).get("retention.ms");
                            if (immutableMap != null) {
                                this.topicRetentionMillisCache.put(name2, OptionalLong.of(Long.parseLong(str5)));
                            } else {
                                this.topicRetentionMillisCache.put(name2, OptionalLong.empty());
                            }
                        } catch (ExecutionException e3) {
                            log.error("Unexpected exception updating destination topic {}", name2, e3);
                            completableFuture.completeExceptionally(e3);
                            return;
                        }
                    }
                }
            } catch (InterruptedException e4) {
                Thread.currentThread().interrupt();
                log.warn("Interrupted while updating destination topics.");
                completableFuture.completeExceptionally(e4);
                return;
            } catch (RuntimeException e5) {
                log.error("Unexpected error while updating destination topics.", e5.getMessage());
                completableFuture.completeExceptionally(e5);
                return;
            } catch (TimeoutException e6) {
                log.warn("Timed out while updating destination topics.", e6);
                CompletableFuture.runAsync(() -> {
                    updateDestinationTopics(completableFuture, auditLogConfigSpec);
                }, this.writeExecutor);
            }
            completableFuture.complete(auditLogConfigSpec);
        }
    }

    private boolean isAdminClientConfigChanged(AdminClientConfig adminClientConfig) {
        return this.adminClientConfig == null || !this.adminClientConfig.originals().equals(adminClientConfig.originals());
    }

    @Nonnull
    private static OptionalLong retentionMillisOf(Config config) {
        ConfigEntry configEntry = config.get("retention.ms");
        if (configEntry == null || configEntry.value() == null || configEntry.value().isEmpty()) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Math.max(-1L, Long.parseLong(configEntry.value())));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }
}
