package com.linkedin.r2.transport.http.client;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.r2.filter.R2Constants;
import com.linkedin.r2.message.Messages;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamRequestBuilder;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.transport.common.MessageType;
import com.linkedin.r2.transport.common.WireAttributeHelper;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.http.common.HttpBridge;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/r2-netty-11.0.0.jar:com/linkedin/r2/transport/http/client/AbstractNettyStreamClient.class */
public abstract class AbstractNettyStreamClient implements TransportClient {
    static final Logger LOG = LoggerFactory.getLogger((Class<?>) AbstractNettyStreamClient.class);
    private static final int HTTP_DEFAULT_PORT = 80;
    private static final int HTTPS_DEFAULT_PORT = 443;
    protected final ChannelGroup _allChannels;
    protected final AtomicReference<State> _state;
    protected final ScheduledExecutorService _scheduler;
    protected final ExecutorService _callbackExecutors;
    protected final long _requestTimeout;
    protected final long _shutdownTimeout;
    protected final long _maxResponseSize;
    protected final int _maxConcurrentConnections;
    protected final String _requestTimeoutMessage;
    protected final AbstractJmxManager _jmxManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/r2-netty-11.0.0.jar:com/linkedin/r2/transport/http/client/AbstractNettyStreamClient$State.class */
    public enum State {
        RUNNING,
        SHUTTING_DOWN,
        REQUESTS_STOPPING,
        SHUTDOWN
    }

