/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.reactivestreams;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.ActorProxy;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.impl.SimpleScheduler;
import org.nustaq.kontraktor.reactivestreams.KxPublisher;
import org.nustaq.kontraktor.reactivestreams.impl.KxPublisherActor;
import org.nustaq.kontraktor.reactivestreams.impl.KxSubscriber;
import org.nustaq.kontraktor.reactivestreams.impl.SyncProcessor;
import org.nustaq.kontraktor.remoting.base.ActorClientConnector;
import org.nustaq.kontraktor.remoting.base.ActorPublisher;
import org.nustaq.kontraktor.remoting.base.ConnectableActor;
import org.nustaq.kontraktor.util.Log;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class KxReactiveStreams
extends Actors {
    public static final int MAX_BATCH_SIZE = 50000;
    public static final int DEFQSIZE = 128000;
    public static final int DEFBATCHSIZE = 50000;
    public static int REQU_NEXT_DIVISOR = 1;
    protected static KxReactiveStreams instance = new KxReactiveStreams(true);
    protected int batchSize = 50000;
    SimpleScheduler scheduler;

    public static KxReactiveStreams get() {
        return instance;
    }

    public KxReactiveStreams() {
        this(false);
    }

    public KxReactiveStreams(boolean keepSchedulerAlive) {
        this(50000, 128000, keepSchedulerAlive);
    }

    public KxReactiveStreams(int batchSize, int queueSize, boolean keepSchedulerAlive) {
        if (batchSize * 2 > queueSize) {
            throw new RuntimeException("queuesize must be >= 2 * batchSize");
        }
        this.scheduler = new SimpleScheduler(queueSize, keepSchedulerAlive);
        this.batchSize = batchSize;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public void terminateScheduler() {
        this.scheduler.setKeepAlive(false);
    }

    public <T> KxPublisher<T> asKxPublisher(final Publisher<T> p) {
        if (p instanceof KxPublisher) {
            return (KxPublisher)p;
        }
        return new KxPublisher<T>(){

            @Override
            public void subscribe(Subscriber<? super T> s) {
                p.subscribe(s);
            }

            @Override
            @CallerSideMethod
            public KxReactiveStreams getKxStreamsInstance() {
                return KxReactiveStreams.this;
            }
        };
    }

    public <T> Subscriber<T> subscriber(Callback<T> cb) {
        return this.subscriber(this.batchSize, cb);
    }

    public <T> Subscriber<T> subscriber(int batchSize, Callback<T> cb) {
        if (batchSize > 50000) {
            throw new RuntimeException("batch size exceeds maximum of 50000");
        }
        return new KxSubscriber<T>(batchSize, cb);
    }

    public <T> KxPublisher<T> connect(Class<T> eventType, ConnectableActor connectable) {
        return (KxPublisher)this.connect(eventType, connectable, null).await();
    }

    public <T> KxPublisher<T> produce(Stream<T> stream) {
        return this.produce(this.batchSize, stream.iterator());
    }

    public <T> KxPublisher<T> produce(Collection<T> collection) {
        return this.produce(this.batchSize, collection.iterator());
    }

    public KxPublisher<Integer> produce(IntStream stream) {
        return this.produce(this.batchSize, stream.mapToObj(i -> i).iterator());
    }

    public KxPublisher<Long> produce(LongStream stream) {
        return this.produce(this.batchSize, stream.mapToObj(i -> i).iterator());
    }

    public KxPublisher<Double> produce(DoubleStream stream) {
        return this.produce(this.batchSize, stream.mapToObj(i -> i).iterator());
    }

    public <T> KxPublisher<T> produce(int batchSize, Stream<T> stream) {
        return this.produce(batchSize, stream.iterator());
    }

    public <T> KxPublisher<T> produce(Iterator<T> iter) {
        return this.produce(this.batchSize, iter);
    }

    public <T> KxPublisher<T> produce(int batchSize, Iterator<T> iter) {
        if (batchSize > 50000) {
            throw new RuntimeException("batch size exceeds max of 50000");
        }
        KxPublisherActor pub = (KxPublisherActor)Actors.AsActor(KxPublisherActor.class, (Scheduler)this.scheduler);
        pub._streams = this;
        ((KxPublisherActor)pub.getActor())._streams = this;
        pub.setBatchSize(batchSize);
        pub.initFromIterator(iter);
        return pub;
    }

    public <T> Stream<T> stream(Publisher<T> pub) {
        return this.stream(pub, this.batchSize);
    }

    public <T> Stream<T> stream(Publisher pub, int batchSize) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(this.iterator(pub, batchSize), 1280), false);
    }

    public <T> Iterator<T> iterator(Publisher<T> pub) {
        return this.iterator(pub, this.batchSize);
    }

    public <T> Iterator<T> iterator(Publisher<T> pub, int batchSize) {
        if (batchSize > 50000) {
            throw new RuntimeException("batch size exceeds max of 50000");
        }
        KxSubscriber subs = new KxSubscriber(batchSize);
        pub.subscribe(subs);
        return subs;
    }

    public <T> IPromise<KxPublisher<T>> connect(Class<T> eventType, ConnectableActor connectable, Callback<ActorClientConnector> disconHandler) {
        Callback & Serializable discon = (Callback & Serializable)(acc, err) -> {
            Log.Info((Object)((Object)this), (String)"Client disconnected");
            acc.closeClient();
            if (disconHandler != null) {
                disconHandler.complete(acc, err);
            }
        };
        IPromise connect = connectable.actorClass(KxPublisherActor.class).inboundQueueSize(this.scheduler.getDefaultQSize()).connect((Callback)discon, r -> {
            Actor remoteref = r;
            if (((KxPublisherActor)remoteref)._callerSideSubscribers != null) {
                ((KxPublisherActor)remoteref)._callerSideSubscribers.forEach((Consumer<Subscriber>)((Consumer<Object>)subs -> ((Subscriber)subs).onError((Throwable)new IOException("connection lost"))));
                ((KxPublisherActor)remoteref)._callerSideSubscribers = null;
            }
        });
        Promise res = new Promise();
        connect.then((Callback & Serializable)(publisher, err) -> {
            if (publisher != null) {
                ((KxPublisherActor)publisher)._streams = this;
                res.resolve((Object)((KxPublisher)publisher));
            } else {
                res.reject(err);
            }
        });
        return res;
    }

    public <OUT> IPromise serve(Publisher<OUT> source, ActorPublisher networRxPublisher, boolean closeConnectionOnCompleteOrError, Consumer<Actor> disconCB) {
        if (networRxPublisher.getClass().getSimpleName().equals("HttpPublisher")) {
            throw new RuntimeException("Http long poll cannot be supported. Use WebSockets instead.");
        }
        if (!(source instanceof KxPublisherActor) || !(source instanceof ActorProxy)) {
            Processor<Object, Object> proc = this.newAsyncProcessor(a -> a);
            source.subscribe(proc);
            source = proc;
        }
        ((KxPublisherActor)source).setCloseOnComplete(closeConnectionOnCompleteOrError);
        return networRxPublisher.facade((Actor)source).publish(disconCB);
    }

    public <IN, OUT> Processor<IN, OUT> newAsyncProcessor(Function<IN, OUT> processingFunction) {
        return this.newAsyncProcessor(processingFunction, (Scheduler)this.scheduler, this.batchSize);
    }

    public <IN, OUT> Processor<IN, OUT> newAsyncProcessor(Function<IN, OUT> processingFunction, int batchSize) {
        return this.newAsyncProcessor(processingFunction, (Scheduler)this.scheduler, batchSize);
    }

    public <IN, OUT> Processor<IN, OUT> newAsyncProcessor(Function<IN, OUT> processingFunction, Scheduler sched, int batchSize) {
        if (batchSize > 50000) {
            throw new RuntimeException("batch size exceeds max of 50000");
        }
        KxPublisherActor pub = (KxPublisherActor)Actors.AsActor(KxPublisherActor.class, (Scheduler)sched);
        pub._streams = this;
        ((KxPublisherActor)pub.getActor())._streams = this;
        pub.setBatchSize(batchSize);
        pub.init(processingFunction);
        return pub;
    }

    public <IN, OUT> Processor<IN, OUT> newSyncProcessor(Function<IN, OUT> processingFunction) {
        SyncProcessor<IN, OUT> inoutSyncProcessor = new SyncProcessor<IN, OUT>(this.batchSize, processingFunction, this);
        return inoutSyncProcessor;
    }

    public <T, OUT> Processor<T, OUT> newLossyProcessor(Function<T, OUT> processingFunction, int batchSize) {
        if (batchSize > 50000) {
            throw new RuntimeException("batch size exceeds max of 50000");
        }
        KxPublisherActor pub = (KxPublisherActor)Actors.AsActor(KxPublisherActor.class, (Scheduler)this.scheduler);
        pub._streams = this;
        ((KxPublisherActor)pub.getActor())._streams = this;
        pub.setBatchSize(batchSize);
        pub.setLossy(true);
        pub.init(processingFunction);
        return pub;
    }
}

