/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.queue;

import io.activej.async.exception.AsyncCloseException;
import io.activej.common.Checks;
import io.activej.common.recycle.Recyclers;
import io.activej.csp.queue.ChannelQueue;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.reactor.ImplicitlyReactive;
import io.activej.reactor.Reactive;
import java.util.function.Supplier;
import org.jetbrains.annotations.Nullable;

/**
 * A {@link ChannelQueue} that stores items in a primary queue and falls back to a
 * secondary buffer, obtained on demand from a factory, when a {@code put} finds the
 * primary queue saturated.
 *
 * <p>While a secondary buffer is active, both {@code put} and {@code take} are routed to it.
 * The buffer is dropped when one of its operations fails with an {@link AsyncCloseException}
 * or when a {@code take} from it yields {@code null}; operations then go back to the
 * primary queue.
 *
 * @param <T> type of the queued items
 */
public final class ChannelBufferWithFallback<T> extends ImplicitlyReactive implements ChannelQueue<T> {
    private static final boolean CHECKS = Checks.isEnabled(ChannelBufferWithFallback.class);

    /** Primary queue, used whenever no secondary buffer exists or is being created. */
    private final ChannelQueue<T> queue;
    /** Supplies a secondary buffer when a {@code put} finds the primary queue saturated. */
    private final Supplier<Promise<? extends ChannelQueue<T>>> bufferFactory;
    /** Currently active secondary buffer, or {@code null} when there is none. */
    @Nullable
    private ChannelQueue<T> buffer;
    /** Exception this queue was closed with, or {@code null} while it is still open. */
    @Nullable
    private Exception exception;
    /** Non-null only while a secondary buffer is being created; settles when creation ends. */
    @Nullable
    private SettablePromise<Void> waitingForBuffer;
    /** Set once a {@code null} item has been put. */
    private boolean finished = false;

    /**
     * @param queue         primary queue
     * @param bufferFactory factory asked for a secondary buffer each time a {@code put}
     *                      finds the primary queue saturated and no buffer is active or pending
     */
    public ChannelBufferWithFallback(ChannelQueue<T> queue, Supplier<Promise<? extends ChannelQueue<T>>> bufferFactory) {
        this.queue = queue;
        this.bufferFactory = bufferFactory;
    }

    /**
     * Puts an item into the secondary buffer if one is active or pending, otherwise into
     * the primary queue, creating a secondary buffer if the primary queue is saturated.
     *
     * @param item item to store; {@code null} marks this queue as finished
     * @return promise of the underlying put, or a promise failed with the close exception
     *         (in which case the item is recycled) if this queue has been closed
     */
    @Override
    public Promise<Void> put(@Nullable T item) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        if (this.exception != null) {
            Recyclers.recycle(item);
            return Promise.ofException(this.exception);
        }
        return doPut(item);
    }

    /**
     * Takes an item from the secondary buffer if one is active or pending, otherwise from
     * the primary queue.
     *
     * @return promise of the taken item, or a promise failed with the close exception
     *         if this queue has been closed
     */
    @Override
    public Promise<T> take() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        if (this.exception != null) {
            return Promise.ofException(this.exception);
        }
        return doTake();
    }

    /** Routes a put to the secondary buffer, the primary queue, or a freshly requested buffer. */
    private Promise<Void> doPut(@Nullable T item) {
        if (item == null) {
            this.finished = true;
        }
        if (this.buffer != null) {
            return secondaryPut(item);
        }
        if (this.waitingForBuffer != null) {
            // A buffer is already being created; put into it once it is available.
            return this.waitingForBuffer.then($ -> secondaryPut(item));
        }
        if (!this.queue.isSaturated()) {
            return this.queue.put(item);
        }
        // Primary queue is saturated: request a secondary buffer.
        SettablePromise<Void> pending = new SettablePromise<>();
        this.waitingForBuffer = pending;
        return this.bufferFactory.get().then(
                newBuffer -> {
                    this.buffer = newBuffer;
                    pending.set(null);
                    this.waitingForBuffer = null;
                    return secondaryPut(item);
                },
                e -> {
                    // Buffer creation failed: clear the marker and fail the pending promise so
                    // that waiting and future operations are not stuck behind a promise that
                    // would otherwise never complete.
                    this.waitingForBuffer = null;
                    pending.setException(e);
                    return Promise.ofException(e);
                });
    }

    /**
     * Routes a take to the secondary buffer or the primary queue. Completes with {@code null}
     * without touching the primary queue when a {@code null} item has already been put and
     * the primary queue is exhausted.
     */
    private Promise<T> doTake() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        if (this.buffer != null) {
            return secondaryTake();
        }
        if (this.waitingForBuffer != null) {
            return this.waitingForBuffer.then($ -> secondaryTake());
        }
        if (this.finished && this.queue.isExhausted()) {
            return Promise.of(null);
        }
        return this.queue.take();
    }

    /**
     * Puts into the secondary buffer. If the put fails with an {@link AsyncCloseException},
     * the buffer is dropped and the put is retried from scratch; any other failure is propagated.
     */
    private Promise<Void> secondaryPut(@Nullable T item) {
        assert this.buffer != null;
        return this.buffer.put(item).then(Promise::of, e -> {
            if (!(e instanceof AsyncCloseException)) {
                return Promise.ofException(e);
            }
            this.buffer = null;
            return doPut(item);
        });
    }

    /**
     * Takes from the secondary buffer. A non-null item is returned as is. A {@code null} item
     * closes the buffer; a {@code null} item or an {@link AsyncCloseException} drops the buffer
     * and retries the take from scratch. Any other failure is propagated.
     */
    private Promise<T> secondaryTake() {
        if (this.buffer == null) {
            return doTake();
        }
        return this.buffer.take().then((item, e) -> {
            if (e != null) {
                if (!(e instanceof AsyncCloseException)) {
                    return Promise.ofException(e);
                }
            } else {
                if (item != null) {
                    return Promise.of(item);
                }
                this.buffer.close();
            }
            this.buffer = null;
            return doTake();
        });
    }

    /**
     * @return {@code true} only when the primary queue is saturated and an active
     *         secondary buffer is saturated as well
     */
    @Override
    public boolean isSaturated() {
        return this.queue.isSaturated() && this.buffer != null && this.buffer.isSaturated();
    }

    /**
     * @return {@code true} when the primary queue is exhausted and there is either no
     *         secondary buffer or it is exhausted too
     */
    @Override
    public boolean isExhausted() {
        return this.queue.isExhausted() && (this.buffer == null || this.buffer.isExhausted());
    }

    /**
     * Closes this queue with the given exception. Only the first call has an effect: it
     * records the exception and closes the primary queue, the active secondary buffer (if any),
     * and a secondary buffer that is still being created once it becomes available.
     *
     * @param e exception to close with
     */
    @Override
    public void closeEx(Exception e) {
        Reactive.checkInReactorThread(this);
        if (this.exception != null) {
            return;
        }
        this.exception = e;
        this.queue.closeEx(e);
        if (this.waitingForBuffer != null) {
            this.waitingForBuffer.whenResult(() -> {
                assert this.buffer != null;
                this.buffer.closeEx(e);
            });
        }
        if (this.buffer != null) {
            this.buffer.closeEx(e);
        }
    }

    /**
     * @return the exception this queue was closed with, or {@code null} if it has not been closed
     */
    @Nullable
    public Exception getException() {
        return this.exception;
    }
}

