package co.cask.cdap.common.guice;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.conf.KafkaConstants;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.PrivateModule;
import com.google.inject.Provider;
import com.google.inject.Scopes;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.twill.common.Cancellable;
import org.apache.twill.internal.kafka.client.ZKBrokerService;
import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.twill.kafka.client.BrokerInfo;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.Compression;
import org.apache.twill.kafka.client.KafkaClient;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.kafka.client.KafkaPublisher;
import org.apache.twill.zookeeper.ForwardingZKClientService;
import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;

/* loaded from: input_file:co/cask/cdap/common/guice/KafkaClientModule.class */
public class KafkaClientModule extends PrivateModule {
    private static final String KAFKA_ZK = "kafkaZK";

    /* loaded from: input_file:co/cask/cdap/common/guice/KafkaClientModule$AbstractServiceWithZKClient.class */
    private static abstract class AbstractServiceWithZKClient<T extends Service> extends AbstractIdleService {
        private final ZKClientService zkClientService;
        private final T delegate;

        AbstractServiceWithZKClient(ZKClientService zKClientService, T t) {
            this.zkClientService = zKClientService;
            this.delegate = t;
        }

        protected final void startUp() throws Exception {
            this.zkClientService.startAndWait();
            try {
                this.delegate.startAndWait();
            } catch (Exception e) {
                try {
                    this.zkClientService.stopAndWait();
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                }
                throw e;
            }
        }

        protected final void shutDown() throws Exception {
            try {
                this.delegate.stopAndWait();
                this.zkClientService.stopAndWait();
            } catch (Exception e) {
                try {
                    this.zkClientService.stopAndWait();
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                }
                throw e;
            }
        }

        protected T getDelegate() {
            return this.delegate;
        }
    }

    /* loaded from: input_file:co/cask/cdap/common/guice/KafkaClientModule$DefaultBrokerService.class */
    private static final class DefaultBrokerService extends AbstractServiceWithZKClient<BrokerService> implements BrokerService {
        @Inject
        DefaultBrokerService(@Named("kafkaZK") ZKClientService zKClientService) {
            super(zKClientService, new ZKBrokerService(zKClientService));
        }

        public BrokerInfo getLeader(String str, int i) {
            return getDelegate().getLeader(str, i);
        }

        public Iterable<BrokerInfo> getBrokers() {
            return getDelegate().getBrokers();
        }

        public String getBrokerList() {
            return getDelegate().getBrokerList();
        }

        public Cancellable addChangeListener(BrokerService.BrokerChangeListener brokerChangeListener, Executor executor) {
            return getDelegate().addChangeListener(brokerChangeListener, executor);
        }
    }

    /* loaded from: input_file:co/cask/cdap/common/guice/KafkaClientModule$DefaultKafkaClientService.class */
    private static final class DefaultKafkaClientService extends AbstractServiceWithZKClient<KafkaClientService> implements KafkaClientService {
        @Inject
        DefaultKafkaClientService(@Named("kafkaZK") ZKClientService zKClientService) {
            super(zKClientService, new ZKKafkaClientService(zKClientService));
        }

        public KafkaPublisher getPublisher(KafkaPublisher.Ack ack, Compression compression) {
            return getDelegate().getPublisher(ack, compression);
        }

        public KafkaConsumer getConsumer() {
            return getDelegate().getConsumer();
        }
    }

    /* loaded from: input_file:co/cask/cdap/common/guice/KafkaClientModule$ZKClientServiceProvider.class */
    private static final class ZKClientServiceProvider implements Provider<ZKClientService> {
        private final CConfiguration cConf;
        private final Injector injector;

        @Inject
        ZKClientServiceProvider(CConfiguration cConfiguration, Injector injector) {
            this.cConf = cConfiguration;
            this.injector = injector;
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public ZKClientService m32get() {
            ZKClient delegate;
            String str = this.cConf.get(KafkaConstants.ConfigKeys.ZOOKEEPER_QUORUM);
            final AtomicInteger atomicInteger = new AtomicInteger();
            if (str == null) {
                delegate = (ZKClientService) this.injector.getInstance(ZKClientService.class);
                String str2 = this.cConf.get(KafkaConstants.ConfigKeys.ZOOKEEPER_NAMESPACE_CONFIG);
                if (str2 != null) {
                    if (!str2.startsWith("/")) {
                        str2 = "/" + str2;
                    }
                    delegate = ZKClientServices.delegate(ZKClients.namespace(delegate, str2));
                }
                atomicInteger.set(1);
            } else {
                delegate = ZKClientServices.delegate(ZKClients.reWatchOnExpire(ZKClients.retryOnFailure(ZKClientService.Builder.of(str).setSessionTimeout(this.cConf.getInt(Constants.Zookeeper.CFG_SESSION_TIMEOUT_MILLIS, Constants.Zookeeper.DEFAULT_SESSION_TIMEOUT_MILLIS)).build(), RetryStrategies.exponentialDelay(500L, 2000L, TimeUnit.MILLISECONDS))));
            }
            return new ForwardingZKClientService(delegate) { // from class: co.cask.cdap.common.guice.KafkaClientModule.ZKClientServiceProvider.1
                public ListenableFuture<Service.State> start() {
                    return atomicInteger.getAndIncrement() == 0 ? super.start() : Futures.immediateFuture(Service.State.RUNNING);
                }

                public ListenableFuture<Service.State> stop() {
                    return atomicInteger.decrementAndGet() == 0 ? super.stop() : Futures.immediateFuture(Service.State.TERMINATED);
                }
            };
        }
    }

    protected void configure() {
        bind(ZKClientService.class).annotatedWith(Names.named(KAFKA_ZK)).toProvider(ZKClientServiceProvider.class).in(Scopes.SINGLETON);
        bind(KafkaClientService.class).to(DefaultKafkaClientService.class).in(Scopes.SINGLETON);
        bind(BrokerService.class).to(DefaultBrokerService.class).in(Scopes.SINGLETON);
        bind(KafkaClient.class).to(KafkaClientService.class);
        expose(KafkaClient.class);
        expose(KafkaClientService.class);
        expose(BrokerService.class);
    }
}
