package kusto_connector_shaded.com.azure.core.http.netty;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.BiFunction;
import javax.net.ssl.SSLException;
import kusto_connector_shaded.com.azure.core.http.HttpClient;
import kusto_connector_shaded.com.azure.core.http.HttpHeader;
import kusto_connector_shaded.com.azure.core.http.HttpRequest;
import kusto_connector_shaded.com.azure.core.http.HttpResponse;
import kusto_connector_shaded.com.azure.core.http.netty.implementation.AzureNettyHttpClientContext;
import kusto_connector_shaded.com.azure.core.http.netty.implementation.NettyAsyncHttpBufferedResponse;
import kusto_connector_shaded.com.azure.core.http.netty.implementation.NettyAsyncHttpResponse;
import kusto_connector_shaded.com.azure.core.http.netty.implementation.Utility;
import kusto_connector_shaded.com.azure.core.implementation.util.BinaryDataContent;
import kusto_connector_shaded.com.azure.core.implementation.util.BinaryDataHelper;
import kusto_connector_shaded.com.azure.core.implementation.util.ByteArrayContent;
import kusto_connector_shaded.com.azure.core.implementation.util.FileContent;
import kusto_connector_shaded.com.azure.core.implementation.util.InputStreamContent;
import kusto_connector_shaded.com.azure.core.implementation.util.SerializableContent;
import kusto_connector_shaded.com.azure.core.implementation.util.StringContent;
import kusto_connector_shaded.com.azure.core.util.BinaryData;
import kusto_connector_shaded.com.azure.core.util.Context;
import kusto_connector_shaded.com.azure.core.util.Contexts;
import kusto_connector_shaded.com.azure.core.util.ProgressReporter;
import kusto_connector_shaded.com.azure.core.util.logging.ClientLogger;
import kusto_connector_shaded.org.reactivestreams.Publisher;
import kusto_connector_shaded.reactor.core.Exceptions;
import kusto_connector_shaded.reactor.core.publisher.Flux;
import kusto_connector_shaded.reactor.core.publisher.Mono;
import kusto_connector_shaded.reactor.netty.Connection;
import kusto_connector_shaded.reactor.netty.NettyOutbound;
import kusto_connector_shaded.reactor.netty.NettyPipeline;
import kusto_connector_shaded.reactor.netty.http.client.HttpClient;
import kusto_connector_shaded.reactor.netty.http.client.HttpClientRequest;
import kusto_connector_shaded.reactor.netty.http.client.HttpClientResponse;
import kusto_connector_shaded.reactor.util.function.Tuple2;
import kusto_connector_shaded.reactor.util.function.Tuples;
import kusto_connector_shaded_netty.io.netty.buffer.Unpooled;
import kusto_connector_shaded_netty.io.netty.handler.codec.http.HttpHeaders;
import kusto_connector_shaded_netty.io.netty.handler.codec.http.HttpMethod;
import kusto_connector_shaded_netty.io.netty.handler.proxy.HttpProxyHandler;
import kusto_connector_shaded_netty.io.netty.handler.proxy.ProxyConnectException;
import kusto_connector_shaded_netty.io.netty.handler.stream.ChunkedNioFile;
import kusto_connector_shaded_netty.io.netty.handler.stream.ChunkedStream;
import kusto_connector_shaded_netty.io.netty.handler.stream.ChunkedWriteHandler;

/* loaded from: input_file:kusto_connector_shaded/com/azure/core/http/netty/NettyAsyncHttpClient.class */
class NettyAsyncHttpClient implements HttpClient {
    private static final ClientLogger LOGGER = new ClientLogger((Class<?>) NettyAsyncHttpClient.class);
    private static final byte[] EMPTY_BYTES = new byte[0];
    private static final String AZURE_EAGERLY_READ_RESPONSE = "azure-eagerly-read-response";
    private static final String AZURE_IGNORE_RESPONSE_BODY = "azure-ignore-response-body";
    private static final String AZURE_RESPONSE_TIMEOUT = "azure-response-timeout";
    private static final String AZURE_EAGERLY_CONVERT_HEADERS = "azure-eagerly-convert-headers";
    final boolean disableBufferCopy;
    final boolean addProxyHandler;
    final kusto_connector_shaded.reactor.netty.http.client.HttpClient nettyClient;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyAsyncHttpClient(kusto_connector_shaded.reactor.netty.http.client.HttpClient httpClient, boolean z, boolean z2) {
        this.nettyClient = httpClient;
        this.disableBufferCopy = z;
        this.addProxyHandler = z2;
    }

