package net.cassite.f.stream;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import net.cassite.f.F;
import net.cassite.f.Monad;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:net/cassite/f/stream/Stream.class */
/**
 * A push-style stream of values and failures.
 *
 * <p>Every value passed to {@link #emit(Object)} and every failure passed to
 * {@link #fail(Throwable)} is forwarded, in registration order, to each handler
 * currently registered on this stream. Derived streams (created by
 * {@link #m17map(Function)}, {@link #m9compose(Function)} and {@link #subscribe()})
 * are fed through a handler registered on this stream; that handler is removed
 * again when the derived stream is closed.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <T> type of the values flowing through the stream
 */
public class Stream<T> implements Publisher<T>, Subscriber<T> {
    /** Registered handlers, kept in registration order. */
    private final LinkedHashSet<Handler<AsyncResult<T>>> handlers = new LinkedHashSet<>();
    /** Callbacks to run when this stream is closed. */
    private final List<Runnable> closeCallbacks = new ArrayList<>();
    private boolean closed = false;

    /**
     * Delivers a value to every registered handler.
     *
     * <p>A {@code null} value is delivered as {@code F.unit()}, any other value as
     * {@code F.unit(t)}. If a handler throws while processing the value, the same
     * handler is invoked once more with the thrown exception wrapped as a failure.
     *
     * <p>Dispatch works on a snapshot of the handler set, so handlers may register
     * or remove handlers (for example by closing a derived stream) while an event
     * is being delivered. Handlers removed during the dispatch are skipped.
     *
     * @param t the value to emit, may be {@code null}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void emit(T t) {
        Monad unit = t == null ? F.unit() : F.unit(t);
        for (Handler<AsyncResult<T>> handler : new ArrayList<>(this.handlers)) {
            if (!this.handlers.contains(handler)) {
                // removed by a previously invoked handler during this dispatch
                continue;
            }
            try {
                handler.handle(unit);
            } catch (Throwable th) {
                // give the handler a chance to observe its own failure
                deliverFailure(handler, th);
            }
        }
    }

    /**
     * Delivers a failure to every registered handler.
     *
     * <p>Exceptions thrown by handlers are ignored. As in {@link #emit(Object)},
     * dispatch works on a snapshot and skips handlers removed during the dispatch.
     *
     * @param th the failure cause
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void fail(Throwable th) {
        for (Handler<AsyncResult<T>> handler : new ArrayList<>(this.handlers)) {
            if (this.handlers.contains(handler)) {
                deliverFailure(handler, th);
            }
        }
    }

    /** Hands a failed result to one handler, ignoring anything the handler throws. */
    private void deliverFailure(Handler<AsyncResult<T>> handler, Throwable cause) {
        try {
            handler.handle(F.fail(cause));
        } catch (Throwable ignored) {
            // best effort: a handler that cannot process a failure must not
            // prevent the remaining handlers from being notified
        }
    }

    /**
     * @throws IllegalStateException if the stream is already closed
     */
    private void closeCheck() {
        if (this.closed) {
            throw new IllegalStateException("the stream is already closed");
        }
    }

    /**
     * Creates a derived stream that emits {@code function} applied to each value
     * of this stream. Failures are forwarded unchanged.
     *
     * @param function mapping applied to every emitted value
     * @param <U>      value type of the derived stream
     * @return the derived stream
     * @throws NullPointerException  if {@code function} is {@code null}
     * @throws IllegalStateException if this stream is closed
     */
    @Override // net.cassite.f.stream.Subscriber, net.cassite.f.core.MonadLike
    /* renamed from: map */
    public <U> Stream<U> m17map(@NotNull Function<T, U> function) {
        Objects.requireNonNull(function, "function");
        closeCheck();
        return this.<U>m9compose(value -> F.unit(function.apply(value)));
    }

