/*
 * Decompiled with CFR 0.152.
 */
package infra.http.client.reactive;

import infra.core.io.buffer.DataBufferFactory;
import infra.core.io.buffer.DefaultDataBufferFactory;
import infra.http.HttpMethod;
import infra.http.client.reactive.ClientHttpConnector;
import infra.http.client.reactive.ClientHttpRequest;
import infra.http.client.reactive.ClientHttpResponse;
import infra.http.client.reactive.HttpComponentsClientHttpRequest;
import infra.http.client.reactive.HttpComponentsClientHttpResponse;
import infra.lang.Assert;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.hc.client5.http.cookie.BasicCookieStore;
import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public class HttpComponentsClientHttpConnector
implements ClientHttpConnector,
Closeable {
    private final CloseableHttpAsyncClient client;
    private final BiFunction<HttpMethod, URI, ? extends HttpClientContext> contextProvider;
    private DataBufferFactory dataBufferFactory = DefaultDataBufferFactory.sharedInstance;

    public HttpComponentsClientHttpConnector() {
        this(HttpAsyncClients.createDefault());
    }

    public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client) {
        this(client, (method, uri) -> HttpClientContext.create());
    }

    public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client, BiFunction<HttpMethod, URI, ? extends HttpClientContext> contextProvider) {
        Assert.notNull((Object)client, (String)"Client is required");
        Assert.notNull(contextProvider, (String)"ContextProvider is required");
        this.client = client;
        this.contextProvider = contextProvider;
        this.client.start();
    }

    public void setBufferFactory(DataBufferFactory bufferFactory) {
        this.dataBufferFactory = bufferFactory;
    }

    @Override
    public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
        HttpClientContext context = this.contextProvider.apply(method, uri);
        if (context.getCookieStore() == null) {
            context.setCookieStore((CookieStore)new BasicCookieStore());
        }
        HttpComponentsClientHttpRequest request = new HttpComponentsClientHttpRequest(method, uri, context, this.dataBufferFactory);
        return requestCallback.apply(request).then(Mono.defer(() -> this.execute(request, context)));
    }

    private Mono<ClientHttpResponse> execute(HttpComponentsClientHttpRequest request, HttpClientContext context) {
        AsyncRequestProducer requestProducer = request.toRequestProducer();
        return Mono.create(sink -> {
            ReactiveResponseConsumer reactiveResponseConsumer = new ReactiveResponseConsumer((FutureCallback)new ResponseCallback((MonoSink<ClientHttpResponse>)sink, this.dataBufferFactory, context));
            this.client.execute(requestProducer, (AsyncResponseConsumer)reactiveResponseConsumer, (HttpContext)context, (FutureCallback)new ResultCallback((MonoSink<?>)sink));
        });
    }

    @Override
    public void close() throws IOException {
        this.client.close();
    }

    private static class ResponseCallback
    implements FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> {
        private final HttpClientContext context;
        private final MonoSink<ClientHttpResponse> sink;
        private final DataBufferFactory dataBufferFactory;

        public ResponseCallback(MonoSink<ClientHttpResponse> sink, DataBufferFactory dataBufferFactory, HttpClientContext context) {
            this.sink = sink;
            this.dataBufferFactory = dataBufferFactory;
            this.context = context;
        }

        public void completed(Message<HttpResponse, Publisher<ByteBuffer>> result) {
            this.sink.success((Object)new HttpComponentsClientHttpResponse(this.dataBufferFactory, result, this.context));
        }

        public void failed(Exception ex) {
            this.sink.error(ex instanceof HttpStreamResetException && ex.getCause() != null ? ex.getCause() : ex);
        }

        public void cancelled() {
            this.sink.error((Throwable)new CancellationException());
        }
    }

    private static class ResultCallback
    implements FutureCallback<Void> {
        private final MonoSink<?> sink;

        public ResultCallback(MonoSink<?> sink) {
            this.sink = sink;
        }

        public void completed(Void result) {
            this.sink.success();
        }

        public void failed(Exception ex) {
            this.sink.error(ex instanceof HttpStreamResetException && ex.getCause() != null ? ex.getCause() : ex);
        }

        public void cancelled() {
            this.sink.error((Throwable)new CancellationException());
        }
    }
}

