package io.mantisrx.publish.netty.pipeline;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.impl.AtomicDouble;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.shaded.io.netty.bootstrap.Bootstrap;
import io.mantisrx.shaded.io.netty.channel.Channel;
import io.mantisrx.shaded.io.netty.channel.ChannelFuture;
import io.mantisrx.shaded.io.netty.channel.ChannelOption;
import io.mantisrx.shaded.io.netty.channel.DefaultEventLoopGroup;
import io.mantisrx.shaded.io.netty.channel.EventLoopGroup;
import io.mantisrx.shaded.io.netty.channel.WriteBufferWaterMark;
import io.mantisrx.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.mantisrx.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/mantis-publish-netty-1.3.51.jar:io/mantisrx/publish/netty/pipeline/HttpEventChannelManager.class */
public class HttpEventChannelManager {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HttpEventChannel.class);
    private final Counter connectionSuccess;
    private final Counter connectionFailure;
    private final AtomicDouble liveConnections;
    private final AtomicDouble nettyChannelBufferSize;
    private final int lowWriteBufferWatermark;
    private final int highWriteBufferWatermark;
    private final EventLoopGroup eventLoopGroup;
    private final EventLoopGroup encoderEventLoopGroup;
    private final Bootstrap bootstrap;
    private final ConcurrentMap<String, Channel> channels;

    public HttpEventChannelManager(Registry registry, MrePublishConfiguration mrePublishConfiguration) {
        this.connectionSuccess = SpectatorUtils.buildAndRegisterCounter(registry, "connectionSuccess", "channel", HttpEventChannel.CHANNEL_TYPE);
        this.connectionFailure = SpectatorUtils.buildAndRegisterCounter(registry, "connectionFailure", "channel", HttpEventChannel.CHANNEL_TYPE);
        this.liveConnections = SpectatorUtils.buildAndRegisterGauge(registry, "liveConnections", "channel", HttpEventChannel.CHANNEL_TYPE);
        this.nettyChannelBufferSize = SpectatorUtils.buildAndRegisterGauge(registry, "bufferSize", "channel", HttpEventChannel.CHANNEL_TYPE);
        this.lowWriteBufferWatermark = mrePublishConfiguration.getLowWriteBufferWatermark();
        this.highWriteBufferWatermark = mrePublishConfiguration.getHighWriteBufferWatermark();
        this.eventLoopGroup = new NioEventLoopGroup(mrePublishConfiguration.getIoThreads());
        if (mrePublishConfiguration.getGzipEnabled()) {
            this.encoderEventLoopGroup = new DefaultEventLoopGroup(mrePublishConfiguration.getCompressionThreads());
        } else {
            this.encoderEventLoopGroup = null;
        }
        this.bootstrap = new Bootstrap().group(this.eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, MantisMessageSizeEstimator.DEFAULT).option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(this.lowWriteBufferWatermark, this.highWriteBufferWatermark)).handler(new HttpEventChannelInitializer(registry, mrePublishConfiguration, this.encoderEventLoopGroup));
        this.channels = new ConcurrentHashMap();
        Runtime runtime = Runtime.getRuntime();
        EventLoopGroup eventLoopGroup = this.eventLoopGroup;
        eventLoopGroup.getClass();
        runtime.addShutdownHook(new Thread(eventLoopGroup::shutdownGracefully));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Channel findOrCreate(InetSocketAddress inetSocketAddress) {
        Channel find = find(inetSocketAddress);
        if (find == null) {
            LOG.debug("creating new channel for {}", inetSocketAddress);
            ChannelFuture connect = this.bootstrap.connect(inetSocketAddress);
            find = connect.channel();
            this.channels.put(getHostPortString(inetSocketAddress), find);
            find.closeFuture().addListener2(future -> {
                LOG.debug("closing channel for {}", inetSocketAddress);
                this.channels.remove(getHostPortString(inetSocketAddress));
                this.liveConnections.set(this.channels.size());
            });
            connect.addListener2(future2 -> {
                if (!future2.isSuccess()) {
                    LOG.debug("failed to connect to {}", inetSocketAddress);
                    this.connectionFailure.increment();
                } else {
                    LOG.debug("connection success for {}", inetSocketAddress);
                    this.connectionSuccess.increment();
                    this.liveConnections.set(this.channels.size());
                }
            });
        }
        this.nettyChannelBufferSize.set(this.highWriteBufferWatermark - find.bytesBeforeUnwritable());
        return find;
    }

    private Channel find(InetSocketAddress inetSocketAddress) {
        return this.channels.get(getHostPortString(inetSocketAddress));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close(InetSocketAddress inetSocketAddress) {
        Channel find = find(inetSocketAddress);
        if (find != null) {
            find.close();
        }
    }

    private String getHostPortString(InetSocketAddress inetSocketAddress) {
        return inetSocketAddress.getHostString() + ':' + inetSocketAddress.getPort();
    }
}
