package co.cask.cdap.data.stream.service;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.hooks.MetricsReporterHook;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.common.logging.ServiceLoggingContext;
import co.cask.cdap.common.metrics.MetricsCollectionService;
import co.cask.cdap.data.stream.StreamCoordinator;
import co.cask.http.HttpHandler;
import co.cask.http.NettyHttpService;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import java.net.InetSocketAddress;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryService;
import org.jboss.netty.buffer.HeapChannelBufferFactory;

/* loaded from: input_file:co/cask/cdap/data/stream/service/StreamHttpService.class */
/**
 * Idle service that runs the Netty HTTP service hosting the stream HTTP handlers.
 *
 * <p>On start-up it starts the HTTP service, registers the service's bind address with the
 * {@link DiscoveryService} under the name {@code "streams"}, and starts the
 * {@link StreamFileJanitorService}. On shutdown it undoes those steps and closes the
 * {@link StreamCoordinator}.
 */
public final class StreamHttpService extends AbstractIdleService {
    private final DiscoveryService discoveryService;
    private final NettyHttpService httpService;
    private final StreamCoordinator streamCoordinator;
    private final StreamFileJanitorService janitorService;

    /** Handle for the discovery registration; stays {@code null} until {@link #startUp()} registers. */
    private Cancellable cancellable;

    /**
     * Builds (but does not start) the underlying HTTP service.
     *
     * @param cConfiguration configuration; {@code "stream.bind.address"} is used as the host and
     *     {@code "stream.worker.threads"} (default 10) as the worker thread pool size
     * @param discoveryService service the bind address is registered with on start-up
     * @param streamCoordinator coordinator that is closed when this service shuts down
     * @param streamFileJanitorService janitor service started and stopped together with this service
     * @param set HTTP handlers to expose
     * @param metricsCollectionService metrics service handed to the {@link MetricsReporterHook};
     *     may be {@code null}
     */
    @Inject
    public StreamHttpService(CConfiguration cConfiguration, DiscoveryService discoveryService, StreamCoordinator streamCoordinator, StreamFileJanitorService streamFileJanitorService, @Named("stream.handler") Set<HttpHandler> set, @Nullable MetricsCollectionService metricsCollectionService) {
        this.discoveryService = discoveryService;
        this.streamCoordinator = streamCoordinator;
        this.janitorService = streamFileJanitorService;
        this.httpService = NettyHttpService.builder()
            .addHttpHandlers(set)
            .setHandlerHooks(ImmutableList.of(new MetricsReporterHook(metricsCollectionService, "stream.handler")))
            .setHost(cConfiguration.get("stream.bind.address"))
            .setWorkerThreadPoolSize(cConfiguration.getInt("stream.worker.threads", 10))
            .setExecThreadPoolSize(0)
            .setConnectionBacklog(20000)
            .setChannelConfig("child.bufferFactory", HeapChannelBufferFactory.getInstance())
            .build();
    }

    /**
     * Starts the HTTP service, announces it through discovery, then starts the janitor service.
     *
     * <p>NOTE: if a later step throws, the earlier steps are not rolled back by this method.
     *
     * @throws Exception if any of the start-up steps fails
     */
    protected void startUp() throws Exception {
        LoggingContextAccessor.setLoggingContext(new ServiceLoggingContext("cdap", "services", "streams"));
        this.httpService.startAndWait();
        // Register only after the HTTP service has started, so the bind address is read from a running service.
        this.cancellable = this.discoveryService.register(new Discoverable() {
            public String getName() {
                return "streams";
            }

            public InetSocketAddress getSocketAddress() {
                return StreamHttpService.this.httpService.getBindAddress();
            }
        });
        this.janitorService.startAndWait();
    }

    /**
     * Stops the janitor service, cancels the discovery registration, stops the HTTP service and
     * closes the stream coordinator, in that order.
     *
     * <p>Every step is attempted even when an earlier one throws, so a failure in one component
     * does not leave the remaining ones running or registered. If more than one step fails, the
     * exception from the last failing step is the one that propagates.
     *
     * @throws Exception if any of the shutdown steps fails
     */
    protected void shutDown() throws Exception {
        try {
            this.janitorService.stopAndWait();
        } finally {
            try {
                // cancellable is null when startUp() never got as far as registering.
                if (this.cancellable != null) {
                    this.cancellable.cancel();
                }
            } finally {
                try {
                    this.httpService.stopAndWait();
                } finally {
                    this.streamCoordinator.close();
                }
            }
        }
    }

    /** Returns a string containing the HTTP service's bind address. */
    public String toString() {
        return Objects.toStringHelper(this).add("bindAddress", this.httpService.getBindAddress()).toString();
    }
}
