/*
 * Decompiled with CFR 0.152.
 */
package de.juplo.reactorm;

import de.juplo.reactorm.PaginatedSourceSubscription;
import de.juplo.reactorm.Request;
import de.juplo.reactorm.SerialExecutor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PaginatedSourcePublisher<T>
implements Publisher<T> {
    public static final Logger LOG = LoggerFactory.getLogger(PaginatedSourcePublisher.class);
    public static final int DEFAULT_MAX = 100;
    public static final int DEFAULT_MIN = 100;
    public static final int DEFAULT_FILL = 20;
    private final Executor executor;
    private final BiFunction<Long, Long, List<T>> function;
    private final int min;
    private final int max;
    private final int fill;
    private volatile boolean completed = false;
    private volatile Throwable error = null;
    private long current = 0L;
    private long start = 0L;
    private boolean publishing;
    private final Map<Subscriber<? super T>, Request> requests = new HashMap<Subscriber<? super T>, Request>();
    private final Queue<T> unpublished = new LinkedBlockingQueue<T>();

    public PaginatedSourcePublisher(BiFunction<Long, Long, List<T>> function) {
        this(function, 100, 100, 20);
    }

    public PaginatedSourcePublisher(Executor executor, BiFunction<Long, Long, List<T>> function) {
        this(executor, function, 100, 100, 20);
    }

    public PaginatedSourcePublisher(BiFunction<Long, Long, List<T>> function, int min, int max, int fill) {
        this(Executors.newSingleThreadExecutor(), function, min, max, fill);
    }

    public PaginatedSourcePublisher(Executor executor, BiFunction<Long, Long, List<T>> function, int min, int max, int fill) {
        this.executor = new SerialExecutor(executor);
        this.function = function;
        if (min < 1) {
            throw new IllegalArgumentException("Minimum chunk size must be at least one!");
        }
        this.min = min;
        if (max < 1) {
            throw new IllegalArgumentException("Maximum chunk size must be at least one!");
        }
        this.max = max;
        if (fill < 0) {
            throw new IllegalArgumentException("Minimum queue length must be at least zero!");
        }
        if (fill > max / 2) {
            throw new IllegalArgumentException("Minimum queue length can be at most the half of the maximum chunk size!");
        }
        this.fill = fill;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(Subscriber<? super T> subscriber) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: subscription from {}", (Object)this, (Object)Integer.toHexString(subscriber.hashCode()));
        }
        PaginatedSourceSubscription subscription = new PaginatedSourceSubscription(this, subscriber);
        try {
            Map<Subscriber<? super T>, Request> map = this.requests;
            synchronized (map) {
                this.requests.put(subscriber, new Request());
            }
            subscriber.onSubscribe((Subscription)subscription);
            this.publish();
        }
        catch (NullPointerException e) {
            throw e;
        }
        catch (Exception e) {
            LOG.debug("{}: unexpected error {}", (Object)this, (Object)e);
            this.error = e;
            this.publish();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: subscription from {} done!", (Object)this, (Object)Integer.toHexString(subscriber.hashCode()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void unsubscribe(Subscriber subscriber) {
        Map<Subscriber<? super T>, Request> map = this.requests;
        synchronized (map) {
            this.requests.remove(subscriber);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isSubscribed(Subscriber subscriber) {
        Map<Subscriber<? super T>, Request> map = this.requests;
        synchronized (map) {
            return this.requests.containsKey(subscriber);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void addRequest(Subscriber subscriber, long request) {
        Map<Subscriber<? super T>, Request> map = this.requests;
        synchronized (map) {
            this.requests.get(subscriber).add(request);
        }
    }

    public boolean isCompleted() {
        return this.completed && this.unpublished.isEmpty();
    }

    public boolean hasError() {
        return this.error != null;
    }

    protected void signalError(Throwable error) {
        this.error = error;
        this.publish();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected synchronized void publish() {
        long limit;
        long requested;
        Map<Subscriber<? super T>, Request> map;
        if (this.error != null) {
            Map<Subscriber<? super T>, Request> map2 = this.requests;
            synchronized (map2) {
                this.requests.keySet().forEach(subscriber -> subscriber.onError(this.error));
                this.requests.clear();
            }
            return;
        }
        if (this.publishing) {
            return;
        }
        this.publishing = true;
        int published = Integer.MAX_VALUE;
        while (published > 0 && !this.unpublished.isEmpty()) {
            published = 0;
            map = this.requests;
            synchronized (map) {
                for (Map.Entry<Subscriber<T>, Request> entry : this.requests.entrySet()) {
                    if (this.unpublished.isEmpty()) {
                        LOG.debug("{}: no more itemes available for publication", (Object)this);
                        break;
                    }
                    if (!entry.getValue().consume()) {
                        if (!LOG.isDebugEnabled()) continue;
                        LOG.debug("{}: request from subscriber {} is satisfied!", (Object)this, (Object)Integer.toHexString(entry.getKey().hashCode()));
                        continue;
                    }
                    T item = this.unpublished.remove();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{}: calling {}.onNext({}), remaining requested items for this subscriber: {}, available: {}", new Object[]{this, Integer.toHexString(entry.getKey().hashCode()), item, entry.getValue(), this.unpublished.size()});
                    }
                    ++this.current;
                    ++published;
                    try {
                        entry.getKey().onNext(item);
                    }
                    catch (Throwable t) {
                        LOG.error("unallowed error while signalling onNext({}) to subscriber {}", item, (Object)Integer.toHexString(entry.getKey().hashCode()));
                        this.signalError(t);
                    }
                }
            }
        }
        this.publishing = false;
        if (!this.unpublished.isEmpty()) {
            return;
        }
        if (this.completed) {
            map = this.requests;
            synchronized (map) {
                this.requests.keySet().forEach(subscriber -> subscriber.onComplete());
                this.requests.clear();
            }
            return;
        }
        Map<Subscriber<? super T>, Request> map2 = this.requests;
        synchronized (map2) {
            requested = this.requests.values().stream().map(request -> request.get()).reduce(0L, (acc, val) -> {
                long sum = acc + val;
                return sum > (long)this.max || sum <= 0L && val > 0L ? (long)this.max : sum;
            });
        }
        if (requested == 0L) {
            return;
        }
        final long l = this.start;
        long l2 = limit = requested < (long)this.min ? (long)this.min : requested;
        if (this.current + requested < this.start - (long)this.fill) {
            LOG.debug("{}: no need to request more items (published={}, requested={}, fetched={}, minfill={})", new Object[]{this, this.current, requested, l, this.fill});
            return;
        }
        LOG.debug("{}: scheduling new fetch {} -> {} (published={}, requested={}, fetched={}, minfill={})", new Object[]{this, l + 1L, l + limit, this.current, requested, l, this.fill});
        this.start += limit;
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    LOG.debug("{}: fetching {} -> {}", new Object[]{PaginatedSourcePublisher.this, l + 1L, l + limit});
                    List found = (List)PaginatedSourcePublisher.this.function.apply(l, limit);
                    LOG.debug("{}: fetched {}", (Object)PaginatedSourcePublisher.this, (Object)found.size());
                    if ((long)found.size() < limit) {
                        LOG.debug("{}: DONE (no more events available)!", (Object)PaginatedSourcePublisher.this);
                        PaginatedSourcePublisher.this.completed = true;
                    }
                    PaginatedSourcePublisher.this.unpublished.addAll(found);
                }
                catch (Exception e) {
                    PaginatedSourcePublisher.this.error = e;
                }
                finally {
                    PaginatedSourcePublisher.this.publish();
                }
            }
        });
    }

    public String toString() {
        return Integer.toHexString(this.hashCode());
    }
}