    @Override // kusto_connector_shaded.com.azure.core.http.HttpClient
    public Mono<HttpResponse> send(HttpRequest httpRequest) {
        return send(httpRequest, Context.NONE);
    }

    @Override // kusto_connector_shaded.com.azure.core.http.HttpClient
    public Mono<HttpResponse> send(HttpRequest httpRequest, Context context) {
        Objects.requireNonNull(httpRequest.getHttpMethod(), "'request.getHttpMethod()' cannot be null.");
        Objects.requireNonNull(httpRequest.getUrl(), "'request.getUrl()' cannot be null.");
        Objects.requireNonNull(httpRequest.getUrl().getProtocol(), "'request.getUrl().getProtocol()' cannot be null.");
        return attemptAsync(httpRequest, ((Boolean) context.getData(AZURE_EAGERLY_READ_RESPONSE).orElse(false)).booleanValue(), ((Boolean) context.getData(AZURE_IGNORE_RESPONSE_BODY).orElse(false)).booleanValue(), ((Boolean) context.getData(AZURE_EAGERLY_CONVERT_HEADERS).orElse(false)).booleanValue(), (Long) context.getData(AZURE_RESPONSE_TIMEOUT).filter(obj -> {
            return obj instanceof Duration;
        }).map(obj2 -> {
            return Long.valueOf(((Duration) obj2).toMillis());
        }).orElse(null), Contexts.with(context).getHttpRequestProgressReporter(), false);
    }

    private Mono<HttpResponse> attemptAsync(HttpRequest httpRequest, boolean z, boolean z2, boolean z3, Long l, ProgressReporter progressReporter, boolean z4) {
        Flux responseConnection = ((HttpClient.RequestSender) this.nettyClient.request(toReactorNettyHttpMethod(httpRequest.getHttpMethod())).uri(httpRequest.getUrl().toString())).send(bodySendDelegate(httpRequest)).responseConnection(responseDelegate(httpRequest, this.disableBufferCopy, z, z2, z3));
        if (l != null || progressReporter != null) {
            responseConnection = responseConnection.contextWrite(context -> {
                return context.put(AzureNettyHttpClientContext.KEY, new AzureNettyHttpClientContext(l, progressReporter));
            });
        }
        return responseConnection.single().flatMap(tuple2 -> {
            HttpResponse httpResponse = (HttpResponse) tuple2.getT1();
            return (this.addProxyHandler && httpResponse.getStatusCode() == 407) ? z4 ? Mono.error(new HttpProxyHandler.HttpProxyConnectException("Failed to connect to proxy. Status: 407", (HttpHeaders) tuple2.getT2())) : attemptAsync(httpRequest, z, z2, z3, l, progressReporter, true) : Mono.just(httpResponse);
        }).onErrorResume(th -> {
            return shouldRetryProxyError(z4, th) ? attemptAsync(httpRequest, z, z2, z3, l, progressReporter, true) : Mono.error(th);
        });
    }

    private static boolean shouldRetryProxyError(boolean z, Throwable th) {
        return !z && ((th instanceof ProxyConnectException) || ((th instanceof SSLException) && (th.getCause() instanceof ProxyConnectException)));
    }

    @Override // kusto_connector_shaded.com.azure.core.http.HttpClient
    public HttpResponse sendSync(HttpRequest httpRequest, Context context) {
        try {
            return send(httpRequest, context).block();
        } catch (Exception e) {
            Throwable unwrap = Exceptions.unwrap(e);
            if (unwrap instanceof RuntimeException) {
                throw LOGGER.logExceptionAsError((RuntimeException) unwrap);
            }
            if (unwrap instanceof IOException) {
                throw LOGGER.logExceptionAsError(new UncheckedIOException((IOException) unwrap));
            }
            throw LOGGER.logExceptionAsError(new RuntimeException(unwrap));
        }
    }

