/*
 * Decompiled with CFR 0.152.
 */
package infra.http.client;

import infra.http.HttpHeaders;
import infra.http.HttpMethod;
import infra.http.StreamingHttpOutputMessage;
import infra.http.client.AbstractStreamingClientHttpRequest;
import infra.http.client.ClientHttpResponse;
import infra.http.client.OutputStreamPublisher;
import infra.http.client.ReactorClientHttpResponse;
import infra.lang.Nullable;
import infra.util.StreamUtils;
import infra.util.concurrent.Future;
import infra.util.concurrent.Promise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

final class ReactorClientHttpRequest
extends AbstractStreamingClientHttpRequest {
    private final HttpClient httpClient;
    private final HttpMethod method;
    private final URI uri;
    @Nullable
    private final Duration exchangeTimeout;

    ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Duration exchangeTimeout) {
        this.httpClient = httpClient;
        this.method = method;
        this.uri = uri;
        this.exchangeTimeout = exchangeTimeout;
    }

    @Override
    public HttpMethod getMethod() {
        return this.method;
    }

    @Override
    public URI getURI() {
        return this.uri;
    }

    @Override
    protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable StreamingHttpOutputMessage.Body body) throws IOException {
        HttpClient.RequestSender sender = this.httpClient.request(io.netty.handler.codec.http.HttpMethod.valueOf((String)this.method.name()));
        sender = this.uri.isAbsolute() ? (HttpClient.RequestSender)sender.uri(this.uri) : (HttpClient.RequestSender)sender.uri(this.uri.toString());
        try {
            ReactorClientHttpResponse clientResponse;
            Mono mono = sender.send((request, outbound) -> this.send(headers, body, (HttpClientRequest)request, (NettyOutbound)outbound)).responseConnection((response, conn) -> Mono.just((Object)new ReactorClientHttpResponse((HttpClientResponse)response, (Connection)conn))).next();
            ReactorClientHttpResponse reactorClientHttpResponse = clientResponse = this.exchangeTimeout != null ? (ReactorClientHttpResponse)mono.block(this.exchangeTimeout) : (ReactorClientHttpResponse)mono.block();
            if (clientResponse == null) {
                throw new IOException("HTTP exchange resulted in no result");
            }
            return clientResponse;
        }
        catch (RuntimeException ex) {
            throw ReactorClientHttpRequest.convertException(ex);
        }
    }

    private Publisher<Void> send(HttpHeaders headers, @Nullable StreamingHttpOutputMessage.Body body, HttpClientRequest reactorRequest, NettyOutbound nettyOutbound) {
        io.netty.handler.codec.http.HttpHeaders entries = reactorRequest.requestHeaders();
        for (Map.Entry entry : headers.entrySet()) {
            entries.set((String)entry.getKey(), (Iterable)entry.getValue());
        }
        if (body != null) {
            ByteBufMapper byteMapper = new ByteBufMapper(nettyOutbound.alloc());
            AtomicReference<Object> executor = new AtomicReference<Object>();
            return nettyOutbound.withConnection(connection -> executor.set(connection.channel().eventLoop())).send(FlowAdapters.toPublisher(new OutputStreamPublisher<ByteBuf>(os -> body.writeTo(StreamUtils.nonClosing((OutputStream)os)), byteMapper, executor.getAndSet(null), null)));
        }
        return nettyOutbound;
    }

    static IOException convertException(RuntimeException ex) {
        UncheckedIOException uioEx;
        IOException ioEx;
        Throwable cause = ex.getCause();
        if (cause instanceof IOException) {
            IOException ioEx2 = (IOException)cause;
            return ioEx2;
        }
        if (cause instanceof UncheckedIOException && (ioEx = (uioEx = (UncheckedIOException)cause).getCause()) != null) {
            return ioEx;
        }
        return new IOException(ex.getMessage(), cause != null ? cause : ex);
    }

    @Override
    protected Future<ClientHttpResponse> asyncInternal(HttpHeaders headers, @Nullable StreamingHttpOutputMessage.Body body, @Nullable Executor executor) {
        HttpClient.RequestSender requestSender = this.httpClient.request(io.netty.handler.codec.http.HttpMethod.valueOf((String)this.method.name()));
        requestSender = this.uri.isAbsolute() ? (HttpClient.RequestSender)requestSender.uri(this.uri) : (HttpClient.RequestSender)requestSender.uri(this.uri.toString());
        final Promise promise = Future.forPromise((Executor)executor);
        requestSender.send((request, nettyOutbound) -> this.send(headers, body, (HttpClientRequest)request, (NettyOutbound)nettyOutbound)).responseConnection((reactorResponse, connection) -> Mono.just((Object)new ReactorClientHttpResponse((HttpClientResponse)reactorResponse, (Connection)connection))).next().subscribe((CoreSubscriber)new CoreSubscriber<ReactorClientHttpResponse>(){
            volatile Subscription s;

            public void onSubscribe(Subscription s) {
                s.request(1L);
                promise.onCancelled(() -> ((Subscription)s).cancel());
                this.s = s;
            }

            public void onNext(ReactorClientHttpResponse response) {
                promise.trySuccess((Object)response);
            }

            public void onError(Throwable throwable) {
                promise.tryFailure(throwable);
            }

            public void onComplete() {
            }
        });
        if (this.exchangeTimeout != null) {
            return promise.timeout(this.exchangeTimeout);
        }
        return promise;
    }

    private static final class ByteBufMapper
    implements OutputStreamPublisher.ByteMapper<ByteBuf> {
        private final ByteBufAllocator allocator;

        public ByteBufMapper(ByteBufAllocator allocator) {
            this.allocator = allocator;
        }

        @Override
        public ByteBuf map(int b) {
            ByteBuf byteBuf = this.allocator.buffer(1);
            byteBuf.writeByte(b);
            return byteBuf;
        }

        @Override
        public ByteBuf map(byte[] b, int off, int len) {
            ByteBuf byteBuf = this.allocator.buffer(len);
            byteBuf.writeBytes(b, off, len);
            return byteBuf;
        }
    }
}

