/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.reactivestreams;

import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.reactivestreams.KxReactiveStreams;
import org.nustaq.kontraktor.reactivestreams.impl.KxPublisherActor;
import org.nustaq.kontraktor.reactivestreams.impl.KxSubscriber;
import org.nustaq.kontraktor.remoting.base.ActorPublisher;
import org.nustaq.kontraktor.remoting.base.ActorServer;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * A reactive-streams {@link Publisher} extended with convenience default methods for
 * subscribing with a {@link Callback}, consuming the elements as a {@link Stream} or
 * {@link Iterator}, chaining processors, and serving the publisher via an
 * {@link ActorPublisher}.
 *
 * <p>All default methods delegate to the {@link KxReactiveStreams} instance returned by
 * {@link #getKxStreamsInstance()}.
 *
 * @param <T> type of the published elements
 */
public interface KxPublisher<T>
extends Publisher<T> {
    /**
     * Subscribes the given callback by wrapping it into a subscriber obtained from
     * {@code getKxStreamsInstance().subscriber(cb)}.
     *
     * @param cb callback to wrap and subscribe
     */
    @CallerSideMethod
    default void subscribe(Callback<T> cb) {
        this.subscribe(this.getKxStreamsInstance().subscriber(cb));
    }

    /**
     * @return the {@link KxReactiveStreams} instance all default methods of this interface delegate to
     */
    @CallerSideMethod
    KxReactiveStreams getKxStreamsInstance();

    /**
     * Subscribes the given callback by wrapping it into a subscriber obtained from
     * {@code getKxStreamsInstance().subscriber(batchSize, cb)}.
     *
     * @param batchSize batch size handed to the subscriber factory
     * @param cb        callback to wrap and subscribe
     */
    @CallerSideMethod
    default void subscribe(int batchSize, Callback<T> cb) {
        this.subscribe(this.getKxStreamsInstance().subscriber(batchSize, cb));
    }

    /**
     * Same as {@link #stream(int, Consumer)}, using {@code getKxStreamsInstance().getBatchSize()}
     * as the batch size.
     *
     * @param streamingCode code consuming the stream
     */
    @CallerSideMethod
    default void stream(Consumer<Stream<T>> streamingCode) {
        this.stream(this.getKxStreamsInstance().getBatchSize(), streamingCode);
    }

    /**
     * Hands a {@link Stream} view of this publisher to {@code streamingCode}.
     *
     * <p>If {@code Actor.inside()} is true the code runs on the calling thread; otherwise a new
     * thread named "Stream Consumer" is started to run it. If anything is thrown, the
     * subscription found in {@code KxSubscriber.subsToCancel} (if any) is cancelled and the
     * throwable is rethrown.
     *
     * @param batchSize     batch size passed to {@code KxReactiveStreams.stream}
     * @param streamingCode code consuming the stream
     */
    @CallerSideMethod
    default void stream(final int batchSize, final Consumer<Stream<T>> streamingCode) {
        if (Actor.inside()) {
            try {
                Stream stream = this.getKxStreamsInstance().stream(this, batchSize);
                streamingCode.accept(stream);
            }
            catch (Throwable ce) {
                Subscription subscription = KxSubscriber.subsToCancel.get();
                if (subscription != null) {
                    subscription.cancel();
                }
                throw ce;
            }
        } else {
            new Thread(() -> {
                try {
                    Stream stream = this.getKxStreamsInstance().stream(this, batchSize);
                    streamingCode.accept(stream);
                }
                catch (Throwable ce) {
                    Subscription subscription = KxSubscriber.subsToCancel.get();
                    if (subscription != null) {
                        subscription.cancel();
                    }
                    throw ce;
                }
            }, "Stream Consumer").start();
        }
    }

    /**
     * Hands an {@link Iterator} view of this publisher to {@code iteratingCode}.
     *
     * <p>If {@code Actor.inside()} is true the code runs on the calling thread; otherwise a new
     * thread named "Iterator Consumer" is started to run it. If anything is thrown, the
     * subscription found in {@code KxSubscriber.subsToCancel} (if any) is cancelled and the
     * throwable is rethrown.
     *
     * @param batchSize     batch size passed to {@code KxReactiveStreams.iterator}
     * @param iteratingCode code consuming the iterator
     */
    @CallerSideMethod
    default void iterator(final int batchSize, final Consumer<Iterator<T>> iteratingCode) {
        if (Actor.inside()) {
            try {
                iteratingCode.accept(this.getKxStreamsInstance().iterator(this, batchSize));
            }
            catch (Throwable ce) {
                Subscription subscription = KxSubscriber.subsToCancel.get();
                // Guard against a missing subscription (as stream() does) so a
                // NullPointerException cannot replace the original failure.
                if (subscription != null) {
                    subscription.cancel();
                }
                throw ce;
            }
        } else {
            new Thread(() -> {
                try {
                    iteratingCode.accept(this.getKxStreamsInstance().iterator(this, batchSize));
                }
                catch (Throwable ce) {
                    Subscription subscription = KxSubscriber.subsToCancel.get();
                    // Same guard as in the in-actor branch above.
                    if (subscription != null) {
                        subscription.cancel();
                    }
                    throw ce;
                }
            }, "Iterator Consumer").start();
        }
    }

    /**
     * Same as {@link #iterator(int, Consumer)}, using {@code getKxStreamsInstance().getBatchSize()}
     * as the batch size.
     *
     * @param iteratingCode code consuming the iterator
     */
    @CallerSideMethod
    default void iterator(Consumer<Iterator<T>> iteratingCode) {
        this.iterator(this.getKxStreamsInstance().getBatchSize(), iteratingCode);
    }

    /**
     * Creates a processor via {@code KxReactiveStreams.newAsyncProcessor(processor)}, subscribes
     * it to this publisher and returns it as the downstream publisher.
     *
     * @param processor mapping function applied by the processor
     * @param <OUT>     element type of the resulting publisher
     * @return the newly created processor, viewed as a {@code KxPublisher}
     */
    @CallerSideMethod
    default <OUT> KxPublisher<OUT> map(Function<T, OUT> processor) {
        Processor<T, OUT> toutProcessor = this.getKxStreamsInstance().newAsyncProcessor(processor);
        this.subscribe((Subscriber)toutProcessor);
        return (KxPublisher)toutProcessor;
    }

    /**
     * Shorthand for {@link #map(Function)} with an identity function.
     *
     * @param <OUT> element type of the resulting publisher; elements are passed through
     *              unchanged, so the caller is responsible for choosing a compatible type
     * @return the publisher returned by {@code map}
     */
    @CallerSideMethod
    @SuppressWarnings("unchecked") // identity mapping; T is reinterpreted as OUT (erased at runtime)
    default <OUT> KxPublisher<OUT> async() {
        return this.map(x -> (OUT) x);
    }

    /**
     * Shorthand for {@link #lossyMap(Function)} with an identity function.
     *
     * @param <OUT> element type of the resulting publisher; elements are passed through
     *              unchanged, so the caller is responsible for choosing a compatible type
     * @return the publisher returned by {@code lossyMap}
     */
    @CallerSideMethod
    @SuppressWarnings("unchecked") // identity mapping; T is reinterpreted as OUT (erased at runtime)
    default <OUT> KxPublisher<OUT> lossy() {
        return this.lossyMap(x -> (OUT) x);
    }

    /**
     * Same as {@link #lossyMap(Function, int)}, using
     * {@code getKxStreamsInstance().getBatchSize()} as the batch size.
     *
     * @param processor mapping function applied by the processor
     * @param <OUT>     element type of the resulting publisher
     * @return the newly created processor, viewed as a {@code KxPublisher}
     */
    @CallerSideMethod
    default <OUT> KxPublisher<OUT> lossyMap(Function<T, OUT> processor) {
        return this.lossyMap(processor, this.getKxStreamsInstance().getBatchSize());
    }

    /**
     * Creates a processor via {@code KxReactiveStreams.newLossyProcessor(processor, batchSize)},
     * subscribes it to this publisher and returns it as the downstream publisher.
     *
     * @param processor mapping function applied by the processor
     * @param batchSize batch size handed to the processor factory
     * @param <OUT>     element type of the resulting publisher
     * @return the newly created processor, viewed as a {@code KxPublisher}
     */
    @CallerSideMethod
    default <OUT> KxPublisher<OUT> lossyMap(Function<T, OUT> processor, int batchSize) {
        Processor<T, OUT> toutProcessor = this.getKxStreamsInstance().newLossyProcessor(processor, batchSize);
        this.subscribe((Subscriber)toutProcessor);
        return (KxPublisher)toutProcessor;
    }

    /**
     * Creates a processor via {@code KxReactiveStreams.newAsyncProcessor(processor, batchSize)},
     * subscribes it to this publisher and returns it as the downstream publisher.
     *
     * @param processor mapping function applied by the processor
     * @param batchSize batch size handed to the processor factory
     * @param <OUT>     element type of the resulting publisher
     * @return the newly created processor, viewed as a {@code KxPublisher}
     */
    @CallerSideMethod
    default <OUT> KxPublisher<OUT> map(Function<T, OUT> processor, int batchSize) {
        Processor<T, OUT> toutProcessor = this.getKxStreamsInstance().newAsyncProcessor(processor, batchSize);
        this.subscribe((Subscriber)toutProcessor);
        return (KxPublisher)toutProcessor;
    }

    /**
     * Serves this publisher through the given {@link ActorPublisher}, passing {@code true} for
     * the close-on-disconnect flag, and awaits the result of {@code KxReactiveStreams.serve}.
     *
     * @param publisher      publisher configuration used to serve
     * @param disconCallback callback passed through to {@code KxReactiveStreams.serve}
     * @return the awaited {@link ActorServer}
     */
    @CallerSideMethod
    default ActorServer serve(ActorPublisher publisher, Consumer<Actor> disconCallback) {
        return (ActorServer)this.getKxStreamsInstance().serve(this, publisher, true, disconCallback).await();
    }

    /**
     * Serves this publisher through the given {@link ActorPublisher} and awaits the result of
     * {@code KxReactiveStreams.serve}.
     *
     * @param publisher      publisher configuration used to serve
     * @param closeOnDiscon  flag passed through to {@code KxReactiveStreams.serve}
     * @param disconCallback callback passed through to {@code KxReactiveStreams.serve}
     * @return the awaited {@link ActorServer}
     */
    @CallerSideMethod
    default ActorServer serve(ActorPublisher publisher, boolean closeOnDiscon, Consumer<Actor> disconCallback) {
        return (ActorServer)this.getKxStreamsInstance().serve(this, publisher, closeOnDiscon, disconCallback).await();
    }

    /**
     * Same as {@link #serve(ActorPublisher, boolean, Consumer)} with {@code closeOnDiscon = true}
     * and no disconnect callback.
     *
     * @param publisher publisher configuration used to serve
     * @return the awaited {@link ActorServer}
     */
    @CallerSideMethod
    default ActorServer serve(ActorPublisher publisher) {
        return this.serve(publisher, true, null);
    }

    /**
     * Chains a processor created via {@code KxReactiveStreams.newSyncProcessor(processor)}.
     * If this publisher is a {@link KxPublisherActor} whose {@code isRemote()} is true, falls
     * back to {@link #map(Function)} instead.
     *
     * @param processor mapping function applied by the processor
     * @param <OUT>     element type of the resulting publisher
     * @return the newly created processor, viewed as a {@code KxPublisher}
     */
    @CallerSideMethod
    default <OUT> KxPublisher<OUT> syncMap(Function<T, OUT> processor) {
        if (this instanceof KxPublisherActor && ((KxPublisherActor)this).isRemote()) {
            return this.map(processor);
        }
        Processor<T, OUT> outkPublisher = this.getKxStreamsInstance().newSyncProcessor(processor);
        this.subscribe((Subscriber)outkPublisher);
        return (KxPublisher)outkPublisher;
    }

    /**
     * Returns this publisher as an {@link Actor}: {@code this} if it already is one, otherwise
     * the result of {@link #async()} cast to {@code Actor}.
     *
     * @return an actor view of this publisher
     */
    @CallerSideMethod
    default Actor asActor() {
        if (this instanceof Actor) {
            return (Actor)this;
        }
        return (Actor)this.async();
    }
}

