package ru.tinkoff.kora.http.client.async;

import io.netty.handler.codec.http.HttpHeaders;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.http.client.common.response.HttpClientResponse;

/* loaded from: input_file:ru/tinkoff/kora/http/client/async/AsyncHttpClientResponse.class */
public class AsyncHttpClientResponse implements HttpClientResponse {
    private final HttpResponseStatus responseStatus;
    private final HttpHeaders headers;
    private final Publisher<HttpResponseBodyPart> body;
    private final AtomicReference<BodyState> bodyState = new AtomicReference<>(BodyState.NON_SUBSCRIBED);
    private final Context ctx;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ru/tinkoff/kora/http/client/async/AsyncHttpClientResponse$BodyState.class */
    public enum BodyState {
        NON_SUBSCRIBED,
        SUBSCRIBED,
        BYTES_RECEIVED,
        COMPLETED,
        ERROR
    }

    public AsyncHttpClientResponse(Context context, HttpResponseStatus httpResponseStatus, HttpHeaders httpHeaders, Publisher<HttpResponseBodyPart> publisher) {
        this.responseStatus = httpResponseStatus;
        this.headers = httpHeaders;
        this.body = publisher;
        this.ctx = context;
    }

    public int code() {
        return this.responseStatus.getStatusCode();
    }

    public ru.tinkoff.kora.http.common.HttpHeaders headers() {
        return new AsyncHttpClientHeaders(this.headers);
    }

    public Flux<ByteBuffer> body() {
        return Flux.create(fluxSink -> {
            if (this.bodyState.compareAndSet(BodyState.NON_SUBSCRIBED, BodyState.SUBSCRIBED)) {
                this.body.subscribe(new Subscriber<HttpResponseBodyPart>() { // from class: ru.tinkoff.kora.http.client.async.AsyncHttpClientResponse.1
                    public void onSubscribe(Subscription subscription) {
                        FluxSink fluxSink = fluxSink;
                        Objects.requireNonNull(subscription);
                        fluxSink.onRequest(subscription::request);
                        FluxSink fluxSink2 = fluxSink;
                        Objects.requireNonNull(subscription);
                        fluxSink2.onCancel(subscription::cancel);
                    }

                    public void onNext(HttpResponseBodyPart httpResponseBodyPart) {
                        Context current = Context.current();
                        AsyncHttpClientResponse.this.bodyState.set(BodyState.BYTES_RECEIVED);
                        try {
                            Context.Reactor.current(fluxSink.contextView()).inject();
                            fluxSink.next(httpResponseBodyPart.getBodyByteBuffer());
                        } finally {
                            current.inject();
                        }
                    }

                    public void onError(Throwable th) {
                        Context current = Context.current();
                        AsyncHttpClientResponse.this.bodyState.set(BodyState.ERROR);
                        try {
                            Context.Reactor.current(fluxSink.contextView()).inject();
                            fluxSink.error(th);
                        } finally {
                            current.inject();
                        }
                    }

                    public void onComplete() {
                        Context current = Context.current();
                        try {
                            Context.Reactor.current(fluxSink.contextView()).inject();
                            if (AsyncHttpClientResponse.this.bodyState.compareAndSet(BodyState.SUBSCRIBED, BodyState.COMPLETED)) {
                                fluxSink.next(ByteBuffer.allocate(0));
                            } else {
                                AsyncHttpClientResponse.this.bodyState.set(BodyState.COMPLETED);
                            }
                            fluxSink.complete();
                        } finally {
                            current.inject();
                        }
                    }
                });
            } else {
                fluxSink.error(new IOException("Body was already subscribed"));
            }
        }).contextWrite(context -> {
            return Context.Reactor.inject(context, this.ctx);
        });
    }

    public Mono<Void> close() {
        return Mono.fromRunnable(() -> {
            if (this.bodyState.compareAndSet(BodyState.NON_SUBSCRIBED, BodyState.COMPLETED)) {
                Flux.from(this.body).subscribe();
            }
        });
    }
}
