package io.confluent.auditlogapi.store;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.auditlogapi.credentials.ExtractedCredentials;
import io.confluent.auditlogapi.entities.AuditLogConfigSpec;
import io.confluent.auditlogapi.entities.Conversions;
import io.confluent.kafka.clients.plugins.auth.token.TokenBearerLoginCallbackHandler;
import io.confluent.mds.DynamicConfigurator;
import io.confluent.rbacapi.entities.ClusterInfo;
import io.confluent.rbacapi.services.ClusterRegistryService;
import io.confluent.rbacapi.utils.ClusterType;
import io.confluent.rbacapi.utils.ConfluentAdminClientFactory;
import io.confluent.security.audit.router.AuditLogRouterJsonConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/* loaded from: input_file:io/confluent/auditlogapi/store/DynamicConfigAuditLogConfigStore.class */
/**
 * Audit log config store backed by the broker's dynamic cluster configuration.
 *
 * <p>The current spec is held in memory and refreshed through the {@link Reconfigurable}
 * callbacks whenever the {@code confluent.security.event.router.config} setting changes.
 * Writes go through the injected {@link DynamicConfigurator}; broadcasts push the same JSON to
 * every Kafka cluster returned by the {@link ClusterRegistryService} using one short-lived admin
 * client per cluster.
 */
public class DynamicConfigAuditLogConfigStore extends BaseAuditLogConfigStore implements Reconfigurable {
    private static final String ROUTER_CONFIG = "confluent.security.event.router.config";
    private static final String SSL_PREFIX = "ssl.";
    private static final int CLIENT_TIMEOUT_MS = 60000;
    private static final ConfigResource CLUSTER_CONFIG_RESOURCE = new ConfigResource(ConfigResource.Type.BROKER, "");
    private static final AlterConfigsOptions MANAGED_CLUSTERS_OPS = new AlterConfigsOptions().timeoutMs(5000);
    private final DynamicConfigurator dynamicConfigurator;
    private final ClusterRegistryService clusterRegistryService;
    // Written by reconfigure() without holding the monitor that get() uses, so the field is
    // volatile to make every update visible to readers on other threads.
    private volatile AuditLogConfigSpec spec = null;
    private final Map<String, Object> sslConfigs = new HashMap<>();

    /**
     * @param clusterRegistryService source of the managed clusters used by broadcasts; must not be null
     * @param dynamicConfigurator    writer for the local cluster's dynamic config; must not be null
     * @throws NullPointerException if either argument is null
     */
    public DynamicConfigAuditLogConfigStore(ClusterRegistryService clusterRegistryService, DynamicConfigurator dynamicConfigurator) {
        this.clusterRegistryService = Objects.requireNonNull(clusterRegistryService);
        this.dynamicConfigurator = Objects.requireNonNull(dynamicConfigurator);
    }

    /**
     * Returns the spec most recently applied by {@link #reconfigure(Map)}, or {@code null} if
     * the store has not been configured yet.
     */
    @Override // io.confluent.auditlogapi.store.BaseAuditLogConfigStore
    protected synchronized AuditLogConfigSpec get() {
        return this.spec;
    }

    /**
     * Writes {@code auditLogConfigSpec2} to the cluster config as JSON.
     *
     * <p>The first argument is not consulted by this implementation.
     *
     * @return a stage that completes with {@code auditLogConfigSpec2} once the alter-config
     *     request succeeds, or exceptionally with the request/serialization failure
     */
    @Override // io.confluent.auditlogapi.store.BaseAuditLogConfigStore
    protected synchronized CompletionStage<AuditLogConfigSpec> asyncCompareAndSet(AuditLogConfigSpec auditLogConfigSpec, AuditLogConfigSpec auditLogConfigSpec2) {
        try {
            String json = Conversions.toJson(Conversions.convert(auditLogConfigSpec2));
            AlterConfigsResult alterResult = this.dynamicConfigurator.setClusterConfig(Collections.singleton(new ConfigEntry(ROUTER_CONFIG, json)));
            CompletableFuture<AuditLogConfigSpec> outcome = new CompletableFuture<>();
            alterResult.all().whenComplete((ignored, error) -> {
                if (error == null) {
                    outcome.complete(auditLogConfigSpec2);
                } else {
                    outcome.completeExceptionally(error);
                }
            });
            return outcome;
        } catch (RuntimeException | JsonProcessingException e) {
            return FutureUtil.exceptionalFuture(e);
        }
    }

