package ru.tinkoff.kora.http.client.async;

import io.netty.handler.codec.http.HttpHeaders;
import java.util.concurrent.atomic.AtomicReference;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.handler.StreamedAsyncHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.MonoSink;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.http.client.common.response.HttpClientResponse;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:ru/tinkoff/kora/http/client/async/MonoSinkStreamAsyncHandler.class */
public class MonoSinkStreamAsyncHandler implements StreamedAsyncHandler<Object> {
    private final MonoSink<HttpClientResponse> sink;
    private final AtomicReference<RequestPhase> phase = new AtomicReference<>(RequestPhase.REQUESTED);
    private final Context context;
    private volatile HttpResponseStatus responseStatus;
    private volatile HttpHeaders headers;

    /* loaded from: input_file:ru/tinkoff/kora/http/client/async/MonoSinkStreamAsyncHandler$RequestPhase.class */
    private enum RequestPhase {
        REQUESTED,
        ERROR,
        BODY_STREAM_RECEIVED
    }

    public MonoSinkStreamAsyncHandler(Context context, MonoSink<HttpClientResponse> monoSink) {
        this.sink = monoSink;
        this.context = context;
    }

    public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
        if (this.phase.compareAndSet(RequestPhase.REQUESTED, RequestPhase.BODY_STREAM_RECEIVED)) {
            Context current = Context.current();
            try {
                Context.Reactor.current(this.sink.contextView()).inject();
                this.sink.success(new AsyncHttpClientResponse(this.context, this.responseStatus, this.headers, publisher));
            } finally {
                current.inject();
            }
        }
        return AsyncHandler.State.CONTINUE;
    }

    public AsyncHandler.State onStatusReceived(HttpResponseStatus httpResponseStatus) {
        this.responseStatus = httpResponseStatus;
        return AsyncHandler.State.CONTINUE;
    }

    public AsyncHandler.State onHeadersReceived(HttpHeaders httpHeaders) {
        this.headers = httpHeaders;
        return AsyncHandler.State.CONTINUE;
    }

    public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) {
        return AsyncHandler.State.CONTINUE;
    }

    public void onThrowable(Throwable th) {
        if (this.phase.compareAndSet(RequestPhase.REQUESTED, RequestPhase.ERROR)) {
            Context current = Context.current();
            try {
                Context.Reactor.current(this.sink.contextView()).inject();
                this.sink.error(th);
            } finally {
                current.inject();
            }
        }
    }

    public Object onCompleted() {
        if (!this.phase.compareAndSet(RequestPhase.REQUESTED, RequestPhase.BODY_STREAM_RECEIVED)) {
            return null;
        }
        Context current = Context.current();
        try {
            Context.Reactor.current(this.sink.contextView()).inject();
            this.sink.success(new AsyncHttpClientResponse(this.context, this.responseStatus, this.headers, Flux.empty()));
            return null;
        } finally {
            current.inject();
        }
    }
}
