/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.batch.connectors.pulsar;

import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.batch.connectors.pulsar.ConnectorUtils;
import org.apache.flink.common.ConnectorConfig;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableMap;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.offload.OffloaderUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
import org.apache.pulsar.shade.org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CachedClients {
    private static final Logger log = LoggerFactory.getLogger(CachedClients.class);
    private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory";
    private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver";
    private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads";
    static CachedClients instance;
    private final ManagedLedgerFactoryImpl managedLedgerFactory;
    private final StatsProvider statsProvider;
    private OrderedScheduler offloaderScheduler;
    private Offloaders offloaderManager;
    private LedgerOffloader offloader;

    private CachedClients(ConnectorConfig config) throws Exception {
        this.managedLedgerFactory = this.initManagedLedgerFactory(config);
        this.statsProvider = ConnectorUtils.createInstance(config.getStatsProvider(), StatsProvider.class, this.getClass().getClassLoader());
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        config.getStatsProviderConfigs().forEach((arg_0, arg_1) -> ((ClientConfiguration)clientConfiguration).setProperty(arg_0, arg_1));
        this.statsProvider.start((Configuration)clientConfiguration);
        this.offloader = this.initManagedLedgerOffloader(config);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static CachedClients getInstance(ConnectorConfig config) throws Exception {
        Class<CachedClients> clazz = CachedClients.class;
        synchronized (CachedClients.class) {
            if (instance == null) {
                instance = new CachedClients(config);
            }
            // ** MonitorExit[var1_1] (shouldn't be in output)
            return instance;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void shutdown() throws Exception {
        Class<CachedClients> clazz = CachedClients.class;
        synchronized (CachedClients.class) {
            if (instance != null) {
                CachedClients.instance.statsProvider.stop();
                CachedClients.instance.managedLedgerFactory.shutdown();
                CachedClients.instance.offloaderScheduler.shutdown();
                CachedClients.instance.offloaderManager.close();
                instance = null;
            }
            // ** MonitorExit[var0] (shouldn't be in output)
            return;
        }
    }

    private ManagedLedgerFactoryImpl initManagedLedgerFactory(ConnectorConfig config) throws Exception {
        ClientConfiguration bkClientConfiguration = ((ClientConfiguration)((ClientConfiguration)((ClientConfiguration)((ClientConfiguration)new ClientConfiguration().setZkServers(config.getZookeeperUri())).setMetadataServiceUri("zk://" + config.getZookeeperUri() + "/ledgers")).setClientTcpNoDelay(false).setUseV2WireProtocol(true).setStickyReadsEnabled(false).setAllowShadedLedgerManagerFactoryClass(true)).setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.")).setReadEntryTimeout(60).setThrottleValue(config.getBookkeeperThrottleValue()).setNumIOThreads(config.getBookkeeperNumIOThreads()).setNumWorkerThreads(config.getBookkeeperNumWorkerThreads());
        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
        managedLedgerFactoryConfig.setMaxCacheSize(config.getManagedLedgerCacheSizeMB());
        managedLedgerFactoryConfig.setNumManagedLedgerWorkerThreads(config.getManagedLedgerNumWorkerThreads());
        managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(config.getManagedLedgerNumSchedulerThreads());
        return new ManagedLedgerFactoryImpl(bkClientConfiguration, managedLedgerFactoryConfig);
    }

    private synchronized OrderedScheduler getOffloaderScheduler(ConnectorConfig config) {
        if (this.offloaderScheduler == null) {
            this.offloaderScheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().numThreads(config.getManagedLedgerOffloadMaxThreads()).name("pulsar-offloader").build();
        }
        return this.offloaderScheduler;
    }

    private LedgerOffloader initManagedLedgerOffloader(ConnectorConfig config) {
        try {
            if (StringUtils.isNotBlank((CharSequence)config.getManagedLedgerOffloadDriver())) {
                Preconditions.checkNotNull((Object)config.getOffloadersDirectory(), (String)"Offloader driver is configured to be '%s' but no offloaders directory is configured.", (Object[])new Object[]{config.getManagedLedgerOffloadDriver()});
                this.offloaderManager = OffloaderUtils.searchForOffloaders((String)config.getOffloadersDirectory(), (String)"");
                LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(config.getManagedLedgerOffloadDriver());
                Map<String, String> offloaderProperties = config.getOffloaderProperties();
                offloaderProperties.put(OFFLOADERS_DIRECTOR, config.getOffloadersDirectory());
                offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_DRIVER, config.getManagedLedgerOffloadDriver());
                offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(config.getManagedLedgerOffloadMaxThreads()));
                try {
                    return offloaderFactory.create(ConnectorUtils.getProperties(offloaderProperties), (Map)ImmutableMap.of((Object)"S3ManagedLedgerOffloaderSoftwareVersion".toLowerCase(), (Object)PulsarVersion.getVersion(), (Object)"S3ManagedLedgerOffloaderSoftwareGitSha".toLowerCase(), (Object)PulsarVersion.getGitSha()), this.getOffloaderScheduler(config));
                }
                catch (IOException ioe) {
                    log.error("Failed to create offloader: ", (Throwable)ioe);
                    throw new RuntimeException(ioe.getMessage(), ioe.getCause());
                }
            }
            log.info("No ledger offloader configured, using NULL instance");
            return NullLedgerOffloader.INSTANCE;
        }
        catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    public ManagedLedgerConfig getManagedLedgerConfig() {
        return new ManagedLedgerConfig().setLedgerOffloader(this.offloader);
    }

    public ManagedLedgerFactoryImpl getManagedLedgerFactory() {
        return this.managedLedgerFactory;
    }

    public StatsProvider getStatsProvider() {
        return this.statsProvider;
    }
}

