package com.twitter.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.distributedlog.DistributedLogManagerFactory;
import com.twitter.distributedlog.acl.AccessControlManager;
import com.twitter.distributedlog.acl.DefaultAccessControlManager;
import com.twitter.distributedlog.acl.ZKAccessControlManager;
import com.twitter.distributedlog.bk.LedgerAllocator;
import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
import com.twitter.distributedlog.callback.NamespaceListener;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.feature.CoreFeatureKeys;
import com.twitter.distributedlog.impl.BKDLUtils;
import com.twitter.distributedlog.impl.ZKLogMetadataStore;
import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore;
import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.ZKSessionLockFactory;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.metadata.BKDLConfig;
import com.twitter.distributedlog.metadata.LogMetadataStore;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.LimitedPermitManager;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.PermitLimiter;
import com.twitter.distributedlog.util.PermitManager;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.distributedlog.util.SimplePermitLimiter;
import com.twitter.distributedlog.util.Utils;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.Stat;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/distributedlog/BKDistributedLogNamespace.class */
public class BKDistributedLogNamespace implements DistributedLogNamespace {
    static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogNamespace.class);
    private final String clientId;
    private final int regionId;
    private final DistributedLogConfiguration conf;
    private final URI namespace;
    private final BKDLConfig bkdlConfig;
    private final OrderedScheduler scheduler;
    private final OrderedScheduler readAheadExecutor;
    private final OrderedScheduler lockStateExecutor;
    private final ClientSocketChannelFactory channelFactory;
    private final HashedWheelTimer requestTimer;
    private final ZooKeeperClientBuilder sharedWriterZKCBuilderForDL;
    private final ZooKeeperClient sharedWriterZKCForDL;
    private final ZooKeeperClientBuilder sharedReaderZKCBuilderForDL;
    private final ZooKeeperClient sharedReaderZKCForDL;
    private ZooKeeperClientBuilder sharedWriterZKCBuilderForBK;
    private ZooKeeperClient sharedWriterZKCForBK;
    private ZooKeeperClientBuilder sharedReaderZKCBuilderForBK;
    private ZooKeeperClient sharedReaderZKCForBK;
    private final BookKeeperClientBuilder sharedWriterBKCBuilder;
    private final BookKeeperClient writerBKC;
    private final BookKeeperClientBuilder sharedReaderBKCBuilder;
    private final BookKeeperClient readerBKC;
    private final LedgerAllocator allocator;
    private AccessControlManager accessControlManager;
    private final PermitManager logSegmentRollingPermitManager;
    private final LogMetadataStore metadataStore;
    private final LogSegmentMetadataStore writerSegmentMetadataStore;
    private final LogSegmentMetadataStore readerSegmentMetadataStore;
    private final SessionLockFactory lockFactory;
    private final FeatureProvider featureProvider;
    private final StatsLogger statsLogger;
    private final StatsLogger perLogStatsLogger;
    private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
    protected boolean closed;
    private final PermitLimiter writeLimiter;

    /* loaded from: input_file:com/twitter/distributedlog/BKDistributedLogNamespace$Builder.class */
    public static class Builder {
        private DistributedLogConfiguration _conf;
        private URI _uri;
        private StatsLogger _statsLogger;
        private StatsLogger _perLogStatsLogger;
        private FeatureProvider _featureProvider;
        private String _clientId;
        private int _regionId;

        private Builder() {
            this._conf = null;
            this._uri = null;
            this._statsLogger = NullStatsLogger.INSTANCE;
            this._perLogStatsLogger = NullStatsLogger.INSTANCE;
            this._featureProvider = new SettableFeatureProvider("", 0);
            this._clientId = DistributedLogConstants.UNKNOWN_CLIENT_ID;
            this._regionId = 0;
        }

        public Builder conf(DistributedLogConfiguration distributedLogConfiguration) {
            this._conf = distributedLogConfiguration;
            return this;
        }

        public Builder uri(URI uri) {
            this._uri = uri;
            return this;
        }

        public Builder statsLogger(StatsLogger statsLogger) {
            this._statsLogger = statsLogger;
            return this;
        }

        public Builder perLogStatsLogger(StatsLogger statsLogger) {
            this._perLogStatsLogger = statsLogger;
            return this;
        }

        public Builder featureProvider(FeatureProvider featureProvider) {
            this._featureProvider = featureProvider;
            return this;
        }

        public Builder clientId(String str) {
            this._clientId = str;
            return this;
        }

        public Builder regionId(int i) {
            this._regionId = i;
            return this;
        }

        public BKDistributedLogNamespace build() throws IOException, NullPointerException, IllegalArgumentException {
            Preconditions.checkNotNull(this._conf, "No DistributedLog Configuration");
            Preconditions.checkNotNull(this._uri, "No DistributedLog URI");
            Preconditions.checkNotNull(this._featureProvider, "No Feature Provider");
            Preconditions.checkNotNull(this._statsLogger, "No Stats Logger");
            Preconditions.checkNotNull(this._featureProvider, "No Feature Provider");
            Preconditions.checkNotNull(this._clientId, "No Client ID");
            BKDLUtils.validateConfAndURI(this._conf, this._uri);
            ZooKeeperClientBuilder createDLZKClientBuilder = BKDistributedLogNamespace.createDLZKClientBuilder(String.format("dlzk:%s:factory_writer_shared", this._uri), this._conf, DLUtils.getZKServersFromDLUri(this._uri), this._statsLogger.scope("dlzk_factory_writer_shared"));
            ZooKeeperClient build = createDLZKClientBuilder.build();
            BKDLConfig resolveDLConfig = BKDLConfig.resolveDLConfig(build, this._uri);
            StatsLogger statsLogger = this._perLogStatsLogger;
            if (statsLogger == NullStatsLogger.INSTANCE && this._conf.getEnablePerStreamStat()) {
                statsLogger = this._statsLogger.scope("stream");
            }
            return new BKDistributedLogNamespace(this._conf, this._uri, this._featureProvider, this._statsLogger, statsLogger, this._clientId, this._regionId, createDLZKClientBuilder, build, resolveDLConfig);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/BKDistributedLogNamespace$ZooKeeperClientHandler.class */
    public interface ZooKeeperClientHandler<T> {
        T handle(ZooKeeperClient zooKeeperClient) throws IOException;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    private static <T> T withZooKeeperClient(ZooKeeperClientHandler<T> zooKeeperClientHandler, DistributedLogConfiguration distributedLogConfiguration, URI uri) throws IOException {
        ZooKeeperClient build = ZooKeeperClientBuilder.newBuilder().name(String.format("dlzk:%s:factory_static", uri)).sessionTimeoutMs(distributedLogConfiguration.getZKSessionTimeoutMilliseconds()).uri(uri).retryThreadCount(distributedLogConfiguration.getZKClientNumberRetryThreads()).requestRateLimit(distributedLogConfiguration.getZKRequestRateLimit()).zkAclId(distributedLogConfiguration.getZkAclId()).build();
        try {
            T handle = zooKeeperClientHandler.handle(build);
            build.close();
            return handle;
        } catch (Throwable th) {
            build.close();
            throw th;
        }
    }

    private BKDistributedLogNamespace(DistributedLogConfiguration distributedLogConfiguration, URI uri, FeatureProvider featureProvider, StatsLogger statsLogger, StatsLogger statsLogger2, String str, int i, ZooKeeperClientBuilder zooKeeperClientBuilder, ZooKeeperClient zooKeeperClient, BKDLConfig bKDLConfig) throws IOException, IllegalArgumentException {
        this.sharedWriterZKCBuilderForBK = null;
        this.sharedWriterZKCForBK = null;
        this.sharedReaderZKCBuilderForBK = null;
        this.sharedReaderZKCForBK = null;
        this.closed = false;
        this.conf = distributedLogConfiguration;
        this.namespace = uri;
        this.featureProvider = featureProvider;
        this.statsLogger = statsLogger;
        this.perLogStatsLogger = statsLogger2;
        this.clientId = str;
        this.regionId = i;
        this.bkdlConfig = bKDLConfig;
        StatsLogger scope = statsLogger.scope("factory").scope("thread_pool");
        this.scheduler = OrderedScheduler.newBuilder().name("DLM-" + uri.getPath()).corePoolSize(distributedLogConfiguration.getNumWorkerThreads()).statsLogger(scope).perExecutorStatsLogger(scope).traceTaskExecution(distributedLogConfiguration.getEnableTaskExecutionStats()).traceTaskExecutionWarnTimeUs(distributedLogConfiguration.getTaskExecutionWarnTimeMicros()).build();
        if (distributedLogConfiguration.getNumReadAheadWorkerThreads() > 0) {
            this.readAheadExecutor = OrderedScheduler.newBuilder().name("DLM-" + uri.getPath() + "-readahead-executor").corePoolSize(distributedLogConfiguration.getNumReadAheadWorkerThreads()).statsLogger(statsLogger.scope("factory").scope("readahead_thread_pool")).traceTaskExecution(distributedLogConfiguration.getTraceReadAheadDeliveryLatency()).traceTaskExecutionWarnTimeUs(distributedLogConfiguration.getTaskExecutionWarnTimeMicros()).build();
            LOG.info("Created dedicated readahead executor : threads = {}", Integer.valueOf(distributedLogConfiguration.getNumReadAheadWorkerThreads()));
        } else {
            this.readAheadExecutor = this.scheduler;
            LOG.info("Used shared executor for readahead.");
        }
        StatsLogger scope2 = statsLogger.scope("factory").scope("lock_scheduler");
        this.lockStateExecutor = OrderedScheduler.newBuilder().name("DLM-LockState").corePoolSize(distributedLogConfiguration.getNumLockStateThreads()).statsLogger(scope2).perExecutorStatsLogger(scope2).traceTaskExecution(distributedLogConfiguration.getEnableTaskExecutionStats()).traceTaskExecutionWarnTimeUs(distributedLogConfiguration.getTaskExecutionWarnTimeMicros()).build();
        this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()), Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()), distributedLogConfiguration.getBKClientNumberIOThreads());
        this.requestTimer = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(), distributedLogConfiguration.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, distributedLogConfiguration.getTimeoutTimerNumTicks());
        this.sharedWriterZKCBuilderForDL = zooKeeperClientBuilder;
        this.sharedWriterZKCForDL = zooKeeperClient;
        if (bKDLConfig.getDlZkServersForWriter().equals(bKDLConfig.getDlZkServersForReader())) {
            this.sharedReaderZKCBuilderForDL = this.sharedWriterZKCBuilderForDL;
        } else {
            this.sharedReaderZKCBuilderForDL = createDLZKClientBuilder(String.format("dlzk:%s:factory_reader_shared", this.namespace), distributedLogConfiguration, bKDLConfig.getDlZkServersForReader(), statsLogger.scope("dlzk_factory_reader_shared"));
        }
        this.sharedReaderZKCForDL = this.sharedReaderZKCBuilderForDL.build();
        this.sharedWriterBKCBuilder = createBKCBuilder(String.format("bk:%s:factory_writer_shared", this.namespace), distributedLogConfiguration, bKDLConfig.getBkZkServersForWriter(), bKDLConfig.getBkLedgersPath(), Optional.of(featureProvider.scope("bkc")));
        this.writerBKC = this.sharedWriterBKCBuilder.build();
        if (bKDLConfig.getBkZkServersForWriter().equals(bKDLConfig.getBkZkServersForReader())) {
            this.sharedReaderBKCBuilder = this.sharedWriterBKCBuilder;
        } else {
            this.sharedReaderBKCBuilder = createBKCBuilder(String.format("bk:%s:factory_reader_shared", this.namespace), distributedLogConfiguration, bKDLConfig.getBkZkServersForReader(), bKDLConfig.getBkLedgersPath(), Optional.absent());
        }
        this.readerBKC = this.sharedReaderBKCBuilder.build();
        this.logSegmentRollingPermitManager = new LimitedPermitManager(distributedLogConfiguration.getLogSegmentRollingConcurrency(), 1, TimeUnit.MINUTES, this.scheduler);
        if (distributedLogConfiguration.getGlobalOutstandingWriteLimit() < 0) {
            this.writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
        } else {
            this.writeLimiter = new SimplePermitLimiter(distributedLogConfiguration.getOutstandingWriteLimitDarkmode(), distributedLogConfiguration.getGlobalOutstandingWriteLimit(), statsLogger.scope("writeLimiter"), true, featureProvider.getFeature(CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase()));
        }
        BKDLConfig.propagateConfiguration(bKDLConfig, distributedLogConfiguration);
        if (distributedLogConfiguration.getEnableLedgerAllocatorPool()) {
            String validateAndGetFullLedgerAllocatorPoolPath = validateAndGetFullLedgerAllocatorPoolPath(distributedLogConfiguration, uri);
            this.allocator = LedgerAllocatorUtils.createLedgerAllocatorPool(validateAndGetFullLedgerAllocatorPoolPath, distributedLogConfiguration.getLedgerAllocatorPoolCoreSize(), distributedLogConfiguration, this.sharedWriterZKCForDL, this.writerBKC, this.scheduler);
            if (null != this.allocator) {
                this.allocator.start();
            }
            LOG.info("Created ledger allocator pool under {} with size {}.", validateAndGetFullLedgerAllocatorPoolPath, Integer.valueOf(distributedLogConfiguration.getLedgerAllocatorPoolCoreSize()));
        } else {
            this.allocator = null;
        }
        this.lockFactory = new ZKSessionLockFactory(this.sharedWriterZKCForDL, str, this.lockStateExecutor, distributedLogConfiguration.getZKNumRetries(), distributedLogConfiguration.getLockTimeoutMilliSeconds(), distributedLogConfiguration.getZKRetryBackoffStartMillis(), statsLogger);
        this.readAheadExceptionsLogger = new ReadAheadExceptionsLogger(statsLogger);
        if (bKDLConfig.isFederatedNamespace() || distributedLogConfiguration.isFederatedNamespaceEnabled()) {
            this.metadataStore = new FederatedZKLogMetadataStore(distributedLogConfiguration, this.namespace, this.sharedReaderZKCForDL, this.scheduler);
        } else {
            this.metadataStore = new ZKLogMetadataStore(distributedLogConfiguration, this.namespace, this.sharedReaderZKCForDL, this.scheduler);
        }
        this.writerSegmentMetadataStore = new ZKLogSegmentMetadataStore(distributedLogConfiguration, this.sharedWriterZKCForDL, this.scheduler);
        this.readerSegmentMetadataStore = new ZKLogSegmentMetadataStore(distributedLogConfiguration, this.sharedReaderZKCForDL, this.scheduler);
        LOG.info("Constructed BK DistributedLogNamespace : clientId = {}, regionId = {}, federated = {}.", new Object[]{str, Integer.valueOf(i), Boolean.valueOf(bKDLConfig.isFederatedNamespace())});
    }

    @Override // com.twitter.distributedlog.namespace.DistributedLogNamespace
    public void createLog(String str) throws InvalidStreamNameException, IOException {
        BKDLUtils.validateName(str);
        createUnpartitionedStreams(this.conf, (URI) FutureUtils.result(this.metadataStore.createLog(str)), Lists.newArrayList(new String[]{str}));
    }

    @Override // com.twitter.distributedlog.namespace.DistributedLogNamespace
    public void deleteLog(String str) throws InvalidStreamNameException, LogNotFoundException, IOException {
        BKDLUtils.validateName(str);
        Optional optional = (Optional) FutureUtils.result(this.metadataStore.getLogLocation(str));
        if (!optional.isPresent()) {
            throw new LogNotFoundException("Log " + str + " isn't found.");
        }
        createDistributedLogManager((URI) optional.get(), str, DistributedLogManagerFactory.ClientSharingOption.SharedClients, Optional.absent(), Optional.absent()).delete();
    }

    @Override // com.twitter.distributedlog.namespace.DistributedLogNamespace
    public DistributedLogManager openLog(String str) throws InvalidStreamNameException, IOException {
        return openLog(str, Optional.absent(), Optional.absent());
    }

    @Override // com.twitter.distributedlog.namespace.DistributedLogNamespace
    public DistributedLogManager openLog(String str, Optional<DistributedLogConfiguration> optional, Optional<DynamicDistributedLogConfiguration> optional2) throws InvalidStreamNameException, IOException {
        BKDLUtils.validateName(str);
        Optional optional3 = (Optional) FutureUtils.result(this.metadataStore.getLogLocation(str));
        if (optional3.isPresent()) {
            return createDistributedLogManager((URI) optional3.get(), str, DistributedLogManagerFactory.ClientSharingOption.SharedClients, optional, optional2);
        }
        throw new LogNotFoundException("Log " + str + " isn't found.");
    }

    @Override // com.twitter.distributedlog.namespace.DistributedLogNamespace
    public boolean logExists(String str) throws IOException, IllegalArgumentException {
        Optional optional = (Optional) FutureUtils.result(this.metadataStore.getLogLocation(str));
        return optional.isPresent() && checkIfLogExists(this.conf, (URI) optional.get(), str);
    }

    @Override // com.twitter.distributedlog.namespace.DistributedLogNamespace
    public Iterator<String> getLogs() throws IOException {
        return (Iterator) FutureUtils.result(this.metadataStore.getLogs());
    }

    @Override // com.twitter.distributedlog.namespace.DistributedLogNamespace
    public void registerNamespaceListener(NamespaceListener namespaceListener) {
        this.metadataStore.registerNamespaceListener(namespaceListener);
    }

    @Override // com.twitter.distributedlog.namespace.DistributedLogNamespace
    public synchronized AccessControlManager createAccessControlManager() throws IOException {
        if (null == this.accessControlManager) {
            String aCLRootPath = this.bkdlConfig.getACLRootPath();
            if (aCLRootPath == null) {
                this.accessControlManager = DefaultAccessControlManager.INSTANCE;
                LOG.info("Created default access control manager for {}", this.namespace);
            } else {
                if (!BKDLUtils.isReservedStreamName(aCLRootPath)) {
                    throw new IOException("Invalid Access Control List Root Path : " + aCLRootPath);
                }
                String str = this.namespace.getPath() + "/" + aCLRootPath;
                LOG.info("Creating zk based access control manager @ {} for {}", str, this.namespace);
                this.accessControlManager = new ZKAccessControlManager(this.conf, this.sharedReaderZKCForDL, str, this.scheduler);
                LOG.info("Created zk based access control manager @ {} for {}", str, this.namespace);
            }
        }
        return this.accessControlManager;
    }

    static String validateAndGetFullLedgerAllocatorPoolPath(DistributedLogConfiguration distributedLogConfiguration, URI uri) throws IOException {
        String ledgerAllocatorPoolPath = distributedLogConfiguration.getLedgerAllocatorPoolPath();
        LOG.info("PoolPath is {}", ledgerAllocatorPoolPath);
        if (null == ledgerAllocatorPoolPath || !ledgerAllocatorPoolPath.startsWith(".") || ledgerAllocatorPoolPath.endsWith("/")) {
            LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", ledgerAllocatorPoolPath);
            throw new IOException("Invalid ledger allocator pool path specified : " + ledgerAllocatorPoolPath);
        }
        String ledgerAllocatorPoolName = distributedLogConfiguration.getLedgerAllocatorPoolName();
        if (null == ledgerAllocatorPoolName) {
            LOG.error("No ledger allocator pool name specified when enabling ledger allocator pool.");
            throw new IOException("No ledger allocator name specified when enabling ledger allocator pool.");
        }
        String str = uri.getPath() + "/" + ledgerAllocatorPoolPath + "/" + ledgerAllocatorPoolName;
        try {
            PathUtils.validatePath(str);
            return str;
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid ledger allocator pool path specified when enabling ledger allocator pool : {}", ledgerAllocatorPoolPath);
            throw new IOException("Invalid ledger allocator pool path specified : " + ledgerAllocatorPoolPath);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static ZooKeeperClientBuilder createDLZKClientBuilder(String str, DistributedLogConfiguration distributedLogConfiguration, String str2, StatsLogger statsLogger) {
        BoundExponentialBackoffRetryPolicy boundExponentialBackoffRetryPolicy = null;
        if (distributedLogConfiguration.getZKNumRetries() > 0) {
            boundExponentialBackoffRetryPolicy = new BoundExponentialBackoffRetryPolicy(distributedLogConfiguration.getZKRetryBackoffStartMillis(), distributedLogConfiguration.getZKRetryBackoffMaxMillis(), distributedLogConfiguration.getZKNumRetries());
        }
        ZooKeeperClientBuilder zkAclId = ZooKeeperClientBuilder.newBuilder().name(str).sessionTimeoutMs(distributedLogConfiguration.getZKSessionTimeoutMilliseconds()).retryThreadCount(distributedLogConfiguration.getZKClientNumberRetryThreads()).requestRateLimit(distributedLogConfiguration.getZKRequestRateLimit()).zkServers(str2).retryPolicy(boundExponentialBackoffRetryPolicy).statsLogger(statsLogger).zkAclId(distributedLogConfiguration.getZkAclId());
        LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {}, maxRetryBackoff = {}, zkAclId = {}.", new Object[]{str, str2, Integer.valueOf(distributedLogConfiguration.getZKNumRetries()), Integer.valueOf(distributedLogConfiguration.getZKSessionTimeoutMilliseconds()), Integer.valueOf(distributedLogConfiguration.getZKRetryBackoffStartMillis()), Integer.valueOf(distributedLogConfiguration.getZKRetryBackoffMaxMillis()), distributedLogConfiguration.getZkAclId()});
        return zkAclId;
    }

    private static ZooKeeperClientBuilder createBKZKClientBuilder(String str, DistributedLogConfiguration distributedLogConfiguration, String str2, StatsLogger statsLogger) {
        BoundExponentialBackoffRetryPolicy boundExponentialBackoffRetryPolicy = null;
        if (distributedLogConfiguration.getZKNumRetries() > 0) {
            boundExponentialBackoffRetryPolicy = new BoundExponentialBackoffRetryPolicy(distributedLogConfiguration.getBKClientZKRetryBackoffStartMillis(), distributedLogConfiguration.getBKClientZKRetryBackoffMaxMillis(), distributedLogConfiguration.getBKClientZKNumRetries());
        }
        ZooKeeperClientBuilder zkAclId = ZooKeeperClientBuilder.newBuilder().name(str).sessionTimeoutMs(distributedLogConfiguration.getBKClientZKSessionTimeoutMilliSeconds()).retryThreadCount(distributedLogConfiguration.getZKClientNumberRetryThreads()).requestRateLimit(distributedLogConfiguration.getBKClientZKRequestRateLimit()).zkServers(str2).retryPolicy(boundExponentialBackoffRetryPolicy).statsLogger(statsLogger).zkAclId(distributedLogConfiguration.getZkAclId());
        LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries = {}, sessionTimeout = {}, retryBackoff = {}, maxRetryBackoff = {}, zkAclId = {}.", new Object[]{str, str2, Integer.valueOf(distributedLogConfiguration.getBKClientZKNumRetries()), Integer.valueOf(distributedLogConfiguration.getBKClientZKSessionTimeoutMilliSeconds()), Integer.valueOf(distributedLogConfiguration.getBKClientZKRetryBackoffStartMillis()), Integer.valueOf(distributedLogConfiguration.getBKClientZKRetryBackoffMaxMillis()), distributedLogConfiguration.getZkAclId()});
        return zkAclId;
    }

    private BookKeeperClientBuilder createBKCBuilder(String str, DistributedLogConfiguration distributedLogConfiguration, String str2, String str3, Optional<FeatureProvider> optional) {
        BookKeeperClientBuilder statsLogger = BookKeeperClientBuilder.newBuilder().name(str).dlConfig(distributedLogConfiguration).zkServers(str2).ledgersPath(str3).channelFactory(this.channelFactory).requestTimer(this.requestTimer).featureProvider(optional).statsLogger(this.statsLogger);
        LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads = {}", new Object[]{str, str2, str3, Integer.valueOf(distributedLogConfiguration.getBKClientNumberIOThreads())});
        return statsLogger;
    }

    @VisibleForTesting
    public ZooKeeperClient getSharedWriterZKCForDL() {
        return this.sharedWriterZKCForDL;
    }

    @VisibleForTesting
    public BookKeeperClient getReaderBKC() {
        return this.readerBKC;
    }

    @VisibleForTesting
    public LogSegmentMetadataStore getWriterSegmentMetadataStore() {
        return this.writerSegmentMetadataStore;
    }

    @VisibleForTesting
    public LedgerAllocator getLedgerAllocator() {
        return this.allocator;
    }

    private <T> T withZooKeeperClient(ZooKeeperClientHandler<T> zooKeeperClientHandler) throws IOException {
        return zooKeeperClientHandler.handle(this.sharedWriterZKCForDL);
    }

    public DistributedLogManager createDistributedLogManagerWithSharedClients(String str) throws InvalidStreamNameException, IOException {
        return createDistributedLogManager(str, DistributedLogManagerFactory.ClientSharingOption.SharedClients);
    }

    public DistributedLogManager createDistributedLogManager(String str, DistributedLogManagerFactory.ClientSharingOption clientSharingOption) throws InvalidStreamNameException, IOException {
        return createDistributedLogManager(str, clientSharingOption, Optional.absent(), Optional.absent());
    }

    public DistributedLogManager createDistributedLogManager(String str, DistributedLogManagerFactory.ClientSharingOption clientSharingOption, Optional<DistributedLogConfiguration> optional, Optional<DynamicDistributedLogConfiguration> optional2) throws InvalidStreamNameException, IOException {
        if (this.bkdlConfig.isFederatedNamespace()) {
            throw new UnsupportedOperationException("Use DistributedLogNamespace methods for federated namespace");
        }
        return createDistributedLogManager(this.namespace, str, clientSharingOption, optional, optional2);
    }

    protected DistributedLogManager createDistributedLogManager(URI uri, String str, DistributedLogManagerFactory.ClientSharingOption clientSharingOption, Optional<DistributedLogConfiguration> optional, Optional<DynamicDistributedLogConfiguration> optional2) throws InvalidStreamNameException, IOException {
        BKDLUtils.validateName(str);
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(this.conf);
        distributedLogConfiguration.loadStreamConf(optional);
        DynamicDistributedLogConfiguration constDynConf = optional2.isPresent() ? (DynamicDistributedLogConfiguration) optional2.get() : ConfUtils.getConstDynConf(distributedLogConfiguration);
        ZooKeeperClientBuilder zooKeeperClientBuilder = null;
        ZooKeeperClientBuilder zooKeeperClientBuilder2 = null;
        ZooKeeperClient zooKeeperClient = null;
        ZooKeeperClient zooKeeperClient2 = null;
        BookKeeperClientBuilder bookKeeperClientBuilder = null;
        BookKeeperClientBuilder bookKeeperClientBuilder2 = null;
        switch (clientSharingOption) {
            case SharedClients:
                zooKeeperClientBuilder = this.sharedWriterZKCBuilderForDL;
                zooKeeperClientBuilder2 = this.sharedReaderZKCBuilderForDL;
                bookKeeperClientBuilder = this.sharedWriterBKCBuilder;
                bookKeeperClientBuilder2 = this.sharedReaderBKCBuilder;
                break;
            case SharedZKClientPerStreamBKClient:
                zooKeeperClientBuilder = this.sharedWriterZKCBuilderForDL;
                zooKeeperClientBuilder2 = this.sharedReaderZKCBuilderForDL;
                synchronized (this) {
                    if (null == this.sharedWriterZKCForBK) {
                        this.sharedWriterZKCBuilderForBK = createBKZKClientBuilder(String.format("bkzk:%s:factory_writer_shared", uri), distributedLogConfiguration, this.bkdlConfig.getBkZkServersForWriter(), this.statsLogger.scope("bkzk_factory_writer_shared"));
                        this.sharedWriterZKCForBK = this.sharedWriterZKCBuilderForBK.build();
                    }
                    if (null == this.sharedReaderZKCForBK) {
                        if (this.bkdlConfig.getBkZkServersForWriter().equals(this.bkdlConfig.getBkZkServersForReader())) {
                            this.sharedReaderZKCBuilderForBK = this.sharedWriterZKCBuilderForBK;
                        } else {
                            this.sharedReaderZKCBuilderForBK = createBKZKClientBuilder(String.format("bkzk:%s:factory_reader_shared", uri), distributedLogConfiguration, this.bkdlConfig.getBkZkServersForReader(), this.statsLogger.scope("bkzk_factory_reader_shared"));
                        }
                        this.sharedReaderZKCForBK = this.sharedReaderZKCBuilderForBK.build();
                    }
                    zooKeeperClient = this.sharedWriterZKCForBK;
                    zooKeeperClient2 = this.sharedReaderZKCForBK;
                }
                break;
        }
        LedgerAllocator ledgerAllocator = null;
        PermitManager permitManager = PermitManager.UNLIMITED_PERMIT_MANAGER;
        if (DistributedLogManagerFactory.ClientSharingOption.SharedClients == clientSharingOption) {
            ledgerAllocator = this.allocator;
            permitManager = this.logSegmentRollingPermitManager;
        }
        return new BKDistributedLogManager(str, distributedLogConfiguration, constDynConf, uri, zooKeeperClientBuilder, zooKeeperClientBuilder2, zooKeeperClient, zooKeeperClient2, bookKeeperClientBuilder, bookKeeperClientBuilder2, this.lockFactory, this.writerSegmentMetadataStore, this.readerSegmentMetadataStore, this.scheduler, this.readAheadExecutor, this.lockStateExecutor, this.channelFactory, this.requestTimer, this.readAheadExceptionsLogger, this.clientId, Integer.valueOf(this.regionId), ledgerAllocator, this.writeLimiter, permitManager, this.featureProvider.scope("dl"), this.statsLogger, this.perLogStatsLogger);
    }

    public MetadataAccessor createMetadataAccessor(String str) throws InvalidStreamNameException, IOException {
        if (this.bkdlConfig.isFederatedNamespace()) {
            throw new UnsupportedOperationException("Use DistributedLogNamespace methods for federated namespace");
        }
        BKDLUtils.validateName(str);
        return new ZKMetadataAccessor(str, this.conf, this.namespace, this.sharedWriterZKCBuilderForDL, this.sharedReaderZKCBuilderForDL, this.statsLogger);
    }

    public Collection<String> enumerateAllLogsInNamespace() throws IOException, IllegalArgumentException {
        if (this.bkdlConfig.isFederatedNamespace()) {
            throw new UnsupportedOperationException("Use DistributedLogNamespace methods for federated namespace");
        }
        return Sets.newHashSet(getLogs());
    }

    public Map<String, byte[]> enumerateLogsWithMetadataInNamespace() throws IOException, IllegalArgumentException {
        if (this.bkdlConfig.isFederatedNamespace()) {
            throw new UnsupportedOperationException("Use DistributedLogNamespace methods for federated namespace");
        }
        return (Map) withZooKeeperClient(new ZooKeeperClientHandler<Map<String, byte[]>>() { // from class: com.twitter.distributedlog.BKDistributedLogNamespace.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.twitter.distributedlog.BKDistributedLogNamespace.ZooKeeperClientHandler
            public Map<String, byte[]> handle(ZooKeeperClient zooKeeperClient) throws IOException {
                return BKDistributedLogNamespace.enumerateLogsWithMetadataInternal(zooKeeperClient, BKDistributedLogNamespace.this.conf, BKDistributedLogNamespace.this.namespace);
            }
        });
    }

    private static void validateInput(DistributedLogConfiguration distributedLogConfiguration, URI uri, String str) throws IllegalArgumentException, InvalidStreamNameException {
        BKDLUtils.validateConfAndURI(distributedLogConfiguration, uri);
        BKDLUtils.validateName(str);
    }

    private static boolean checkIfLogExists(DistributedLogConfiguration distributedLogConfiguration, URI uri, String str) throws IOException, IllegalArgumentException {
        validateInput(distributedLogConfiguration, uri, str);
        final String str2 = uri.getPath() + String.format("/%s", str);
        return ((Boolean) withZooKeeperClient(new ZooKeeperClientHandler<Boolean>() { // from class: com.twitter.distributedlog.BKDistributedLogNamespace.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.twitter.distributedlog.BKDistributedLogNamespace.ZooKeeperClientHandler
            public Boolean handle(ZooKeeperClient zooKeeperClient) throws IOException {
                try {
                    return Boolean.valueOf(null != Utils.sync(zooKeeperClient, str2).exists(str2, false));
                } catch (InterruptedException e) {
                    throw new DLInterruptedException("Interrupted on checking if log " + str2 + " exists", e);
                } catch (KeeperException e2) {
                    throw new ZKException("Error on checking if log " + str2 + " exists", e2.code());
                }
            }
        }, distributedLogConfiguration, uri)).booleanValue();
    }

    public static Map<String, byte[]> enumerateLogsWithMetadataInNamespace(final DistributedLogConfiguration distributedLogConfiguration, final URI uri) throws IOException, IllegalArgumentException {
        return (Map) withZooKeeperClient(new ZooKeeperClientHandler<Map<String, byte[]>>() { // from class: com.twitter.distributedlog.BKDistributedLogNamespace.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.twitter.distributedlog.BKDistributedLogNamespace.ZooKeeperClientHandler
            public Map<String, byte[]> handle(ZooKeeperClient zooKeeperClient) throws IOException {
                return BKDistributedLogNamespace.enumerateLogsWithMetadataInternal(zooKeeperClient, DistributedLogConfiguration.this, uri);
            }
        }, distributedLogConfiguration, uri);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<String, byte[]> enumerateLogsWithMetadataInternal(ZooKeeperClient zooKeeperClient, DistributedLogConfiguration distributedLogConfiguration, URI uri) throws IOException, IllegalArgumentException {
        BKDLUtils.validateConfAndURI(distributedLogConfiguration, uri);
        String path = uri.getPath();
        HashMap hashMap = new HashMap();
        try {
            ZooKeeper sync = Utils.sync(zooKeeperClient, path);
            if (sync.exists(path, false) == null) {
                return hashMap;
            }
            for (String str : sync.getChildren(path, false)) {
                if (!BKDLUtils.isReservedStreamName(str)) {
                    String format = String.format("%s/%s", path, str);
                    Stat exists = sync.exists(format, false);
                    if (exists == null) {
                        hashMap.put(str, new byte[0]);
                    } else {
                        hashMap.put(str, sync.getData(format, false, exists));
                    }
                }
            }
            return hashMap;
        } catch (InterruptedException e) {
            LOG.error("Interrupted while deleting " + path, e);
            throw new IOException("Interrupted while reading " + path, e);
        } catch (KeeperException e2) {
            LOG.error("Error reading" + path + "entry in zookeeper", e2);
            throw new IOException("Error reading" + path + "entry in zookeeper", e2);
        }
    }

    private static void createUnpartitionedStreams(final DistributedLogConfiguration distributedLogConfiguration, final URI uri, final List<String> list) throws IOException, IllegalArgumentException {
        withZooKeeperClient(new ZooKeeperClientHandler<Void>() { // from class: com.twitter.distributedlog.BKDistributedLogNamespace.4
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.twitter.distributedlog.BKDistributedLogNamespace.ZooKeeperClientHandler
            public Void handle(ZooKeeperClient zooKeeperClient) throws IOException {
                for (String str : list) {
                    try {
                        BKDistributedLogManager.createLog(distributedLogConfiguration, zooKeeperClient, uri, str);
                    } catch (InterruptedException e) {
                        BKDistributedLogNamespace.LOG.error("Interrupted on creating unpartitioned stream {} : ", str, e);
                        return null;
                    }
                }
                return null;
            }
        }, distributedLogConfiguration, uri);
    }

    @Override // com.twitter.distributedlog.namespace.DistributedLogNamespace
    public void close() {
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            ZooKeeperClient zooKeeperClient = this.sharedWriterZKCForBK;
            ZooKeeperClient zooKeeperClient2 = this.sharedReaderZKCForBK;
            AccessControlManager accessControlManager = this.accessControlManager;
            if (null != accessControlManager) {
                accessControlManager.close();
                LOG.info("Access Control Manager Stopped.");
            }
            if (null != this.allocator) {
                Utils.closeQuietly(this.allocator);
                LOG.info("Ledger Allocator stopped.");
            }
            Utils.close(this.writerSegmentMetadataStore);
            Utils.close(this.readerSegmentMetadataStore);
            SchedulerUtils.shutdownScheduler(this.scheduler, this.conf.getSchedulerShutdownTimeoutMs(), TimeUnit.MILLISECONDS);
            LOG.info("Executor Service Stopped.");
            if (this.scheduler != this.readAheadExecutor) {
                SchedulerUtils.shutdownScheduler(this.readAheadExecutor, this.conf.getSchedulerShutdownTimeoutMs(), TimeUnit.MILLISECONDS);
                LOG.info("ReadAhead Executor Service Stopped.");
            }
            this.writerBKC.close();
            this.readerBKC.close();
            this.sharedWriterZKCForDL.close();
            this.sharedReaderZKCForDL.close();
            if (null != zooKeeperClient) {
                zooKeeperClient.close();
            }
            if (null != zooKeeperClient2) {
                zooKeeperClient2.close();
            }
            this.channelFactory.releaseExternalResources();
            LOG.info("Release external resources used by channel factory.");
            this.requestTimer.stop();
            LOG.info("Stopped request timer");
            SchedulerUtils.shutdownScheduler(this.lockStateExecutor, 5000L, TimeUnit.MILLISECONDS);
            LOG.info("Stopped lock state executor");
        }
    }
}