    /**
     * Pushes {@code auditLogConfigSpec} to every managed Kafka cluster.
     *
     * <p>The cluster lookup is awaited on the calling thread. One admin client is created per
     * cluster; all of them are closed once every per-cluster request has finished. If setting up
     * any client or request throws, the clients created so far are closed and the exception is
     * rethrown to the caller.
     *
     * @return a stage that completes with the spec when every cluster accepted it, or
     *     exceptionally with {@link AuditLogConfigStorePartialSuccessException} listing the
     *     clusters that succeeded and the failures per cluster
     */
    @Override // io.confluent.auditlogapi.store.BaseAuditLogConfigStore
    protected CompletionStage<AuditLogConfigSpec> asyncBroadcast(ExtractedCredentials extractedCredentials, AuditLogConfigSpec auditLogConfigSpec) {
        Map<String, Map<String, String>> clusters = getManagedClustersInfo(extractedCredentials).join();
        String json;
        try {
            json = Conversions.toJson(Conversions.convert(auditLogConfigSpec));
        } catch (JsonProcessingException e) {
            return FutureUtil.exceptionalFuture(e);
        }
        // The operation is identical for every cluster, so build it once.
        AlterConfigOp setRouterConfig = new AlterConfigOp(new ConfigEntry(ROUTER_CONFIG, json), AlterConfigOp.OpType.SET);

        Map<String, ConfluentAdmin> admins = new HashMap<>();
        List<KafkaFuture<Void>> pending = new ArrayList<>(clusters.size());
        ConcurrentHashMap.KeySetView<String, Boolean> succeeded = ConcurrentHashMap.newKeySet();
        ConcurrentHashMap<String, Throwable> failed = new ConcurrentHashMap<>();
        try {
            for (Map.Entry<String, Map<String, String>> cluster : clusters.entrySet()) {
                String clusterName = cluster.getKey();
                Map<String, String> clusterProps = cluster.getValue();

                Map<String, Object> clientProps = new HashMap<>(clusterProps);
                clientProps.putAll(extractedCredentials.toClientCredentialProperties());
                if (usesSsl(clusterProps.get("security.protocol"))) {
                    clientProps.putAll(this.sslConfigs);
                }

                ConfluentAdmin admin = ConfluentAdminClientFactory.createAdmin(clientProps);
                admins.put(clusterName, admin);
                KafkaFuture<Void> request = admin
                        .incrementalAlterConfigs(ImmutableMap.of(CLUSTER_CONFIG_RESOURCE, ImmutableList.of(setRouterConfig)), MANAGED_CLUSTERS_OPS)
                        .values()
                        .get(CLUSTER_CONFIG_RESOURCE);
                pending.add(request.whenComplete((ignored, error) -> {
                    if (error != null) {
                        failed.put(clusterName, error);
                    } else {
                        succeeded.add(clusterName);
                    }
                }));
            }
        } catch (RuntimeException e) {
            // Setup failed part-way: the completion callback below will never be registered,
            // so release the clients that were already opened before propagating.
            closeAll(admins);
            throw e;
        }

        CompletableFuture<AuditLogConfigSpec> outcome = new CompletableFuture<>();
        KafkaFuture.allOf(pending.toArray(new KafkaFuture<?>[0])).whenComplete((ignored, error) -> {
            // Per-cluster failures were already collected above; the aggregate error is not needed.
            if (failed.isEmpty()) {
                outcome.complete(auditLogConfigSpec);
            } else {
                outcome.completeExceptionally(new AuditLogConfigStorePartialSuccessException(auditLogConfigSpec, succeeded, failed));
            }
            closeAll(admins);
        });
        return outcome;
    }

    /** No resources are held between calls, so there is nothing to release. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    /** Only the audit log router config key is dynamically reconfigurable. */
    @Override
    public Set<String> reconfigurableConfigs() {
        return Collections.singleton(ROUTER_CONFIG);
    }

