package co.cask.cdap.gateway.collector;

import co.cask.cdap.common.conf.CConfiguration;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.twill.common.Threads;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/gateway/collector/NettyFlumeCollector.class */
public class NettyFlumeCollector extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(NettyFlumeCollector.class);
    private final int threads;
    private final int port;
    private final FlumeAdapter flumeAdapter;
    private Server server;

    @Inject
    public NettyFlumeCollector(CConfiguration cConfiguration, FlumeAdapter flumeAdapter) {
        this.threads = cConfiguration.getInt("stream.flume.threads", 10);
        this.port = cConfiguration.getInt("stream.flume.port", 10004);
        this.flumeAdapter = flumeAdapter;
    }

    public int getPort() {
        return this.server.getPort();
    }

    protected void startUp() throws Exception {
        LOG.info("Starting NettyFlumeCollector...");
        this.flumeAdapter.startAndWait();
        this.server = new NettyServer(new SpecificResponder(AvroSourceProtocol.Callback.class, this.flumeAdapter), new InetSocketAddress(this.port), new NioServerSocketChannelFactory(Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("flume-stream-boss")), Executors.newFixedThreadPool(this.threads, Threads.createDaemonThreadFactory("flume-stream-worker"))));
        this.server.start();
        LOG.info("NettyFlumeCollector started on port {} with {} threads", Integer.valueOf(this.server.getPort()), Integer.valueOf(this.threads));
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping NettyFlumeCollector...");
        try {
            this.server.close();
            this.server.join();
        } catch (InterruptedException e) {
            LOG.info("Received interrupt during join.");
        }
        this.flumeAdapter.stopAndWait();
        LOG.info("Stopped NettyFlumeCollector");
    }
}