    /**
     * Creates a derived stream fed by the futures that {@code function} returns
     * for each value of this stream: when such a future succeeds its result is
     * emitted on the derived stream, when it fails the cause is forwarded.
     * Failures of this stream are forwarded to the derived stream unchanged.
     *
     * @param function produces a future for every emitted value; must not return {@code null}
     * @param <U>      value type of the derived stream
     * @return the derived stream
     * @throws NullPointerException  if {@code function} is {@code null}
     * @throws IllegalStateException if this stream is closed
     */
    @Override // net.cassite.f.stream.Subscriber, net.cassite.f.core.MonadLike
    /* renamed from: compose */
    public <U> Stream<U> m9compose(@NotNull Function<T, Future<U>> function) {
        Objects.requireNonNull(function, "function");
        closeCheck();
        Stream<U> stream = new Stream<>();
        addHandler(event -> {
            if (event.failed()) {
                stream.fail(event.cause());
                return;
            }
            Future<U> future = function.apply(event.result());
            if (future == null) {
                // thrown from inside the handler, so emit() reports it back
                // to this handler as a failure
                throw new NullPointerException();
            }
            future.setHandler(outcome -> {
                if (outcome.failed()) {
                    stream.fail(outcome.cause());
                } else {
                    stream.emit(outcome.result());
                }
            });
        }, stream);
        return stream;
    }

    /**
     * Registers {@code handler} on this stream. When {@code stream} is not
     * {@code null}, a close callback is added to it that removes the handler
     * from this stream again.
     */
    private <U> void addHandler(Handler<AsyncResult<T>> handler, Stream<U> stream) {
        this.handlers.add(handler);
        if (stream != null) {
            stream.closeCallbacks.add(() -> this.handlers.remove(handler));
        }
    }

    /**
     * Registers a handler that receives every value and failure of this stream.
     *
     * @param handler the handler to register
     * @return this stream
     * @throws NullPointerException  if {@code handler} is {@code null}
     * @throws IllegalStateException if this stream is closed
     */
    @Override // net.cassite.f.stream.Subscriber, net.cassite.f.core.MonadLike
    /* renamed from: setHandler */
    public Stream<T> m11setHandler(@NotNull Handler<AsyncResult<T>> handler) {
        Objects.requireNonNull(handler, "handler");
        closeCheck();
        addHandler(handler, null);
        return this;
    }

    /**
     * Closes the stream: marks it closed, notifies every registered handler with a
     * {@code HandlerRemovedException}, drops all handlers and runs the close
     * callbacks.
     *
     * <p>Handlers and callbacks are detached from the stream before any of them is
     * invoked, so a handler or callback may safely call {@code close()} or
     * {@link #addCloseHandler(Runnable)} itself. Every callback is run even if an
     * earlier one throws; the first exception is rethrown afterwards with any
     * later ones attached as suppressed.
     */
    @Override // net.cassite.f.stream.ReactiveCloseable
    public void close() {
        this.closed = true;

        List<Handler<AsyncResult<T>>> registered = new ArrayList<>(this.handlers);
        this.handlers.clear();
        List<Runnable> callbacks = new ArrayList<>(this.closeCallbacks);
        this.closeCallbacks.clear();

        HandlerRemovedException removed = new HandlerRemovedException();
        for (Handler<AsyncResult<T>> handler : registered) {
            deliverFailure(handler, removed);
        }

        Throwable failure = null;
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (RuntimeException | Error e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
    }

    /**
     * Creates a derived stream that receives every value and failure of this
     * stream unchanged.
     *
     * @return the derived stream
     * @throws IllegalStateException if this stream is closed
     */
    @Override // net.cassite.f.stream.Publisher
    public Stream<T> subscribe() {
        closeCheck();
        Stream<T> stream = new Stream<>();
        addHandler(event -> {
            if (event.failed()) {
                stream.fail(event.cause());
            } else {
                stream.emit(event.result());
            }
        }, stream);
        return stream;
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    @Override // net.cassite.f.stream.ReactiveCloseable
    public boolean isClosed() {
        return this.closed;
    }

    /**
     * Adds a callback to run when the stream is closed. No closed-state check is
     * made here: a callback added after {@link #close()} only runs if
     * {@code close()} is called again.
     *
     * @param runnable the callback to run on close
     */
    @Override // net.cassite.f.stream.ReactiveCloseable
    public void addCloseHandler(Runnable runnable) {
        this.closeCallbacks.add(runnable);
    }
}
