package io.simplesource.kafka.internal.cluster;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/kafka/internal/cluster/Server.class */
public final class Server {
    private static final Logger logger = LoggerFactory.getLogger(Server.class);
    private final ClusterConfig config;
    private final EventLoopGroup bossGroup;
    private final EventLoopGroup workerGroup;
    private final Class channelClass;
    private final PipelineInitializer pipelineInitializer;
    private ServerBootstrap bootstrap;
    private boolean started = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Server(ClusterConfig clusterConfig, EventLoopGroup eventLoopGroup, EventLoopGroup eventLoopGroup2, Class cls, PipelineInitializer pipelineInitializer) {
        this.config = clusterConfig;
        this.bossGroup = eventLoopGroup;
        this.workerGroup = eventLoopGroup2;
        this.channelClass = cls;
        this.pipelineInitializer = pipelineInitializer;
    }

    public synchronized ChannelFuture start() {
        if (this.started) {
            throw new IllegalStateException("Attempting to start Simple Sourcing RPC server that has already started");
        }
        this.started = true;
        this.bootstrap = new ServerBootstrap();
        this.bootstrap.group(this.bossGroup, this.workerGroup).channel(this.channelClass).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).option(ChannelOption.SO_REUSEADDR, Boolean.valueOf(this.config.reuseAddress())).option(ChannelOption.SO_BACKLOG, Integer.valueOf(this.config.acceptBackLog())).childOption(ChannelOption.TCP_NODELAY, Boolean.valueOf(this.config.tcpNoDelay())).childOption(ChannelOption.SO_SNDBUF, Integer.valueOf(this.config.tcpSendBufferSize())).childOption(ChannelOption.SO_RCVBUF, Integer.valueOf(this.config.tcpReceiveBufferSize())).childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(this.config.tcpReceiveBufferSize())).childOption(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(this.config.tcpKeepAlive())).childOption(ChannelOption.SO_LINGER, Integer.valueOf(this.config.soLinger())).localAddress(this.config.iface(), this.config.port()).childHandler(new ChannelInitializer<Channel>() { // from class: io.simplesource.kafka.internal.cluster.Server.1
            protected void initChannel(Channel channel) {
                Server.this.pipelineInitializer.init(channel.pipeline());
            }
        });
        return this.bootstrap.bind().addListener(future -> {
            if (future.isSuccess()) {
                logger.info("Simple Sourcing RPC server started and bound to interface: {} on port: {}", this.config.iface(), Integer.valueOf(this.config.port()));
            } else {
                logger.error("Simple Sourcing RPC server failed to bind to interface: {} on port: {}", this.config.iface(), Integer.valueOf(this.config.port()));
            }
        });
    }

    public synchronized void stop() {
        if (!this.started) {
            throw new IllegalStateException("Attempting to stop Simple Sourcing RPC server that has not been started");
        }
        this.bossGroup.shutdownGracefully().syncUninterruptibly();
        this.workerGroup.shutdownGracefully().syncUninterruptibly();
        logger.info("Simple Sourcing RPC server stopped");
    }
}