    public AbstractNettyStreamClient(NioEventLoopGroup nioEventLoopGroup, ScheduledExecutorService scheduledExecutorService, long j, long j2, long j3, ExecutorService executorService, AbstractJmxManager abstractJmxManager, int i) {
        this._state = new AtomicReference<>(State.RUNNING);
        this._maxResponseSize = j3;
        this._maxConcurrentConnections = i;
        this._scheduler = scheduledExecutorService;
        this._callbackExecutors = executorService == null ? nioEventLoopGroup : executorService;
        this._requestTimeout = j;
        this._shutdownTimeout = j2;
        this._requestTimeoutMessage = "Exceeded request timeout of " + this._requestTimeout + "ms";
        this._jmxManager = abstractJmxManager;
        this._allChannels = new DefaultChannelGroup("R2 client channels", nioEventLoopGroup.next());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractNettyStreamClient(ChannelPoolFactory channelPoolFactory, ScheduledExecutorService scheduledExecutorService, int i, int i2, long j) {
        this._state = new AtomicReference<>(State.RUNNING);
        this._maxResponseSize = j;
        this._scheduler = scheduledExecutorService;
        this._callbackExecutors = new DefaultEventExecutorGroup(1);
        this._requestTimeout = i;
        this._shutdownTimeout = i2;
        this._requestTimeoutMessage = "Exceeded request timeout of " + this._requestTimeout + "ms";
        this._jmxManager = AbstractJmxManager.NULL_JMX_MANAGER;
        this._maxConcurrentConnections = Integer.MAX_VALUE;
        this._allChannels = new DefaultChannelGroup("R2 client channels", GlobalEventExecutor.INSTANCE);
    }

    public abstract Map<String, PoolStats> getPoolStats();

    protected abstract void doShutdown(Callback<None> callback);

    protected abstract void doWriteRequest(Request request, RequestContext requestContext, SocketAddress socketAddress, TimeoutTransportCallback<StreamResponse> timeoutTransportCallback);

    @Override // com.linkedin.r2.transport.common.bridge.client.TransportClient
    public void restRequest(RestRequest restRequest, RequestContext requestContext, Map<String, String> map, TransportCallback<RestResponse> transportCallback) {
        throw new UnsupportedOperationException("This client only handles streaming.");
    }

    @Override // com.linkedin.r2.transport.common.bridge.client.TransportClient
    public void streamRequest(StreamRequest streamRequest, RequestContext requestContext, Map<String, String> map, TransportCallback<StreamResponse> transportCallback) {
        MessageType.setMessageType(MessageType.Type.REST, map);
        writeRequestWithTimeout(streamRequest, requestContext, map, HttpBridge.streamToHttpCallback(transportCallback, streamRequest));
    }

    @Override // com.linkedin.r2.transport.common.bridge.client.TransportClient
    public void shutdown(Callback<None> callback) {
        LOG.info("Shutdown requested");
        if (!this._state.compareAndSet(State.RUNNING, State.SHUTTING_DOWN)) {
            callback.onError(new IllegalStateException("Shutdown has already been requested."));
        } else {
            LOG.info("Shutting down");
            doShutdown(callback);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void writeRequestWithTimeout(StreamRequest streamRequest, final RequestContext requestContext, Map<String, String> map, TransportCallback<StreamResponse> transportCallback) {
        final TimeoutTransportCallback<StreamResponse> timeoutTransportCallback = new TimeoutTransportCallback<>(this._scheduler, this._requestTimeout, TimeUnit.MILLISECONDS, new StreamExecutionCallback(this._callbackExecutors, transportCallback), this._requestTimeoutMessage);
        StreamRequest build = ((StreamRequestBuilder) streamRequest.builder().overwriteHeaders(WireAttributeHelper.toWireAttributes(map))).build(streamRequest.getEntityStream());
        if (isFullRequest(requestContext)) {
            Messages.toRestRequest(build, new Callback<RestRequest>() { // from class: com.linkedin.r2.transport.http.client.AbstractNettyStreamClient.1
                @Override // com.linkedin.common.callback.Callback
                public void onError(Throwable th) {
                    AbstractNettyStreamClient.errorResponse(timeoutTransportCallback, th);
                }

                @Override // com.linkedin.common.callback.SuccessCallback
                public void onSuccess(RestRequest restRequest) {
                    AbstractNettyStreamClient.this.writeRequest(restRequest, requestContext, timeoutTransportCallback);
                }
            });
        } else {
            writeRequest(build, requestContext, timeoutTransportCallback);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writeRequest(Request request, RequestContext requestContext, TimeoutTransportCallback<StreamResponse> timeoutTransportCallback) {
        State state = this._state.get();
        if (state != State.RUNNING) {
            errorResponse(timeoutTransportCallback, new IllegalStateException("Client is " + state));
            return;
        }
        URI uri = request.getURI();
        String scheme = uri.getScheme();
        if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) {
            errorResponse(timeoutTransportCallback, new IllegalArgumentException("Unknown scheme: " + scheme + " (only http/https is supported)"));
            return;
        }
        String host = uri.getHost();
        int port = uri.getPort();
        if (port == -1) {
            port = "http".equalsIgnoreCase(scheme) ? 80 : 443;
        }
        try {
            InetAddress byName = InetAddress.getByName(host);
            InetSocketAddress inetSocketAddress = new InetSocketAddress(byName, port);
            requestContext.putLocalAttr(R2Constants.REMOTE_SERVER_ADDR, byName.getHostAddress());
            doWriteRequest(request, requestContext, inetSocketAddress, timeoutTransportCallback);
        } catch (UnknownHostException e) {
            errorResponse(timeoutTransportCallback, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> void errorResponse(TransportCallback<T> transportCallback, Throwable th) {
        transportCallback.onResponse(TransportResponseImpl.error(th));
    }

    static boolean isFullRequest(RequestContext requestContext) {
        Object localAttr = requestContext.getLocalAttr(R2Constants.IS_FULL_REQUEST);
        return localAttr != null && ((Boolean) localAttr).booleanValue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Exception toException(Throwable th) {
        return th instanceof Exception ? (Exception) th : new Exception("Wrapped Throwable", th);
    }

    public long getRequestTimeout() {
        return this._requestTimeout;
    }

    public long getShutdownTimeout() {
        return this._shutdownTimeout;
    }

    public long getMaxResponseSize() {
        return this._maxResponseSize;
    }
}
