/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.sql.presto;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.mledger.offload.OffloadersCache;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.sql.presto.PulsarConnectorConfig;
import org.apache.pulsar.sql.presto.PulsarConnectorUtils;

/**
 * Process-wide holder for the resources the Pulsar SQL connector shares between queries:
 * the metadata store, the managed ledger factory, the stats provider and the ledger
 * offloaders.
 *
 * <p>A single instance is created lazily by {@link #getConnectorCache(PulsarConnectorConfig)}
 * and released by {@link #shutdown()}. Both methods synchronize on the class object.
 */
public class PulsarConnectorCache {
    private static final Logger log = Logger.get(PulsarConnectorCache.class);

    /** The shared instance; {@code null} until first requested and again after {@link #shutdown()}. */
    @VisibleForTesting
    static PulsarConnectorCache instance;

    private final MetadataStore metadataStore;
    private final ManagedLedgerFactory managedLedgerFactory;
    private final StatsProvider statsProvider;

    /** Created on first use by {@link #getOffloaderScheduler}; stays {@code null} if no offloader is ever built. */
    private OrderedScheduler offloaderScheduler;
    private final OffloadersCache offloadersCache = new OffloadersCache();

    /** Offloader handed out when a caller supplies no namespace-specific offload policies. */
    private LedgerOffloader defaultOffloader;

    /** Offloaders built from namespace-specific offload policies, keyed by namespace. */
    private final Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();

    private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory";
    private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver";
    private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads";

    /**
     * Creates the metadata store, the managed ledger factory, the stats provider (which is
     * started immediately) and the default offloader from the given connector configuration.
     *
     * @param pulsarConnectorConfig connector configuration to read all settings from
     * @throws Exception if any of the underlying resources cannot be created
     */
    private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
        this.metadataStore = MetadataStoreFactory.create(
                pulsarConnectorConfig.getZookeeperUri(),
                MetadataStoreConfig.builder().build());
        this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
        this.statsProvider = PulsarConnectorUtils.createInstance(
                pulsarConnectorConfig.getStatsProvider(),
                StatsProvider.class,
                getClass().getClassLoader());

        // The stats provider is configured through a fresh ClientConfiguration that carries
        // only the user-supplied stats provider properties.
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        pulsarConnectorConfig.getStatsProviderConfigs().forEach(clientConfiguration::setProperty);
        this.statsProvider.start(clientConfiguration);

