/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.http.client.async;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.asynchttpclient.request.body.Body;
import org.asynchttpclient.request.body.generator.FeedListener;
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;
import ru.tinkoff.kora.http.client.common.HttpClientEncoderException;
import ru.tinkoff.kora.http.common.body.HttpBodyOutput;

public class AsyncHttpClientRequestBodyGenerator
implements FeedableBodyGenerator {
    private final HttpBodyOutput body;
    private FeedListener listener;

    public AsyncHttpClientRequestBodyGenerator(HttpBodyOutput body) {
        this.body = body;
    }

    public boolean feed(ByteBuf buffer, boolean isLast) throws Exception {
        throw new IllegalStateException("Never should be called");
    }

    public void setListener(FeedListener listener) {
        this.listener = listener;
    }

    public Body createBody() {
        return new PublisherBody(this);
    }

    private static class PublisherBody
    extends AtomicBoolean
    implements Body,
    Flow.Subscriber<ByteBuffer> {
        private final HttpBodyOutput body;
        private final Deque<Signal> queue = new ConcurrentLinkedDeque<Signal>();
        private final AsyncHttpClientRequestBodyGenerator generator;
        private volatile Flow.Subscription subscription;
        private static final AtomicIntegerFieldUpdater<PublisherBody> WIP = AtomicIntegerFieldUpdater.newUpdater(PublisherBody.class, "wip");
        private volatile int wip = 0;

        public PublisherBody(AsyncHttpClientRequestBodyGenerator generator) {
            this.body = generator.body;
            this.generator = generator;
        }

        public long getContentLength() {
            return this.body.contentLength();
        }

        public Body.BodyState transferTo(ByteBuf target) throws IOException {
            if (WIP.compareAndSet(this, 0, 1)) {
                if (this.compareAndSet(false, true)) {
                    this.body.subscribe((Flow.Subscriber)this);
                }
                try {
                    Body.BodyState bodyState = this.readNextChunk(target);
                    return bodyState;
                }
                finally {
                    WIP.set(this, 0);
                }
            }
            return Body.BodyState.SUSPEND;
        }

        public Body.BodyState readNextChunk(ByteBuf target) {
            Body.BodyState res = Body.BodyState.SUSPEND;
            Deque<Signal> q = this.queue;
            while (target.isWritable()) {
                Signal nextChunk = q.peek();
                if (nextChunk == null) {
                    return res;
                }
                if (nextChunk == Signal.LAST) {
                    q.remove();
                    return Body.BodyState.STOP;
                }
                if (nextChunk.error() != null) {
                    q.remove();
                    throw new HttpClientEncoderException(nextChunk.error());
                }
                if (!nextChunk.data().hasRemaining()) {
                    q.remove();
                    this.subscription.request(1L);
                    continue;
                }
                res = Body.BodyState.CONTINUE;
                target.writeBytes(nextChunk.data());
                if (nextChunk.data().hasRemaining()) continue;
                q.remove();
                this.subscription.request(1L);
            }
            return res;
        }

        public void close() throws IOException {
            this.body.close();
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1L);
        }

        @Override
        public void onNext(ByteBuffer item) {
            if (item.hasRemaining()) {
                this.queue.add(new Signal(item, null));
                this.generator.listener.onContentAdded();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            this.generator.listener.onError((Throwable)new HttpClientEncoderException(throwable));
            this.queue.add(new Signal(null, throwable));
        }

        @Override
        public void onComplete() {
            this.queue.addLast(Signal.LAST);
            this.generator.listener.onContentAdded();
        }
    }

    private record Signal(ByteBuffer data, Throwable error) {
        public static final Signal LAST = new Signal(null, null);
    }
}

