package org.apache.storm.messaging.netty;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.storm.Config;
import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.ConnectionWithStatus;
import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.policy.WaitStrategyProgressive;
import org.apache.storm.shade.com.google.common.base.Preconditions;
import org.apache.storm.shade.io.netty.bootstrap.Bootstrap;
import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
import org.apache.storm.shade.io.netty.channel.Channel;
import org.apache.storm.shade.io.netty.channel.ChannelFuture;
import org.apache.storm.shade.io.netty.channel.ChannelFutureListener;
import org.apache.storm.shade.io.netty.channel.ChannelOption;
import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
import org.apache.storm.shade.io.netty.channel.WriteBufferWaterMark;
import org.apache.storm.shade.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.storm.shade.io.netty.util.HashedWheelTimer;
import org.apache.storm.shade.io.netty.util.Timeout;
import org.apache.storm.shade.io.netty.util.TimerTask;
import org.apache.storm.shade.io.netty.util.concurrent.Future;
import org.apache.storm.shade.io.netty.util.concurrent.GenericFutureListener;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.StormBoundedExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/messaging/netty/Client.class */
public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient {
    // Upper bound, in milliseconds, on how long close() waits for pending messages to drain.
    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000;
    // Presumably the poll interval (ms) used while waiting for pending messages to drain -- TODO confirm.
    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000;
    // Period, in milliseconds, of the recurring channel-alive check scheduled on TIMER.
    private static final long CHANNEL_ALIVE_INTERVAL_MS = 30000;
    // Prefix put in front of the destination address to build dstAddressPrefixedName.
    private static final String PREFIX = "Netty-Client-";
    // Zero delay, i.e. "schedule immediately" -- presumably meant for scheduleConnect; TODO confirm.
    private static final long NO_DELAY_MS = 0;
    // PREFIX + destination address; used to identify this client in log and exception messages.
    protected final String dstAddressPrefixedName;
    // Configuration map handed to the constructor; also exposed through getConfig().
    private final Map<String, Object> topoConf;
    // Supplies the delay before the next connection attempt after a failed one.
    private final StormBoundedExponentialBackoffRetry retryPolicy;
    // Event loop group the bootstrap is bound to (supplied by the caller).
    private final EventLoopGroup eventLoopGroup;
    // Pre-configured Netty bootstrap used for every connection attempt.
    private final Bootstrap bootstrap;
    // Remote address this client connects to.
    private final InetSocketAddress dstAddress;
    // Timer on which Connect tasks are scheduled (supplied by the caller).
    private final HashedWheelTimer scheduler;
    // Accumulates TaskMessages and hands back MessageBatch objects to write.
    private final MessageBuffer batcher;
    // Idle strategy applied in writeMessage while the channel is not writable.
    private final IWaitStrategy waitStrategy;
    // Set to true by close(); once set, sends are dropped and reconnecting is no longer allowed.
    private volatile boolean closing;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) Client.class);
    // Daemon timer (second constructor argument is isDaemon) shared by all Client instances.
    private static final Timer TIMER = new Timer("Netty-ChannelAlive-Timer", true);
    // Currently connected channel, or null while no connection is established.
    private final AtomicReference<Channel> channelRef = new AtomicReference<>();
    // Connection attempts since the last getState() call (reported there as "reconnects").
    private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
    // Running count of connection attempts; fed to the retry policy to compute the next delay.
    private final AtomicInteger connectionAttempts = new AtomicInteger(0);
    // Messages whose write completed successfully since the last getState() call.
    private final AtomicInteger messagesSent = new AtomicInteger(0);
    // Messages dropped or failed since the last getState() call.
    private final AtomicInteger messagesLost = new AtomicInteger(0);
    // Messages handed to the channel whose write future has not completed yet.
    private final AtomicLong pendingMessages = new AtomicLong(0);
    // Starts as true when authentication is disabled in the config; set to true by channelReady().
    private final AtomicBoolean saslChannelReady = new AtomicBoolean(false);
    // Latest load map stored via setLoadMetrics(); null until the first call.
    private volatile Map<Integer, Double> serverLoad = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/messaging/netty/Client$Connect.class */
    public class Connect implements TimerTask {
        private final InetSocketAddress address;

        public Connect(InetSocketAddress inetSocketAddress) {
            this.address = inetSocketAddress;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void reschedule(Throwable th) {
            String format = String.format("connection attempt %s to %s failed", Client.this.connectionAttempts, Client.this.dstAddressPrefixedName);
            Client.LOG.error(th == null ? format : format + ": " + th.toString());
            Client.this.scheduleConnect(Client.this.retryPolicy.getSleepTimeMs(Client.this.connectionAttempts.get(), 0L));
        }

        @Override // org.apache.storm.shade.io.netty.util.TimerTask
        public void run(Timeout timeout) throws Exception {
            if (!Client.this.reconnectingAllowed()) {
                Client.this.close();
                throw new RuntimeException("Giving up to scheduleConnect to " + Client.this.dstAddressPrefixedName + " after " + Client.this.connectionAttempts + " failed attempts. " + Client.this.messagesLost.get() + " messages were lost");
            }
            final int andIncrement = Client.this.connectionAttempts.getAndIncrement();
            Client.this.totalConnectionAttempts.getAndIncrement();
            Client.LOG.debug("connecting to {} [attempt {}]", this.address.toString(), Integer.valueOf(andIncrement));
            Client.this.bootstrap.connect(this.address).addListener2((GenericFutureListener<? extends Future<? super Void>>) new ChannelFutureListener() { // from class: org.apache.storm.messaging.netty.Client.Connect.1
                @Override // org.apache.storm.shade.io.netty.util.concurrent.GenericFutureListener
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    Channel channel = channelFuture.channel();
                    if (!channelFuture.isSuccess() || !Client.this.connectionEstablished(channel)) {
                        Connect.this.reschedule(channelFuture.cause());
                        if (channel != null) {
                            channel.close();
                            return;
                        }
                        return;
                    }
                    Preconditions.checkState(Client.this.channelRef.compareAndSet(null, channel));
                    Client.LOG.debug("successfully connected to {}, {} [attempt {}]", Connect.this.address.toString(), channel.toString(), Integer.valueOf(andIncrement));
                    if (Client.this.messagesLost.get() > 0) {
                        Client.LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", Connect.this.address.toString(), Integer.valueOf(Client.this.messagesLost.get()));
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a client for the given remote host and port, then starts the periodic channel-alive
     * check and the first connection attempt.
     *
     * <p>Background tasks are started only after every field has been assigned, so they never
     * observe a partially constructed client and nothing is left scheduled if construction fails
     * (for example when the configured wait strategy cannot be instantiated).
     *
     * @param map              configuration map read for buffer sizes, retry sleep bounds,
     *                         authentication flag, batch size and back-pressure wait strategy
     * @param atomicBooleanArr passed through unchanged to {@code StormClientPipelineFactory}
     * @param eventLoopGroup   event loop group the bootstrap is bound to
     * @param hashedWheelTimer timer on which connection attempts are scheduled
     * @param str              remote host name or address
     * @param i                remote port
     */
    public Client(Map<String, Object> map, AtomicBoolean[] atomicBooleanArr, EventLoopGroup eventLoopGroup, HashedWheelTimer hashedWheelTimer, String str, int i) {
        this.topoConf = map;
        this.closing = false;
        this.scheduler = hashedWheelTimer;
        int bufferSize = ObjectReader.getInt(map.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)).intValue();
        int lowWatermark = ObjectReader.getInt(map.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK)).intValue();
        int highWatermark = ObjectReader.getInt(map.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK)).intValue();
        // Without authentication there is no SASL handshake to wait for, so the channel counts as ready.
        this.saslChannelReady.set(!ObjectReader.getBoolean(map.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
        LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}", str, Integer.valueOf(i), Integer.valueOf(bufferSize), Integer.valueOf(lowWatermark), Integer.valueOf(highWatermark));
        int minSleepMs = ObjectReader.getInt(map.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)).intValue();
        int maxSleepMs = ObjectReader.getInt(map.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)).intValue();
        this.retryPolicy = new StormBoundedExponentialBackoffRetry(minSleepMs, maxSleepMs, -1);
        this.eventLoopGroup = eventLoopGroup;
        this.bootstrap = new Bootstrap()
            .group(this.eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_SNDBUF, Integer.valueOf(bufferSize))
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWatermark, highWatermark))
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .handler(new StormClientPipelineFactory(this, atomicBooleanArr, map));
        this.dstAddress = new InetSocketAddress(str, i);
        this.dstAddressPrefixedName = prefixedName(this.dstAddress);
        this.batcher = new MessageBuffer(ObjectReader.getInt(map.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144).intValue());
        String waitStrategyClass = (String) map.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY);
        if (waitStrategyClass == null) {
            this.waitStrategy = new WaitStrategyProgressive();
        } else {
            this.waitStrategy = (IWaitStrategy) ReflectionUtils.newInstance(waitStrategyClass);
        }
        this.waitStrategy.prepare(map, IWaitStrategy.WAIT_SITUATION.BACK_PRESSURE_WAIT);
        // Start background work last: everything above may throw, and the tasks reference this instance.
        launchChannelAliveThread();
        scheduleConnect(NO_DELAY_MS);
    }

    /**
     * Registers a recurring task on the shared {@code TIMER} that runs immediately and then every
     * {@code CHANNEL_ALIVE_INTERVAL_MS} milliseconds. Each run calls {@link #getConnectedChannel()},
     * which closes a non-active channel and schedules a reconnect; once the client is closing the
     * task cancels itself.
     */
    private void launchChannelAliveThread() {
        TIMER.schedule(new java.util.TimerTask() {
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    Client.LOG.debug("running timer task, address {}", Client.this.dstAddress);
                    if (Client.this.closing) {
                        cancel();
                    } else {
                        Client.this.getConnectedChannel();
                    }
                } catch (Exception e) {
                    // The exception is passed as the throwable argument, so the message needs no placeholder.
                    Client.LOG.error("channel connection error", e);
                }
            }
        }, 0L, CHANNEL_ALIVE_INTERVAL_MS);
    }

    /**
     * Builds the display name for a destination address.
     *
     * @param inetSocketAddress destination address, may be {@code null}
     * @return {@code PREFIX} followed by the address string, or an empty string for {@code null}
     */
    private String prefixedName(InetSocketAddress inetSocketAddress) {
        if (inetSocketAddress == null) {
            return "";
        }
        return PREFIX + inetSocketAddress.toString();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules a new {@link Connect} task for the destination address.
     *
     * @param j delay before the attempt runs, in milliseconds
     */
    public void scheduleConnect(long j) {
        Connect attempt = new Connect(this.dstAddress);
        this.scheduler.newTimeout(attempt, j, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @return {@code false} once the client is closing, {@code true} otherwise
     */
    public boolean reconnectingAllowed() {
        if (this.closing) {
            return false;
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @param channel channel to inspect, may be {@code null}
     * @return {@code true} only when the channel is non-null and reports itself as active
     */
    public boolean connectionEstablished(Channel channel) {
        if (channel == null) {
            return false;
        }
        return channel.isActive();
    }

    /**
     * Reports the connection state.
     *
     * @return {@code Closed} once the client is closing; {@code Ready} when the current channel is
     *         active and the SASL-ready flag is set; {@code Connecting} otherwise
     */
    @Override // org.apache.storm.messaging.ConnectionWithStatus
    public ConnectionWithStatus.Status status() {
        if (this.closing) {
            return ConnectionWithStatus.Status.Closed;
        }
        Channel current = this.channelRef.get();
        boolean ready = connectionEstablished(current) && this.saslChannelReady.get();
        return ready ? ConnectionWithStatus.Status.Ready : ConnectionWithStatus.Status.Connecting;
    }

    /**
     * Not supported by this client.
     *
     * @param iConnectionCallback ignored
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.storm.messaging.IConnection
    public void registerRecv(IConnectionCallback iConnectionCallback) {
        final String reason = "Client connection should not receive any messages";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Not supported by this client.
     *
     * @param supplier ignored
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.storm.messaging.IConnection
    public void registerNewConnectionResponse(Supplier<Object> supplier) {
        final String reason = "Client does not accept new connections";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Not supported by this client.
     *
     * @param map ignored
     * @throws UnsupportedOperationException always; this is a {@link RuntimeException}, so callers
     *         catching {@code RuntimeException} are unaffected
     */
    @Override // org.apache.storm.messaging.IConnection
    public void sendLoadMetrics(Map<Integer, Double> map) {
        throw new UnsupportedOperationException("Client connection should not send load metrics");
    }

    /**
     * Not supported by this client.
     *
     * @param backPressureStatus ignored
     * @throws UnsupportedOperationException always; this is a {@link RuntimeException}, so callers
     *         catching {@code RuntimeException} are unaffected
     */
    @Override // org.apache.storm.messaging.IConnection
    public void sendBackPressureStatus(BackPressureStatus backPressureStatus) {
        throw new UnsupportedOperationException("Client connection should not send BackPressure status");
    }

    /**
     * Sends the given messages over the current channel, batching them through the message buffer.
     *
     * <p>Messages are dropped (and counted as lost) when no connected channel is available. If a
     * write fails with an {@link IOException} - whether for a batch produced while iterating or
     * for the final drained batch - the failure is logged and the messages still left in the
     * iterator are dropped. When the client is closing, all messages are discarded and only logged.
     *
     * @param it messages to send; {@code null} or empty is a no-op
     */
    @Override // org.apache.storm.messaging.IConnection
    public void send(Iterator<TaskMessage> it) {
        if (this.closing) {
            LOG.error("Dropping {} messages because the Netty client to {} is being closed", Integer.valueOf(iteratorSize(it)), this.dstAddressPrefixedName);
            return;
        }
        if (!hasMessages(it)) {
            return;
        }
        Channel connectedChannel = getConnectedChannel();
        if (connectedChannel == null) {
            dropMessages(it);
            return;
        }
        // A single try covers the loop AND the final drain, since writeMessage can throw from both.
        try {
            while (it.hasNext()) {
                MessageBatch fullBatch = this.batcher.add(it.next());
                if (fullBatch != null) {
                    writeMessage(connectedChannel, fullBatch);
                }
            }
            MessageBatch remainder = this.batcher.drain();
            if (remainder != null) {
                writeMessage(connectedChannel, remainder);
            }
        } catch (IOException e) {
            LOG.warn("Exception when sending message to remote worker.", e);
            dropMessages(it);
        }
    }

    /**
     * Waits until the channel is writable, then hands the batch to {@code flushMessages}.
     *
     * <p>While the channel is not writable the configured wait strategy is invoked repeatedly.
     *
     * @param channel      channel to write to
     * @param messageBatch batch to write
     * @throws IOException      if the channel becomes inactive while waiting
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted;
     *                          the thread's interrupt status is restored before throwing
     */
    private void writeMessage(Channel channel, MessageBatch messageBatch) throws IOException {
        int idleCounter = 0;
        while (!channel.isWritable()) {
            try {
                if (idleCounter == 0) {
                    // Logged only on the first iteration of a wait.
                    LOG.debug("Experiencing Back Pressure from Netty. Entering BackPressure Wait");
                }
                if (!channel.isActive()) {
                    throw new IOException("Connection disconnected");
                }
                idleCounter = this.waitStrategy.idle(idleCounter);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to code further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        flushMessages(channel, messageBatch);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the current channel if it is connected. Otherwise closes the stale channel (if any)
     * and schedules a reconnect.
     *
     * @return the active channel, or {@code null} when no connection is currently established
     */
    public Channel getConnectedChannel() {
        Channel current = this.channelRef.get();
        if (connectionEstablished(current)) {
            return current;
        }
        boolean reconnectScheduled = closeChannelAndReconnect(current);
        if (reconnectScheduled) {
            // Only the caller that actually scheduled the reconnect logs the outage.
            LOG.error("connection to {} is unavailable", this.dstAddressPrefixedName);
        }
        return null;
    }

    /**
     * @return the remote address this client connects to
     */
    public InetSocketAddress getDstAddress() {
        InetSocketAddress destination = this.dstAddress;
        return destination;
    }

    /**
     * @param it iterator to inspect, may be {@code null}
     * @return {@code true} when the iterator is non-null and has at least one remaining element
     */
    private boolean hasMessages(Iterator<TaskMessage> it) {
        if (it == null) {
            return false;
        }
        return it.hasNext();
    }

    /**
     * Consumes the rest of the iterator, adds the number of consumed messages to the lost-message
     * counter and logs it.
     *
     * @param it remaining messages to discard, may be {@code null}
     */
    private void dropMessages(Iterator<TaskMessage> it) {
        final int dropped = iteratorSize(it);
        this.messagesLost.getAndAdd(dropped);
        LOG.info("Dropping {} messages", Integer.valueOf(dropped));
    }

    /**
     * Counts the remaining elements of an iterator by consuming it.
     *
     * @param it iterator to exhaust, may be {@code null}
     * @return number of elements consumed; 0 for {@code null}
     */
    private int iteratorSize(Iterator<TaskMessage> it) {
        if (it == null) {
            return 0;
        }
        int count = 0;
        for (; it.hasNext(); it.next()) {
            count++;
        }
        return count;
    }

    /**
     * Writes and flushes a batch on the channel and tracks its outcome asynchronously.
     *
     * <p>The batch size is added to the pending counter before the write and subtracted when the
     * write future completes. On success the sent counter is increased; on failure the channel is
     * closed, a reconnect is scheduled and the batch is counted as lost.
     *
     * @param channel      channel to write to
     * @param messageBatch batch to write; {@code null} or empty batches are ignored
     */
    private void flushMessages(Channel channel, final MessageBatch messageBatch) {
        if (null == messageBatch || messageBatch.isEmpty()) {
            return;
        }
        final int size = messageBatch.size();
        LOG.debug("writing {} messages to channel {}", Integer.valueOf(messageBatch.size()), channel.toString());
        this.pendingMessages.addAndGet(size);
        // Register the completion callback through Netty's ChannelFuture.addListener.
        channel.writeAndFlush(messageBatch).addListener(new ChannelFutureListener() {
            @Override // org.apache.storm.shade.io.netty.util.concurrent.GenericFutureListener
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                // The write is no longer in flight, whatever its outcome.
                Client.this.pendingMessages.addAndGet(0 - size);
                if (channelFuture.isSuccess()) {
                    Client.LOG.debug("sent {} messages to {}", Integer.valueOf(size), Client.this.dstAddressPrefixedName);
                    Client.this.messagesSent.getAndAdd(messageBatch.size());
                } else {
                    Client.LOG.error("failed to send {} messages to {}: {}", Integer.valueOf(size), Client.this.dstAddressPrefixedName, channelFuture.cause());
                    Client.this.closeChannelAndReconnect(channelFuture.channel());
                    Client.this.messagesLost.getAndAdd(size);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the given channel and, if it was still the installed one, clears the reference and
     * schedules an immediate reconnect.
     *
     * @param channel channel to close, may be {@code null}
     * @return {@code true} only when this call cleared the channel reference and scheduled the
     *         reconnect; {@code false} for {@code null} or when the reference held another value
     */
    public boolean closeChannelAndReconnect(Channel channel) {
        if (channel != null) {
            channel.close();
            // The compare-and-set lets only one caller per channel schedule the reconnect.
            if (this.channelRef.compareAndSet(channel, null)) {
                scheduleConnect(0L);
                return true;
            }
        }
        return false;
    }

    /**
     * @return the port of the remote address this client connects to
     */
    @Override // org.apache.storm.messaging.IConnection
    public int getPort() {
        InetSocketAddress destination = this.dstAddress;
        return destination.getPort();
    }

    /**
     * Marks the client as closing, waits for pending messages to be sent (bounded by a timeout)
     * and then closes the current channel. Calls made after the closing flag is set return
     * immediately.
     */
    @Override // org.apache.storm.messaging.IConnection, java.lang.AutoCloseable
    public void close() {
        if (!this.closing) {
            LOG.info("closing Netty Client {}", this.dstAddressPrefixedName);
            this.closing = true;
            waitForPendingMessagesToBeSent();
            closeChannel();
        }
    }

    /**
     * Blocks until the pending-message counter reaches zero, polling every
     * {@code PENDING_MESSAGES_FLUSH_INTERVAL_MS} milliseconds, for at most
     * {@code PENDING_MESSAGES_FLUSH_TIMEOUT_MS} milliseconds.
     *
     * <p>If the calling thread is interrupted, waiting stops early and the thread's interrupt
     * status is restored so callers can still observe the interruption.
     */
    private void waitForPendingMessagesToBeSent() {
        LOG.info("waiting up to {} ms to send {} pending messages to {}", Long.valueOf(PENDING_MESSAGES_FLUSH_TIMEOUT_MS), Long.valueOf(this.pendingMessages.get()), this.dstAddressPrefixedName);
        final long initiallyPending = this.pendingMessages.get();
        final long startMs = System.currentTimeMillis();
        while (this.pendingMessages.get() != 0) {
            try {
                if (System.currentTimeMillis() - startMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
                    LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not sent", this.dstAddressPrefixedName, Long.valueOf(this.pendingMessages.get()), Long.valueOf(initiallyPending));
                    return;
                }
                Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Stop waiting, but do not swallow the interruption.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Closes the currently installed channel, if there is one. The channel reference itself is
     * left unchanged.
     */
    private void closeChannel() {
        Channel current = this.channelRef.get();
        if (current == null) {
            return;
        }
        current.close();
        LOG.debug("channel to {} closed", this.dstAddressPrefixedName);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the given load map; it is read back by {@link #getLoad(Collection)}.
     *
     * @param map load values keyed by integer id (the map is stored as-is, not copied)
     */
    public void setLoadMetrics(Map<Integer, Double> map) {
        final Map<Integer, Double> latest = map;
        this.serverLoad = latest;
    }

    /**
     * Builds load information for the requested ids from the last stored load map.
     *
     * <p>The client-side load is the pending-message count capped at 1024 and scaled to the range
     * [0, 1]. Ids without an entry in the stored map are omitted from the result.
     *
     * @param collection ids to look up
     * @return a new map from id to {@code Load}; empty when no load map has been stored yet
     */
    @Override // org.apache.storm.messaging.IConnection
    public Map<Integer, Load> getLoad(Collection<Integer> collection) {
        // Read the volatile field once so the whole lookup uses one consistent snapshot.
        Map<Integer, Double> loadSnapshot = this.serverLoad;
        Map<Integer, Load> result = new HashMap<>();
        if (loadSnapshot != null) {
            final long pendingCap = 1024L;
            double clientLoad = Math.min(this.pendingMessages.get(), pendingCap) / 1024.0d;
            for (Integer id : collection) {
                Double reported = loadSnapshot.get(id);
                if (reported != null) {
                    result.put(id, new Load(true, reported.doubleValue(), clientLoad));
                }
            }
        }
        return result;
    }

    /**
     * Collects this client's metrics into a map and resets the resettable counters.
     *
     * <p>Keys: {@code "reconnects"}, {@code "sent"} and {@code "lostOnSend"} (each read and reset
     * to zero), {@code "pending"} (read only), {@code "dest"}, and {@code "src"} when a local
     * address is available.
     *
     * @return a new map holding the metric values
     */
    @Override // org.apache.storm.metric.api.IStatefulObject
    public Object getState() {
        LOG.debug("Getting metrics for client connection to {}", this.dstAddressPrefixedName);
        Map<String, Object> metrics = new HashMap<>();
        int reconnects = this.totalConnectionAttempts.getAndSet(0);
        metrics.put("reconnects", Integer.valueOf(reconnects));
        int sent = this.messagesSent.getAndSet(0);
        metrics.put("sent", Integer.valueOf(sent));
        long pending = this.pendingMessages.get();
        metrics.put("pending", Long.valueOf(pending));
        int lost = this.messagesLost.getAndSet(0);
        metrics.put("lostOnSend", Integer.valueOf(lost));
        metrics.put("dest", this.dstAddress.toString());
        String source = srcAddressName();
        if (source != null) {
            metrics.put("src", source);
        }
        return metrics;
    }

    /**
     * @return the configuration map this client was constructed with (not a copy)
     */
    public Map<String, Object> getConfig() {
        Map<String, Object> conf = this.topoConf;
        return conf;
    }

    /**
     * Marks the SASL channel as ready, which allows {@link #status()} to report {@code Ready}.
     *
     * @param channel channel that became ready (not used by this implementation)
     */
    @Override // org.apache.storm.messaging.netty.ISaslClient
    public void channelReady(Channel channel) {
        final boolean ready = true;
        this.saslChannelReady.set(ready);
    }

    /**
     * @return the value stored under {@code Config.TOPOLOGY_NAME} in the configuration map, cast
     *         to {@code String}; {@code null} when absent
     */
    @Override // org.apache.storm.messaging.netty.ISaslClient
    public String name() {
        Object topologyName = this.topoConf.get(Config.TOPOLOGY_NAME);
        return (String) topologyName;
    }

    /**
     * @return the secret key that {@code SaslUtils.getSecretKey} derives from the configuration map
     */
    @Override // org.apache.storm.messaging.netty.ISaslClient
    public String secretKey() {
        String key = SaslUtils.getSecretKey(this.topoConf);
        return key;
    }

    /**
     * @return the string form of the current channel's local address, or {@code null} when there
     *         is no channel or the channel reports no local address
     */
    private String srcAddressName() {
        Channel current = this.channelRef.get();
        if (current == null) {
            return null;
        }
        SocketAddress local = current.localAddress();
        return local == null ? null : local.toString();
    }

    /**
     * @return a short description naming the prefixed destination address
     */
    public String toString() {
        final String description = String.format("Netty client for connecting to %s", this.dstAddressPrefixedName);
        return description;
    }
}