    private static BiFunction<HttpClientRequest, NettyOutbound, Publisher<Void>> bodySendDelegate(HttpRequest httpRequest) {
        return (httpClientRequest, nettyOutbound) -> {
            Iterator<HttpHeader> it = httpRequest.getHeaders().iterator();
            while (it.hasNext()) {
                HttpHeader next = it.next();
                httpClientRequest.requestHeaders().set(next.getName(), (Iterable<?>) next.getValuesList());
            }
            BinaryData bodyAsBinaryData = httpRequest.getBodyAsBinaryData();
            if (bodyAsBinaryData == null) {
                return nettyOutbound;
            }
            BinaryDataContent content = BinaryDataHelper.getContent(bodyAsBinaryData);
            return content instanceof ByteArrayContent ? nettyOutbound.send(Mono.just(Unpooled.wrappedBuffer(content.toBytes()))) : ((content instanceof StringContent) || (content instanceof SerializableContent)) ? nettyOutbound.send(Mono.fromSupplier(() -> {
                return Unpooled.wrappedBuffer(content.toBytes());
            })) : content instanceof FileContent ? sendFile(httpRequest, nettyOutbound, (FileContent) content) : content instanceof InputStreamContent ? sendInputStream(nettyOutbound, (InputStreamContent) content) : nettyOutbound.send(httpRequest.getBody().map(Unpooled::wrappedBuffer));
        };
    }

    private static NettyOutbound sendFile(HttpRequest httpRequest, NettyOutbound nettyOutbound, FileContent fileContent) {
        return fileContent.getLength().longValue() == 0 ? nettyOutbound.sendByteArray(Flux.just(EMPTY_BYTES)) : httpRequest.getUrl().getProtocol().equals("https") ? nettyOutbound.sendUsing(() -> {
            return FileChannel.open(fileContent.getFile(), StandardOpenOption.READ);
        }, (connection, fileChannel) -> {
            if (connection.channel().pipeline().get(ChunkedWriteHandler.class) == null) {
                connection.addHandlerLast(NettyPipeline.ChunkedWriter, new ChunkedWriteHandler());
            }
            try {
                return new ChunkedNioFile(fileChannel, fileContent.getPosition(), fileContent.getLength().longValue(), fileContent.getChunkSize());
            } catch (IOException e) {
                throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
            }
        }, fileChannel2 -> {
            try {
                fileChannel2.close();
            } catch (IOException e) {
                throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
            }
        }) : nettyOutbound.sendFile(fileContent.getFile(), fileContent.getPosition(), fileContent.getLength().longValue());
    }

    private static NettyOutbound sendInputStream(NettyOutbound nettyOutbound, InputStreamContent inputStreamContent) {
        Objects.requireNonNull(inputStreamContent);
        return nettyOutbound.sendUsing(inputStreamContent::toStream, (connection, inputStream) -> {
            if (connection.channel().pipeline().get(ChunkedWriteHandler.class) == null) {
                connection.addHandlerLast(NettyPipeline.ChunkedWriter, new ChunkedWriteHandler());
            }
            return new ChunkedStream(inputStream);
        }, inputStream2 -> {
        });
    }

    private static BiFunction<HttpClientResponse, Connection, Mono<Tuple2<HttpResponse, HttpHeaders>>> responseDelegate(HttpRequest httpRequest, boolean z, boolean z2, boolean z3, boolean z4) {
        return (httpClientResponse, connection) -> {
            return (z2 || z3) ? Mono.using(() -> {
                return connection;
            }, connection -> {
                return connection.inbound().receive().aggregate().asByteArray().switchIfEmpty(Mono.just(EMPTY_BYTES)).map(bArr -> {
                    return Tuples.of(new NettyAsyncHttpBufferedResponse(httpClientResponse, httpRequest, bArr, z4), httpClientResponse.responseHeaders());
                });
            }, Utility::closeConnection) : Mono.just(Tuples.of(new NettyAsyncHttpResponse(httpClientResponse, connection, httpRequest, z, z4), httpClientResponse.responseHeaders()));
        };
    }

    private static HttpMethod toReactorNettyHttpMethod(kusto_connector_shaded.com.azure.core.http.HttpMethod httpMethod) {
        switch (httpMethod) {
            case GET:
                return HttpMethod.GET;
            case PUT:
                return HttpMethod.PUT;
            case HEAD:
                return HttpMethod.HEAD;
            case POST:
                return HttpMethod.POST;
            case DELETE:
                return HttpMethod.DELETE;
            case PATCH:
                return HttpMethod.PATCH;
            case TRACE:
                return HttpMethod.TRACE;
            case CONNECT:
                return HttpMethod.CONNECT;
            case OPTIONS:
                return HttpMethod.OPTIONS;
            default:
                throw LOGGER.logExceptionAsError(new IllegalStateException("Unknown HttpMethod '" + httpMethod + "'."));
        }
    }
}