        this.defaultOffloader = initManagedLedgerOffloader(
                pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig);
    }

    /**
     * Returns the shared cache, creating it from {@code pulsarConnectorConfig} if it does not
     * exist yet. When an instance already exists the argument is ignored.
     *
     * @param pulsarConnectorConfig configuration used only when a new instance must be built
     * @return the shared cache instance
     * @throws Exception if a new instance has to be created and its construction fails
     */
    public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig)
            throws Exception {
        synchronized (PulsarConnectorCache.class) {
            if (instance == null) {
                instance = new PulsarConnectorCache(pulsarConnectorConfig);
            }
            return instance;
        }
    }

    /**
     * Builds the managed ledger factory on top of {@link #metadataStore}, using a BookKeeper
     * client configuration derived from the connector configuration.
     *
     * @param pulsarConnectorConfig source of the ZooKeeper URI and BookKeeper/managed ledger tunables
     * @return a new managed ledger factory
     * @throws Exception if the BookKeeper configuration or the factory cannot be created
     */
    private ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
            throws Exception {
        String zookeeperUri = pulsarConnectorConfig.getZookeeperUri();
        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
                .setZkServers(zookeeperUri)
                // The metadata service URI separates servers with ';' where the ZooKeeper URI uses ','.
                .setMetadataServiceUri("zk://" + zookeeperUri.replace(",", ";") + "/ledgers")
                .setClientTcpNoDelay(false)
                .setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol())
                .setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval())
                .setStickyReadsEnabled(false)
                .setReadEntryTimeout(60)
                .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
                .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
                .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());

        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
        managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
        managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(
                pulsarConnectorConfig.getManagedLedgerNumSchedulerThreads());
        return new ManagedLedgerFactoryImpl(this.metadataStore, bkClientConfiguration, managedLedgerFactoryConfig);
    }

    /**
     * Creates a managed ledger configuration whose offloader matches the given policies.
     *
     * <p>With {@code offloadPolicies == null} the default offloader is used. Otherwise the
     * offloader cached for {@code namespaceName} is reused when its policies equal
     * {@code offloadPolicies}; if they differ, the cached offloader is closed and replaced
     * by a newly created one.
     *
     * @param namespaceName namespace the configuration is requested for
     * @param offloadPolicies namespace-specific offload policies, or {@code null} for the default
     * @param pulsarConnectorConfig connector configuration used when a new offloader is created
     * @return a new managed ledger configuration with its ledger offloader set
     * @throws RuntimeException if a new offloader has to be created and creation fails
     */
    public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName,
                                                      OffloadPoliciesImpl offloadPolicies,
                                                      PulsarConnectorConfig pulsarConnectorConfig) {
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        if (offloadPolicies == null) {
            managedLedgerConfig.setLedgerOffloader(this.defaultOffloader);
            return managedLedgerConfig;
        }

        LedgerOffloader ledgerOffloader = this.offloaderMap.compute(namespaceName, (ns, offloader) -> {
            if (offloader != null && Objects.equals(offloader.getOffloadPolicies(), offloadPolicies)) {
                return offloader;
            }
            if (offloader != null) {
                // The policies changed: release the stale offloader before replacing it.
                offloader.close();
            }
            return initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig);
        });
        managedLedgerConfig.setLedgerOffloader(ledgerOffloader);
        return managedLedgerConfig;
    }

    /**
     * Returns the scheduler shared by all offloaders, creating it on first call. The thread
     * count is taken from the policies passed to that first call; later calls return the
     * existing scheduler regardless of their argument.
     *
     * @param offloadPolicies policies providing the maximum number of offload threads
     * @return the shared offloader scheduler
     */
    private synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
        if (this.offloaderScheduler == null) {
            this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
                    .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads().intValue())
                    .name("pulsar-offloader")
                    .build();
        }
        return this.offloaderScheduler;
    }

    /**
     * Creates the ledger offloader described by {@code offloadPolicies}, or returns
     * {@link NullLedgerOffloader#INSTANCE} when no offload driver is configured.
     *
     * @param offloadPolicies policies naming the offload driver and the offloaders directory
     * @param pulsarConnectorConfig connector configuration providing the NAR extraction directory
     * @return the created offloader, or the null offloader if the driver name is blank
     * @throws RuntimeException wrapping any failure raised while loading or creating the offloader
     */
    private LedgerOffloader initManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies,
                                                       PulsarConnectorConfig pulsarConnectorConfig) {
        try {
            if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
                Preconditions.checkNotNull(offloadPolicies.getOffloadersDirectory(),
                        "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                        offloadPolicies.getManagedLedgerOffloadDriver());
                Offloaders offloaders = this.offloadersCache.getOrLoadOffloaders(
                        offloadPolicies.getOffloadersDirectory(),
                        pulsarConnectorConfig.getNarExtractionDirectory());
                LedgerOffloaderFactory offloaderFactory =
                        offloaders.getOffloaderFactory(offloadPolicies.getManagedLedgerOffloadDriver());
                try {
                    Map<String, String> userMetadata = ImmutableMap.of(
                            "S3ManagedLedgerOffloaderSoftwareVersion".toLowerCase(), PulsarVersion.getVersion(),
                            "S3ManagedLedgerOffloaderSoftwareGitSha".toLowerCase(), PulsarVersion.getGitSha());
                    return offloaderFactory.create(offloadPolicies, userMetadata,
                            getOffloaderScheduler(offloadPolicies));
                } catch (IOException ioe) {
                    // Throwable-first overload so the stack trace is actually logged.
                    log.error(ioe, "Failed to create offloader: ");
                    // Chain the IOException itself so its stack trace is not lost.
                    throw new RuntimeException(ioe.getMessage(), ioe);
                }
            }
            log.info("No ledger offloader configured, using NULL instance");
            return NullLedgerOffloader.INSTANCE;
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    /**
     * @return the managed ledger factory owned by this cache
     */
    public ManagedLedgerFactory getManagedLedgerFactory() {
        return this.managedLedgerFactory;
    }

    /**
     * @return the stats provider owned by this cache
     */
    public StatsProvider getStatsProvider() {
        return this.statsProvider;
    }

    /**
     * Releases the resources held by the shared instance, if one exists, and clears it so that
     * the next {@link #getConnectorCache(PulsarConnectorConfig)} call builds a fresh instance.
     *
     * <p>The instance is cleared even if one of the shutdown steps throws; the steps after the
     * failing one are not executed.
     *
     * @throws Exception if stopping or closing one of the underlying resources fails
     */
    public static void shutdown() throws Exception {
        synchronized (PulsarConnectorCache.class) {
            if (instance == null) {
                return;
            }
            PulsarConnectorCache cache = instance;
            try {
                cache.statsProvider.stop();
                cache.managedLedgerFactory.shutdown();
                cache.metadataStore.close();
                // The scheduler is created lazily and is absent when no offloader was ever built.
                if (cache.offloaderScheduler != null) {
                    cache.offloaderScheduler.shutdown();
                }
                cache.offloadersCache.close();
            } finally {
                // Never hand out an instance whose resources have been (partially) released.
                instance = null;
            }
        }
    }
}

