/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.reactivestreams.impl;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.annotations.AsCallback;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.reactivestreams.KxPublisher;
import org.nustaq.kontraktor.reactivestreams.KxReactiveStreams;
import org.nustaq.kontraktor.remoting.base.ConnectionRegistry;
import org.nustaq.kontraktor.remoting.base.RemotedActor;
import org.nustaq.kontraktor.util.Log;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class KxPublisherActor<IN, OUT>
extends Actor<KxPublisherActor<IN, OUT>>
implements Processor<IN, OUT>,
KxPublisher<OUT>,
RemotedActor {
    public static final boolean CRED_DEBUG = false;
    protected Map<Integer, SubscriberEntry> subscribers;
    protected int subsIdCount = 1;
    protected Function<IN, OUT> processor;
    protected ArrayList<Runnable> doOnSubscribe = new ArrayList();
    protected ArrayDeque pending;
    protected boolean isIteratorBased = false;
    protected boolean closeOnComplete = false;
    protected Object actorServer;
    protected boolean lossy = false;
    public KxReactiveStreams _streams;
    public ArrayList<Subscriber> _callerSideSubscribers;
    protected Subscription producer;
    protected long batchSize;
    protected long requestNextTrigger;
    protected long openRequested;

    public void init(Function<IN, OUT> processor) {
        this.pending = new ArrayDeque();
        this.processor = processor;
    }

    public void initFromIterator(final Iterator<IN> iterator) {
        this.pending = new ArrayDeque();
        this.isIteratorBased = true;
        final ThreadPoolExecutor iteratorThread = new ThreadPoolExecutor(0, 1, 10L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        this.producer = new Subscription(){
            boolean complete = false;

            public void request(long outern) {
                iteratorThread.execute(() -> {
                    if (this.complete) {
                        return;
                    }
                    long n = outern;
                    try {
                        while (iterator.hasNext() && n-- > 0L) {
                            ((KxPublisherActor)KxPublisherActor.this.self()).onNext(iterator.next());
                        }
                        if (!iterator.hasNext()) {
                            this.complete = true;
                            ((KxPublisherActor)KxPublisherActor.this.self()).onComplete();
                        }
                    }
                    catch (Throwable t) {
                        ((KxPublisherActor)KxPublisherActor.this.self()).onError(t);
                    }
                });
            }

            public void cancel() {
            }
        };
        this.processor = in -> in;
        this.onSubscribe(this.producer);
        Thread.currentThread().setName(Thread.currentThread() + " (rx async stream processor)");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @CallerSideMethod
    public void subscribe(Subscriber<? super OUT> subscriber) {
        if (this.isRemote()) {
            KxPublisherActor kxPublisherActor = this;
            synchronized (kxPublisherActor) {
                if (this._callerSideSubscribers == null) {
                    this._callerSideSubscribers = new ArrayList();
                }
                this._callerSideSubscribers.add(subscriber);
            }
        }
        if (subscriber == null) {
            subscriber.onError((Throwable)new IllegalArgumentException("cannot subscibe null"));
            return;
        }
        this._subscribe((Callback & Serializable)(res, err) -> {
            if (KxPublisherActor.isError((Object)err)) {
                subscriber.onError((Throwable)err);
            } else if (KxPublisherActor.isComplete((Object)err)) {
                subscriber.onComplete();
            } else {
                subscriber.onNext(res);
            }
        }).then(subs -> {
            Log.Info((Object)this, (String)"stream subscribe acknowledged");
            subscriber.onSubscribe((Subscription)subs);
        });
    }

    public IPromise<KSubscription> _subscribe(Callback subscriber) {
        if (this.subscribers == null) {
            this.subscribers = new HashMap<Integer, SubscriberEntry>();
        }
        int id = this.subsIdCount++;
        KSubscription subs = new KSubscription((KxPublisherActor)this.self(), id);
        this.subscribers.put(id, new SubscriberEntry(id, subs, subscriber));
        return new Promise((Object)subs);
    }

    public void _cancel(int id) {
        if (this.doOnSubscribe != null) {
            this.doOnSubscribe.add(() -> this._cancel(id));
        } else {
            this.subscribers.remove(id);
        }
    }

    @AsCallback
    public void _rq(long l, int id) {
        if (this.doOnSubscribe != null) {
            this.doOnSubscribe.add(() -> this._rq(l, id));
        } else {
            SubscriberEntry se = this.getSE(id);
            if (se != null) {
                se.addCredits(l);
            } else {
                Log.Warn((Object)this, (String)("ignored credits " + l + " on id " + id));
            }
            this.emitRequestNext();
        }
    }

    protected SubscriberEntry getSE(Integer i) {
        return this.subscribers.get(i);
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
        this.requestNextTrigger = batchSize / KxReactiveStreams.REQU_NEXT_DIVISOR;
    }

    @CallerSideMethod
    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw null;
        }
        ((KxPublisherActor)this.self())._onSubscribe(subscription);
    }

    public void _onSubscribe(Subscription subscription) {
        this.producer = subscription;
        ArrayList<Runnable> tmp = this.doOnSubscribe;
        this.doOnSubscribe = null;
        tmp.forEach(runnable -> runnable.run());
    }

    protected void emitRequestNext() {
        if (this.openRequested < this.requestNextTrigger) {
            long minCredits = Long.MAX_VALUE;
            for (Map.Entry<Integer, SubscriberEntry> next : this.subscribers.entrySet()) {
                long credits = next.getValue().credits;
                if (minCredits > credits) {
                    minCredits = credits;
                }
                if (credits >= this.openRequested) continue;
                return;
            }
            if (this.isIteratorBased) {
                if (minCredits > 0L) {
                    long min = Math.min(minCredits, this.batchSize);
                    this.producer.request(min);
                    this.openRequested += min;
                }
            } else {
                this.producer.request(this.batchSize);
                this.openRequested += this.batchSize;
            }
        }
    }

    @CallerSideMethod
    public void onNext(IN in) {
        if (in == null) {
            throw null;
        }
        this._onNext(in);
    }

    public void _onNext(IN in) {
        if (this.subscribers == null) {
            return;
        }
        --this.openRequested;
        try {
            OUT apply = this.processor.apply(in);
            if (apply != null) {
                this.forwardMessage(apply);
            } else {
                this.emitRequestNext();
            }
        }
        catch (Throwable err) {
            err.printStackTrace();
            this.forwardError(err);
        }
    }

    protected void forwardError(Throwable err) {
        if (this.subscribers == null) {
            err.printStackTrace();
            return;
        }
        this.subscribers.forEach((id, entry) -> entry.onError(err));
    }

    protected void forwardMessage(Object msg) {
        if (this.subscribers == null) {
            return;
        }
        if (this.lossy) {
            ArrayList<SubscriberEntry> toRemove = null;
            for (Map.Entry<Integer, SubscriberEntry> next : this.subscribers.entrySet()) {
                SubscriberEntry entry = next.getValue();
                if (entry.getCredits() <= 0L) continue;
                try {
                    entry.onNext(msg);
                }
                catch (Throwable th) {
                    if (toRemove == null) {
                        toRemove = new ArrayList<SubscriberEntry>();
                    }
                    toRemove.add(entry);
                }
            }
            if (toRemove != null) {
                this.removeSubscribers(toRemove);
            }
            if (this.subscribers.size() > 0) {
                this.emitRequestNext();
            }
        } else {
            long minCredits = this.calcMinCredits();
            if (minCredits <= 0L) {
                this.pending.addFirst(msg);
                return;
            }
            ArrayList<SubscriberEntry> toRemove = null;
            if (this.pending.size() > 0) {
                this.pending.addFirst(msg);
                toRemove = this.forwardPending(minCredits, toRemove);
            } else {
                for (Map.Entry<Integer, SubscriberEntry> next : this.subscribers.entrySet()) {
                    SubscriberEntry entry = next.getValue();
                    try {
                        entry.onNext(msg);
                    }
                    catch (Throwable th) {
                        if (toRemove == null) {
                            toRemove = new ArrayList<SubscriberEntry>();
                        }
                        toRemove.add(entry);
                    }
                }
                --minCredits;
            }
            if (toRemove != null) {
                this.removeSubscribers(toRemove);
            }
            if (this.subscribers.size() > 0) {
                this.emitRequestNext();
            } else {
                if (this.openRequested > 0L || this.pending.size() > 0) {
                    Log.Info((Object)this, (String)("no subscribers, deleting " + this.pending.size() + " messages"));
                    this.pending.clear();
                }
                this.openRequested = 0L;
            }
        }
    }

    protected long calcMinCredits() {
        long minCredits = Long.MAX_VALUE;
        for (Map.Entry<Integer, SubscriberEntry> next : this.subscribers.entrySet()) {
            SubscriberEntry entry = next.getValue();
            if (minCredits <= entry.getCredits()) continue;
            minCredits = entry.getCredits();
        }
        return minCredits;
    }

    protected void removeSubscribers(List toRemove) {
        toRemove.forEach(e -> {
            int subsId = ((SubscriberEntry)e).getSubsId();
            try {
                this._cancel(subsId);
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            this.subscribers.remove(subsId);
            this.subscriberDisconnected(subsId);
        });
    }

    protected List forwardPending(long minCredits, List toRemove) {
        while (this.pending.size() > 0 && minCredits > 0L) {
            Object toSend = this.pending.removeLast();
            for (Map.Entry<Integer, SubscriberEntry> next : this.subscribers.entrySet()) {
                SubscriberEntry entry = next.getValue();
                try {
                    entry.onNext(toSend);
                }
                catch (Throwable th) {
                    if (toRemove == null) {
                        toRemove = new ArrayList<SubscriberEntry>();
                    }
                    toRemove.add(entry);
                }
            }
            --minCredits;
        }
        return toRemove;
    }

    @CallerSideMethod
    public void onError(Throwable throwable) {
        if (throwable == null) {
            throw null;
        }
        this._onError(throwable);
    }

    public void _onError(Throwable throwable) {
        this.forwardError(throwable);
        this.stop();
    }

    public void onComplete() {
        if (this.pending.size() > 0) {
            List l;
            if (this.pending.size() > 0 && (l = this.forwardPending(this.calcMinCredits(), null)) != null && l.size() > 0) {
                this.removeSubscribers(l);
            }
            this.delayed(1L, () -> ((KxPublisherActor)this.self()).onComplete());
        } else {
            this.subscribers.forEach((id, entry) -> entry.onComplete());
            this.stop();
        }
    }

    public void hasBeenUnpublished(String connectionIdentifier) {
        Iterator<Map.Entry<Integer, SubscriberEntry>> iterator = this.subscribers.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Integer, SubscriberEntry> entry = iterator.next();
            Callback subscriber = entry.getValue().subscriber;
            if (!(subscriber instanceof CallbackWrapper) || !((CallbackWrapper)subscriber).isTerminated()) continue;
            iterator.remove();
            this.subscriberDisconnected(entry.getKey());
        }
    }

    public void stop() {
        if (this.isPublished() && this.closeOnComplete) {
            ConcurrentLinkedQueue connections = this.getConnections();
            this.close();
            Iterator iterator = connections.iterator();
            while (iterator.hasNext()) {
                ((ConnectionRegistry)iterator.next()).closeNetwork();
            }
        }
        super.stop();
    }

    public void subscriberDisconnected(int id) {
        Log.Info((Object)this, (String)("a stream client disconnected id:" + id + " remaining:" + this.subscribers.size()));
        this.emitRequestNext();
    }

    public void setCloseOnComplete(boolean closeOnComplete) {
        this.closeOnComplete = closeOnComplete;
    }

    public void setLossy(boolean lossy) {
        this.lossy = lossy;
    }

    @Override
    @CallerSideMethod
    public KxReactiveStreams getKxStreamsInstance() {
        if (this._streams == null) {
            KxReactiveStreams streams = ((KxPublisherActor)this.getActor())._streams;
            if (streams == null) {
                System.out.println("POK");
            }
            return streams;
        }
        if (this._streams == null) {
            System.out.println("POK");
        }
        return this._streams;
    }

    protected static class SubscriberEntry {
        protected int subsId;
        protected long credits;
        protected KSubscription subscription;
        protected Callback subscriber;

        public SubscriberEntry(int subsId, KSubscription subscription, Callback subscriber) {
            this.subsId = subsId;
            this.subscription = subscription;
            this.subscriber = subscriber;
        }

        public void addCredits(long l) {
            this.credits += l;
        }

        public int getSubsId() {
            return this.subsId;
        }

        public long getCredits() {
            return this.credits;
        }

        public KSubscription getSubscription() {
            return this.subscription;
        }

        public void onError(Throwable err) {
            this.subscriber.reject((Object)err);
        }

        public void onNext(Object msg) {
            this.subscriber.pipe(msg);
            --this.credits;
        }

        public void onComplete() {
            this.subscriber.finish();
        }
    }

    protected static class KSubscription
    implements Subscription,
    Serializable {
        protected KxPublisherActor publisher;
        protected int id;

        public KSubscription(KxPublisherActor publisher, int id) {
            this.publisher = publisher;
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void request(long l) {
            if (l <= 0L) {
                this.publisher.onError(new IllegalArgumentException("spec rule 3.9: request > 0 elements"));
                return;
            }
            this.publisher._rq(l, this.id);
        }

        public void cancel() {
            this.removeRegistration();
            this.publisher._cancel(this.id);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void removeRegistration() {
            if (this.publisher._callerSideSubscribers != null) {
                ArrayList<Subscriber> arrayList = this.publisher._callerSideSubscribers;
                synchronized (arrayList) {
                    this.publisher._callerSideSubscribers.remove(this);
                }
            }
        }
    }
}

