package org.apache.storm.messaging.netty;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.storm.Config;
import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.ConnectionWithStatus;
import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.serialization.KryoValuesSerializer;
import org.apache.storm.shade.io.netty.bootstrap.ServerBootstrap;
import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
import org.apache.storm.shade.io.netty.channel.Channel;
import org.apache.storm.shade.io.netty.channel.ChannelOption;
import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
import org.apache.storm.shade.io.netty.channel.group.ChannelGroup;
import org.apache.storm.shade.io.netty.channel.group.DefaultChannelGroup;
import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.storm.shade.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.storm.shade.io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/messaging/netty/Server.class */
class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServer {
    public static final int LOAD_METRICS_TASK_ID = -1;
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
    private final EventLoopGroup bossEventLoopGroup;
    private final EventLoopGroup workerEventLoopGroup;
    private final ServerBootstrap bootstrap;
    private final int boundPort;
    private final Map<String, Object> topoConf;
    private final int port;
    private final KryoValuesSerializer ser;
    private final IConnectionCallback cb;
    private final Supplier<Object> newConnectionResponse;
    private final boolean isNettyAuthRequired;
    private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
    private final AtomicInteger messagesDequeued = new AtomicInteger(0);
    private final ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
    private volatile boolean closing = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Server(Map<String, Object> map, int i, IConnectionCallback iConnectionCallback, Supplier<Object> supplier) {
        this.topoConf = map;
        this.isNettyAuthRequired = ((Boolean) map.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION)).booleanValue();
        this.port = i;
        this.ser = new KryoValuesSerializer(map);
        this.cb = iConnectionCallback;
        this.newConnectionResponse = supplier;
        int intValue = ObjectReader.getInt(map.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)).intValue();
        int intValue2 = ObjectReader.getInt(map.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS)).intValue();
        NettyRenameThreadFactory nettyRenameThreadFactory = new NettyRenameThreadFactory(netty_name() + "-boss");
        NettyRenameThreadFactory nettyRenameThreadFactory2 = new NettyRenameThreadFactory(netty_name() + "-worker");
        this.bossEventLoopGroup = new NioEventLoopGroup(1, nettyRenameThreadFactory);
        this.workerEventLoopGroup = new NioEventLoopGroup(intValue2 > 0 ? intValue2 : 0, nettyRenameThreadFactory2);
        LOG.info("Create Netty Server " + netty_name() + ", buffer_size: " + intValue + ", maxWorkers: " + intValue2);
        this.bootstrap = new ServerBootstrap().group(this.bossEventLoopGroup, this.workerEventLoopGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_BACKLOG, Integer.valueOf(ObjectReader.getInt(map.get(Config.STORM_MESSAGING_NETTY_SOCKET_BACKLOG), 500).intValue())).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_RCVBUF, Integer.valueOf(intValue)).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new StormServerPipelineFactory(map, this));
        try {
            Channel channel = this.bootstrap.bind(new InetSocketAddress(i)).sync().channel();
            this.boundPort = ((InetSocketAddress) channel.localAddress()).getPort();
            this.allChannels.add(channel);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void addReceiveCount(String str, int i) {
        AtomicInteger atomicInteger = this.messagesEnqueued.get(str);
        if (atomicInteger != null) {
            atomicInteger.addAndGet(i);
            return;
        }
        AtomicInteger putIfAbsent = this.messagesEnqueued.putIfAbsent(str, new AtomicInteger(i));
        if (putIfAbsent != null) {
            putIfAbsent.addAndGet(i);
        }
    }

    protected void enqueue(List<TaskMessage> list, String str) throws InterruptedException {
        if (null == list || list.isEmpty() || this.closing) {
            return;
        }
        addReceiveCount(str, list.size());
        this.cb.recv(list);
    }

    @Override // org.apache.storm.messaging.IConnection
    public int getPort() {
        return this.boundPort;
    }

    @Override // org.apache.storm.messaging.IConnection, java.lang.AutoCloseable
    public void close() {
        this.allChannels.close().awaitUninterruptibly();
        this.workerEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
        this.bossEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
    }

    @Override // org.apache.storm.messaging.IConnection
    public void sendLoadMetrics(Map<Integer, Double> map) {
        MessageBatch messageBatch = new MessageBatch(1);
        synchronized (this.ser) {
            messageBatch.add(new TaskMessage(-1, this.ser.serialize(Collections.singletonList(map))));
        }
        this.allChannels.writeAndFlush(messageBatch);
    }

    @Override // org.apache.storm.messaging.IConnection
    public void sendBackPressureStatus(BackPressureStatus backPressureStatus) {
        LOG.info("Sending BackPressure status update to connected workers. BPStatus = {}", backPressureStatus);
        this.allChannels.writeAndFlush(backPressureStatus);
    }

    @Override // org.apache.storm.messaging.IConnection
    public Map<Integer, Load> getLoad(Collection<Integer> collection) {
        throw new RuntimeException("Server connection cannot get load");
    }

    @Override // org.apache.storm.messaging.IConnection
    public void send(Iterator<TaskMessage> it) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    public final String netty_name() {
        return "Netty-server-localhost-" + this.port;
    }

    @Override // org.apache.storm.messaging.ConnectionWithStatus
    public ConnectionWithStatus.Status status() {
        return this.closing ? ConnectionWithStatus.Status.Closed : !connectionEstablished(this.allChannels) ? ConnectionWithStatus.Status.Connecting : ConnectionWithStatus.Status.Ready;
    }

    private boolean connectionEstablished(Channel channel) {
        return channel != null && channel.isActive();
    }

    private boolean connectionEstablished(ChannelGroup channelGroup) {
        boolean z = true;
        Iterator it = channelGroup.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (!connectionEstablished((Channel) it.next())) {
                z = false;
                break;
            }
        }
        return z;
    }

    @Override // org.apache.storm.metric.api.IStatefulObject
    public Object getState() {
        LOG.debug("Getting metrics for server on port {}", Integer.valueOf(this.port));
        HashMap hashMap = new HashMap();
        hashMap.put("dequeuedMessages", Integer.valueOf(this.messagesDequeued.getAndSet(0)));
        HashMap hashMap2 = new HashMap();
        Iterator<Map.Entry<String, AtomicInteger>> it = this.messagesEnqueued.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, AtomicInteger> next = it.next();
            AtomicInteger value = next.getValue();
            if (value.get() == 0) {
                it.remove();
            } else {
                hashMap2.put(next.getKey(), Integer.valueOf(value.getAndSet(0)));
            }
        }
        hashMap.put("enqueued", hashMap2);
        if (this.cb instanceof IMetric) {
            Object valueAndReset = ((IMetric) this.cb).getValueAndReset();
            if (valueAndReset instanceof Map) {
                hashMap.put("messageBytes", valueAndReset);
            }
        }
        return hashMap;
    }

    @Override // org.apache.storm.messaging.netty.IServer
    public void channelActive(Channel channel) {
        if (!this.isNettyAuthRequired) {
            authenticated(channel);
        }
        this.allChannels.add(channel);
    }

    @Override // org.apache.storm.messaging.netty.IServer
    public void received(Object obj, String str, Channel channel) throws InterruptedException {
        try {
            enqueue((List) obj, str);
        } catch (ClassCastException e) {
            LOG.error("Worker netty server received message other than the expected class List<TaskMessage> from remote: {}. Ignored.", str, e);
        }
    }

    @Override // org.apache.storm.messaging.netty.ISaslServer
    public String name() {
        return (String) this.topoConf.get(Config.TOPOLOGY_NAME);
    }

    @Override // org.apache.storm.messaging.netty.ISaslServer
    public String secretKey() {
        return SaslUtils.getSecretKey(this.topoConf);
    }

    @Override // org.apache.storm.messaging.netty.ISaslServer
    public void authenticated(Channel channel) {
        if (this.isNettyAuthRequired) {
            LOG.debug("The channel {} is active and authenticated", channel);
        } else {
            LOG.debug("The channel {} is active", channel);
        }
        if (this.newConnectionResponse != null) {
            channel.writeAndFlush(this.newConnectionResponse.get(), channel.voidPromise());
        }
    }

    public String toString() {
        return String.format("Netty server listening on port %s", Integer.valueOf(this.port));
    }
}
