/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.reactivestreams;

import java.util.concurrent.atomic.AtomicLong;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.reactivestreams.CancelException;
import org.nustaq.kontraktor.reactivestreams.KxPublisher;
import org.nustaq.kontraktor.reactivestreams.KxReactiveStreams;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class EventSink<T>
implements KxPublisher<T> {
    protected AtomicLong credits = new AtomicLong(0L);
    protected Actor actorSubs;
    protected volatile Subscriber subs;
    protected volatile boolean canceled = false;
    protected KxReactiveStreams streams;

    public EventSink() {
        this(KxReactiveStreams.get());
    }

    public EventSink(KxReactiveStreams streams) {
        this.streams = streams;
    }

    public boolean offer(T event) {
        if (event == null) {
            throw new RuntimeException("event cannot be null");
        }
        if (this.canceled) {
            throw CancelException.Instance;
        }
        if ((this.actorSubs != null && !this.actorSubs.isMailboxPressured() || this.actorSubs == null) && this.credits.get() > 0L && this.subs != null) {
            this.subs.onNext(event);
            this.credits.decrementAndGet();
            return true;
        }
        return false;
    }

    public void complete() {
        this.subs.onComplete();
    }

    public void error(Throwable th) {
        this.subs.onError(th);
    }

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        if (this.subs != null) {
            throw new RuntimeException("only one subscription supported");
        }
        if (subscriber == null) {
            throw null;
        }
        this.subs = subscriber;
        if (this.subs instanceof Actor) {
            this.actorSubs = (Actor)this.subs;
        }
        subscriber.onSubscribe(new Subscription(){

            public void request(long l) {
                if (l <= 0L) {
                    EventSink.this.subs.onError((Throwable)new IllegalArgumentException("spec rule 3.9: request > 0 elements"));
                }
                EventSink.this.credits.addAndGet(l);
            }

            public void cancel() {
                EventSink.this.subs = null;
                EventSink.this.canceled = true;
            }
        });
    }

    @Override
    @CallerSideMethod
    public KxReactiveStreams getKxStreamsInstance() {
        if (this.streams == null) {
            System.out.println("POK");
        }
        return this.streams;
    }
}

