/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.services.rlserver.mongodb;

import com.mongodb.MongoTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public final class SubscriberHelpers {
    private SubscriberHelpers() {
    }

    public static class PrintDocumentSubscriber
    extends OperationSubscriber<Document> {
        @Override
        public void onNext(Document document) {
            super.onNext(document);
            System.out.println(document.toJson());
        }
    }

    public static class PrintSubscriber<T>
    extends OperationSubscriber<T> {
        private final String message;

        public PrintSubscriber(String message) {
            this.message = message;
        }

        @Override
        public void onComplete() {
            System.out.println(String.format(this.message, this.getReceived()));
            super.onComplete();
        }
    }

    public static class OperationSubscriber<T>
    extends ObservableSubscriber<T> {
        @Override
        public void onSubscribe(Subscription s) {
            super.onSubscribe(s);
            s.request(Integer.MAX_VALUE);
        }
    }

    public static class ObservableSubscriber<T>
    implements Subscriber<T> {
        private final List<T> received = new ArrayList<T>();
        private final List<Throwable> errors = new ArrayList<Throwable>();
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile Subscription subscription;
        private volatile boolean completed;

        ObservableSubscriber() {
        }

        public void onSubscribe(Subscription s) {
            this.subscription = s;
        }

        public void onNext(T t) {
            this.received.add(t);
        }

        public void onError(Throwable t) {
            this.errors.add(t);
            this.onComplete();
        }

        public void onComplete() {
            this.completed = true;
            this.latch.countDown();
        }

        public Subscription getSubscription() {
            return this.subscription;
        }

        public List<T> getReceived() {
            return this.received;
        }

        public Throwable getError() {
            if (this.errors.size() > 0) {
                return this.errors.get(0);
            }
            return null;
        }

        public boolean isCompleted() {
            return this.completed;
        }

        public List<T> get(long timeout, TimeUnit unit) throws Throwable {
            return this.await(timeout, unit).getReceived();
        }

        public ObservableSubscriber<T> await() throws Throwable {
            return this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        }

        public ObservableSubscriber<T> await(long timeout, TimeUnit unit) throws Throwable {
            this.subscription.request(Integer.MAX_VALUE);
            if (!this.latch.await(timeout, unit)) {
                throw new MongoTimeoutException("Publisher onComplete timed out");
            }
            if (!this.errors.isEmpty()) {
                throw this.errors.get(0);
            }
            return this;
        }
    }
}

