package ru.tinkoff.kora.http.client.async;

import io.netty.handler.codec.http.HttpHeaders;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.http.client.async.response.EmptyAsyncHttpClientResponse;
import ru.tinkoff.kora.http.client.async.response.QueuePublisher;
import ru.tinkoff.kora.http.client.async.response.SingleBufferAsyncHttpClientResponse;
import ru.tinkoff.kora.http.client.async.response.StreamingAsyncHttpClientResponse;
import ru.tinkoff.kora.http.client.common.response.HttpClientResponse;

/* loaded from: input_file:ru/tinkoff/kora/http/client/async/MonoSinkStreamAsyncHandler.class */
class MonoSinkStreamAsyncHandler implements AsyncHandler<Object> {
    private final CompletableFuture<HttpClientResponse> future;
    private final AtomicReference<RequestPhase> phase = new AtomicReference<>(RequestPhase.REQUESTED);
    private final Context context;
    private HttpResponseStatus responseStatus;
    private HttpHeaders headers;
    private QueuePublisher<ByteBuffer> publisher;

    /* loaded from: input_file:ru/tinkoff/kora/http/client/async/MonoSinkStreamAsyncHandler$RequestPhase.class */
    private enum RequestPhase {
        REQUESTED,
        ERROR,
        BODY_STREAM_RECEIVED
    }

    public MonoSinkStreamAsyncHandler(Context context, CompletableFuture<HttpClientResponse> completableFuture) {
        this.future = completableFuture;
        this.context = context;
    }

    public AsyncHandler.State onStatusReceived(HttpResponseStatus httpResponseStatus) {
        this.responseStatus = httpResponseStatus;
        return AsyncHandler.State.CONTINUE;
    }

    public AsyncHandler.State onHeadersReceived(HttpHeaders httpHeaders) {
        this.headers = httpHeaders;
        return AsyncHandler.State.CONTINUE;
    }

    public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) {
        if (this.phase.compareAndSet(RequestPhase.REQUESTED, RequestPhase.BODY_STREAM_RECEIVED)) {
            Context current = Context.current();
            try {
                this.context.inject();
                if (httpResponseBodyPart.length() <= 0) {
                    if (httpResponseBodyPart.isLast()) {
                        this.future.complete(new EmptyAsyncHttpClientResponse(this.responseStatus, this.headers));
                    }
                    AsyncHandler.State state = AsyncHandler.State.CONTINUE;
                    current.inject();
                    return state;
                }
                ByteBuffer allocate = ByteBuffer.allocate(httpResponseBodyPart.length());
                allocate.put(httpResponseBodyPart.getBodyByteBuffer());
                allocate.rewind();
                if (httpResponseBodyPart.isLast()) {
                    this.future.complete(new SingleBufferAsyncHttpClientResponse(this.context, this.responseStatus, this.headers, allocate));
                    AsyncHandler.State state2 = AsyncHandler.State.CONTINUE;
                    current.inject();
                    return state2;
                }
                QueuePublisher<ByteBuffer> queuePublisher = new QueuePublisher<>();
                this.publisher = queuePublisher;
                queuePublisher.next(allocate);
                this.future.complete(new StreamingAsyncHttpClientResponse(this.responseStatus, this.headers, queuePublisher));
                current.inject();
            } catch (Throwable th) {
                current.inject();
                throw th;
            }
        } else {
            if (httpResponseBodyPart.length() > 0) {
                this.publisher.next(httpResponseBodyPart.getBodyByteBuffer());
            }
            if (httpResponseBodyPart.isLast()) {
                this.publisher.complete();
            }
        }
        return AsyncHandler.State.CONTINUE;
    }

    public void onThrowable(Throwable th) {
        if (!this.phase.compareAndSet(RequestPhase.REQUESTED, RequestPhase.ERROR)) {
            this.publisher.error(th);
            return;
        }
        Context current = Context.current();
        try {
            this.context.inject();
            this.future.completeExceptionally(th);
        } finally {
            current.inject();
        }
    }

    public Object onCompleted() {
        if (!this.phase.compareAndSet(RequestPhase.REQUESTED, RequestPhase.BODY_STREAM_RECEIVED)) {
            return null;
        }
        Context current = Context.current();
        try {
            this.context.inject();
            this.future.complete(new EmptyAsyncHttpClientResponse(this.responseStatus, this.headers));
            return null;
        } finally {
            current.inject();
        }
    }
}
