/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds and owns the {@link ManagedLedgerFactory} together with the BookKeeper clients it uses.
 *
 * <p>One default {@link BookKeeper} client is created eagerly in the constructor. Additional clients
 * are created lazily, one per distinct {@link EnsemblePlacementPolicyConfig} that names a policy
 * class, and are cached in a concurrent map. {@link #close()} releases the factory, the stats
 * provider and every client created here.
 */
public class ManagedLedgerClientFactory
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
    private final ManagedLedgerFactory managedLedgerFactory;
    private final BookKeeper defaultBkClient;
    /** Lazily created clients, keyed by the placement-policy configuration they were requested for. */
    private final Map<EnsemblePlacementPolicyConfig, BookKeeper> bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();
    /** Replaced by a {@link PrometheusMetricsProvider} when the configuration asks for Prometheus stats. */
    private StatsProvider statsProvider = new NullStatsProvider();

    /**
     * Creates the default BookKeeper client and the managed ledger factory.
     *
     * <p>If any step after the stats provider has been started fails, the resources created so far
     * (default client, stats provider) are released before the exception is rethrown, because no
     * instance is handed to the caller that could be closed later.
     *
     * @param conf broker configuration the managed ledger and stats settings are read from
     * @param zkClient ZooKeeper handle passed to the BookKeeper client factory and the managed ledger factory
     * @param bookkeeperProvider factory used to create the default and the per-policy BookKeeper clients
     * @throws Exception if the default client or the managed ledger factory cannot be created
     */
    public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient, BookKeeperClientFactory bookkeeperProvider) throws Exception {
        ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig();
        // The setting is expressed in megabytes; widen to long before multiplying so the byte count cannot overflow int.
        factoryConfig.setMaxCacheSize((long) conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
        factoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
        factoryConfig.setNumManagedLedgerWorkerThreads(conf.getManagedLedgerNumWorkerThreads());
        factoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
        factoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
        factoryConfig.setCacheEvictionTimeThresholdMillis(conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
        factoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
        factoryConfig.setPrometheusStatsLatencyRolloverSeconds(conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
        factoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
        factoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());

        ClientConfiguration statsConfiguration = new ClientConfiguration();
        if (conf.isBookkeeperClientExposeStatsToPrometheus()) {
            statsConfiguration.addProperty("prometheusStatsLatencyRolloverSeconds", conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
            statsConfiguration.addProperty("cluster", conf.getClusterName());
            this.statsProvider = new PrometheusMetricsProvider();
        }
        this.statsProvider.start(statsConfiguration);

        BookKeeper defaultClient = null;
        ManagedLedgerFactory factory;
        try {
            StatsLogger statsLogger = this.statsProvider.getStatsLogger("pulsar_managedLedger_client");
            defaultClient = bookkeeperProvider.create(conf, zkClient, Optional.empty(), null);
            // Effectively-final alias so the lambdas below can capture the default client.
            final BookKeeper fallbackClient = defaultClient;
            ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = policyConfig -> {
                // Only configurations that name a policy class get a dedicated client.
                if (policyConfig == null || policyConfig.getPolicyClass() == null) {
                    return fallbackClient;
                }
                BookKeeper customClient = this.bkEnsemblePolicyToBkClientMap.computeIfAbsent(policyConfig, key -> {
                    try {
                        return bookkeeperProvider.create(conf, zkClient, Optional.ofNullable(policyConfig.getPolicyClass()), policyConfig.getProperties());
                    }
                    catch (Exception createFailure) {
                        log.error("Failed to initialize bk-client for policy {}, properties {}", policyConfig.getPolicyClass(), policyConfig.getProperties(), createFailure);
                        // The default client is stored under this key, so later requests for the
                        // same policy reuse it instead of retrying the creation.
                        return fallbackClient;
                    }
                });
                return customClient != null ? customClient : fallbackClient;
            };
            factory = new ManagedLedgerFactoryImpl(bkFactory, zkClient, factoryConfig, statsLogger);
        }
        catch (Exception constructionFailure) {
            // Construction failed: nobody will ever be able to call close(), so release what was created here.
            if (defaultClient != null) {
                try {
                    defaultClient.close();
                }
                catch (Exception closeFailure) {
                    constructionFailure.addSuppressed(closeFailure);
                }
            }
            try {
                this.statsProvider.stop();
            }
            catch (Exception stopFailure) {
                constructionFailure.addSuppressed(stopFailure);
            }
            throw constructionFailure;
        }
        this.defaultBkClient = defaultClient;
        this.managedLedgerFactory = factory;
    }

    /**
     * @return the managed ledger factory created by this instance
     */
    public ManagedLedgerFactory getManagedLedgerFactory() {
        return this.managedLedgerFactory;
    }

    /**
     * @return the default BookKeeper client (the one created without a custom placement policy)
     */
    public BookKeeper getBookKeeperClient() {
        return this.defaultBkClient;
    }

    /**
     * @return the stats provider that was started in the constructor
     */
    public StatsProvider getStatsProvider() {
        return this.statsProvider;
    }

    /**
     * @return the live (not copied) map of per-policy BookKeeper clients
     */
    @VisibleForTesting
    public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookKeeperMap() {
        return this.bkEnsemblePolicyToBkClientMap;
    }

    /**
     * Shuts down the managed ledger factory, stops the stats provider and closes every BookKeeper
     * client created by this instance.
     *
     * <p>Every step is attempted even if an earlier one fails, so a failing factory shutdown does not
     * leave BookKeeper clients open. A {@link RejectedExecutionException} from the default client and
     * any failure from a per-policy client are logged and otherwise ignored.
     *
     * @throws IOException wrapping the first failure from the factory shutdown, the stats provider
     *         stop or the default client close; later failures are attached to it as suppressed
     */
    @Override
    public void close() throws IOException {
        Exception firstFailure = null;

        try {
            this.managedLedgerFactory.shutdown();
            log.info("Closed managed ledger factory");
        }
        catch (Exception e) {
            firstFailure = e;
        }

        try {
            this.statsProvider.stop();
        }
        catch (Exception e) {
            firstFailure = chain(firstFailure, e);
        }

        try {
            this.defaultBkClient.close();
        }
        catch (RejectedExecutionException ree) {
            log.warn("Encountered exceptions on closing bookkeeper client", ree);
        }
        catch (Exception e) {
            firstFailure = chain(firstFailure, e);
        }

        this.bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
            // A policy whose client could not be created is mapped to the default client,
            // which has already been closed above.
            if (bk == null || bk == this.defaultBkClient) {
                return;
            }
            try {
                bk.close();
            }
            catch (Exception e) {
                log.warn("Failed to close bookkeeper-client for policy {}", policy, e);
            }
        });

        if (firstFailure != null) {
            log.warn(firstFailure.getMessage(), firstFailure);
            throw new IOException(firstFailure);
        }
        log.info("Closed BookKeeper client");
    }

    /** Keeps the earliest failure as primary and records any later one as suppressed on it. */
    private static Exception chain(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }
}

