/*
 * Decompiled with CFR 0.152.
 */
package de.unkrig.commons.util;

import de.unkrig.commons.lang.protocol.Consumer;
import de.unkrig.commons.lang.protocol.ConsumerWhichThrows;
import de.unkrig.commons.lang.protocol.Producer;
import de.unkrig.commons.lang.protocol.RunnableWhichThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Funnels the subjects handed to any number of {@link Consumer}s into one single target consumer.
 * <p>
 *   Each call to {@link #newConsumer(int)} creates a bounded queue and returns a consumer that appends to it.
 *   {@link #run()} takes the subjects from these queues, one queue after the other in creation order, and
 *   passes them to the target.
 * </p>
 * <p>
 *   The list of queues is a plain, unsynchronized {@link ArrayList}; presumably all consumers are meant to be
 *   created before {@link #run()} is invoked (to be confirmed against the callers).
 * </p>
 *
 * @param <T>  The type of the subjects that are passed through
 * @param <EX> The exception type that the target consumer may throw
 */
public class WyeConsumer<T, EX extends Exception> implements RunnableWhichThrows<EX> {

    /** Receives every subject taken from the queues by {@link #run()}. */
    private final ConsumerWhichThrows<? super T, EX> target;

    /** One queue-backed producer per consumer created by {@link #newConsumer(int)}, in creation order. */
    private final List<Producer<? extends T>> producers = new ArrayList<Producer<? extends T>>();

    /**
     * @param target The consumer to which {@link #run()} forwards all subjects
     */
    public WyeConsumer(ConsumerWhichThrows<? super T, EX> target) {
        this.target = target;
    }

    /**
     * Creates a new input for this object, backed by a queue of its own.
     * <p>
     *   The returned consumer uses {@link LinkedBlockingQueue#add(Object)}, i.e. it does not block: It throws an
     *   {@link IllegalStateException} when the queue already holds {@code capacity} elements, and a {@link
     *   NullPointerException} when the subject is {@code null}.
     * </p>
     *
     * @param capacity The maximum number of subjects that may be pending in the queue; must be greater than zero
     * @return         A consumer that enqueues its subjects for later processing by {@link #run()}
     */
    public Consumer<T> newConsumer(int capacity) {
        final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>(capacity);

        this.producers.add(new Producer<T>() {

            @Override
            public T produce() {

                // Waiting for the next element must not be aborted by an interrupt, but the interrupt must not
                // get lost either. Re-setting the interrupt flag *before* retrying would make "take()" fail
                // immediately again and again (a busy loop), so the interrupt is remembered here and the flag is
                // restored only when this method returns.
                boolean interrupted = false;
                try {
                    for (;;) {
                        try {
                            return queue.take();
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });

        return new Consumer<T>() {

            @Override
            public void consume(T subject) {
                queue.add(subject);
            }
        };
    }

    /**
     * Passes the subjects of each producer, in the order in which the producers were created, to the target
     * consumer, and moves on to the next producer as soon as a producer produces {@code null}.
     * <p>
     *   NOTE(review): The producers created by {@link #newConsumer(int)} block until a subject is available, and
     *   their queues reject {@code null} elements, so they never produce {@code null}. As written, this method
     *   therefore keeps waiting on the first producer and ends only when the target throws an exception. Confirm
     *   whether an end-of-input signal is intended.
     * </p>
     *
     * @throws EX The target consumer failed to consume a subject
     */
    @Override
    public void run() throws EX {
        for (Producer<? extends T> producer : this.producers) {
            T subject;
            while ((subject = producer.produce()) != null) {
                this.target.consume(subject);
            }
        }
    }
}