    /**
     * Checks that the router config in {@code map}, when present and non-blank, can be loaded
     * and converted.
     *
     * @throws ConfigException if the JSON cannot be loaded
     */
    @Override
    public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
        String json = (String) map.get(ROUTER_CONFIG);
        if (isBlank(json)) {
            return;
        }
        try {
            Conversions.convert(AuditLogRouterJsonConfig.load(json));
        } catch (IOException e) {
            throw invalidRouterConfig(json, e);
        }
    }

    /**
     * Replaces the in-memory spec from the router config in {@code map}; a missing or blank
     * value resets it to {@code DEFAULT_SPEC}. Blank is judged the same way as in
     * {@link #validateReconfiguration(Map)} so a value that passes validation is never parsed here
     * as JSON when it is only whitespace.
     *
     * @throws ConfigException if the JSON cannot be loaded
     */
    @Override
    public void reconfigure(Map<String, ?> map) {
        String json = (String) map.get(ROUTER_CONFIG);
        if (isBlank(json)) {
            this.spec = DEFAULT_SPEC;
            return;
        }
        try {
            this.spec = Conversions.convert(AuditLogRouterJsonConfig.load(json));
        } catch (IOException e) {
            throw invalidRouterConfig(json, e);
        }
    }

    /**
     * Remembers every {@code ssl.}-prefixed setting for later use by broadcast admin clients,
     * then applies the router config via {@link #reconfigure(Map)}.
     */
    @Override
    public void configure(Map<String, ?> map) {
        map.forEach((key, value) -> {
            // Null values are skipped: they carry no setting, and collecting them with
            // Collectors.toMap would fail with a NullPointerException.
            if (value != null && key.startsWith(SSL_PREFIX)) {
                this.sslConfigs.put(key, value);
            }
        });
        reconfigure(map);
    }

    /**
     * Asynchronously builds admin-client properties for each Kafka cluster visible to the
     * caller's principal, keyed by cluster name. Clusters without any host are omitted.
     */
    private CompletableFuture<Map<String, Map<String, String>>> getManagedClustersInfo(ExtractedCredentials extractedCredentials) {
        return CompletableFuture.supplyAsync(() -> {
            List<ClusterInfo> clusters = this.clusterRegistryService.getClusters(extractedCredentials.kafkaPrincipal(), ClusterType.KAFKA_CLUSTER);
            Map<String, Map<String, String>> clientConfigsByCluster = new HashMap<>();
            for (ClusterInfo clusterInfo : clusters) {
                List<String> endpoints = new ArrayList<>();
                clusterInfo.getHosts().forEach(hostInfo -> endpoints.add(hostInfo.getHost() + ":" + hostInfo.getPort()));
                if (endpoints.isEmpty()) {
                    continue;
                }
                Map<String, String> clientConfig = clientConfigsByCluster.computeIfAbsent(clusterInfo.getClusterName(), name -> new HashMap<>());
                // List every host, comma-separated, instead of keeping only the last one.
                clientConfig.put("bootstrap.servers", String.join(",", endpoints));
                clientConfig.put("security.protocol", clusterInfo.getProtocol().value());
                clientConfig.put("sasl.mechanism", "OAUTHBEARER");
                // NOTE(review): getSimpleName() yields an unqualified class name. Kafka presumably
                // cannot resolve that unless the credential properties override this key - confirm
                // whether getName() was intended.
                clientConfig.put("sasl.login.callback.handler.class", TokenBearerLoginCallbackHandler.class.getSimpleName());
            }
            return clientConfigsByCluster;
        });
    }

    /** Returns true when the protocol name is SSL or SASL_SSL, ignoring case. */
    private static boolean usesSsl(String protocol) {
        return protocol != null
                && (protocol.equalsIgnoreCase(SecurityProtocol.SSL.name) || protocol.equalsIgnoreCase(SecurityProtocol.SASL_SSL.name));
    }

    /** Returns true when the value is null, empty, or whitespace only. */
    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    /** Builds the exception for an unloadable router config, keeping the parse error as cause. */
    private static ConfigException invalidRouterConfig(String json, IOException cause) {
        ConfigException invalid = new ConfigException(ROUTER_CONFIG, json, cause.getMessage());
        invalid.initCause(cause);
        return invalid;
    }

    /** Closes every admin client opened for a broadcast. */
    private static void closeAll(Map<String, ConfluentAdmin> admins) {
        admins.values().forEach(admin -> ConfluentAdminClientFactory.closeAdminClientWithTimeout(admin));
    }
}
