/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.tools.util;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.tools.util.ProducerConsumer;
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
import org.apache.hadoop.tools.util.WorkRequestProcessor;

/**
 * Couples an input queue of {@link WorkRequest}s and an output queue of
 * {@link WorkReport}s with a fixed-size thread pool, and keeps a counter of
 * requests that have been submitted via {@link #put} but whose report has not
 * yet been retrieved via {@link #take} or {@link #blockingTake}.
 *
 * <p>NOTE(review): the {@code Worker} runnable scheduled by {@link #addWorker}
 * is not defined in this file; presumably it moves items from the input queue
 * through a {@link WorkRequestProcessor} onto the output queue - confirm
 * against the Worker source.
 *
 * @param <T> payload type carried by the work requests
 * @param <R> payload type carried by the work reports
 */
public class ProducerConsumer<T, R> {
    private static final Log LOG = LogFactory.getLog(ProducerConsumer.class);

    private final LinkedBlockingQueue<WorkRequest<T>> inputQueue = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<WorkReport<R>> outputQueue = new LinkedBlockingQueue<>();
    private final ExecutorService executor;
    /** Requests accepted by {@link #put} minus reports handed out by the take methods. */
    private final AtomicInteger workCnt;

    /**
     * Creates the instance backed by a fixed thread pool.
     *
     * @param numThreads size of the fixed thread pool that runs the workers
     */
    public ProducerConsumer(int numThreads) {
        this.executor = Executors.newFixedThreadPool(numThreads);
        this.workCnt = new AtomicInteger(0);
    }

    /**
     * Wraps the processor in a {@code Worker} and submits it to the thread pool.
     *
     * @param processor the processor handed to the new worker
     */
    public void addWorker(WorkRequestProcessor<T, R> processor) {
        this.executor.execute((Runnable)new Worker(this, processor));
    }

    /**
     * Shuts the thread pool down via {@code shutdownNow()}. A warning is logged
     * first if {@link #hasWork()} still reports outstanding work.
     */
    public void shutdown() {
        if (this.hasWork()) {
            LOG.warn("Shutdown() is called but there are still unprocessed work!");
        }
        this.executor.shutdownNow();
    }

    /**
     * @return the current value of the outstanding-work counter
     */
    public int getWorkCnt() {
        return this.workCnt.get();
    }

    /**
     * @return {@code true} if the outstanding-work counter is greater than zero
     */
    public boolean hasWork() {
        return this.workCnt.get() > 0;
    }

    /**
     * Adds a request to the input queue, retrying until the queue accepts it.
     *
     * <p>If the calling thread is interrupted while enqueueing, the attempt is
     * retried and the interrupt status is restored before this method returns,
     * so the caller can still observe the interrupt.
     *
     * @param workRequest the request to enqueue; must not be {@code null}
     * @throws NullPointerException if {@code workRequest} is {@code null}
     */
    public void put(WorkRequest<T> workRequest) {
        // Reject null before touching the counter so a failed call leaves it unchanged.
        if (workRequest == null) {
            throw new NullPointerException("workRequest");
        }
        // Count the request before it becomes visible in the queue: otherwise
        // another thread could retrieve the matching report and decrement the
        // counter first, letting hasWork() report false (or the counter go
        // negative) while the request is still in flight.
        this.workCnt.incrementAndGet();
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    this.inputQueue.put(workRequest);
                    return;
                }
                catch (InterruptedException ie) {
                    interrupted = true;
                    LOG.error("Could not put workRequest into inputQueue. Retrying...");
                }
            }
        }
        finally {
            if (interrupted) {
                // Re-assert the interrupt that was consumed by the retry loop.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes the next report from the output queue, waiting if none is
     * available, and decrements the outstanding-work counter.
     *
     * @return the next available report
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public WorkReport<R> take() throws InterruptedException {
        WorkReport<R> report = this.outputQueue.take();
        this.workCnt.decrementAndGet();
        return report;
    }

    /**
     * Like {@link #take()}, but keeps waiting when the calling thread is
     * interrupted instead of propagating {@link InterruptedException}.
     *
     * <p>If an interrupt arrived while waiting, the interrupt status is
     * restored before this method returns.
     *
     * @return the next available report
     */
    public WorkReport<R> blockingTake() {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    WorkReport<R> report = this.outputQueue.take();
                    this.workCnt.decrementAndGet();
                    return report;
                }
                catch (InterruptedException ie) {
                    interrupted = true;
                    LOG.debug("Retrying in blockingTake...");
                }
            }
        }
        finally {
            if (interrupted) {
                // Re-assert the interrupt that was consumed by the retry loop.
                Thread.currentThread().interrupt();
            }
        }
    }

    // The three accessors below were emitted by the decompiler for compiler-generated
    // (synthetic) methods. They are kept with unchanged names and signatures.
    // NOTE(review): presumably they are called by the separately decompiled Worker
    // class - confirm before removing.

    /** Synthetic accessor returning the input queue of the given instance. */
    static /* synthetic */ LinkedBlockingQueue access$000(ProducerConsumer x0) {
        return x0.inputQueue;
    }

    /** Synthetic accessor returning the class logger; the argument is unused. */
    static /* synthetic */ Log access$100(ProducerConsumer x0) {
        return LOG;
    }

    /** Synthetic accessor returning the output queue of the given instance. */
    static /* synthetic */ LinkedBlockingQueue access$200(ProducerConsumer x0) {
        return x0.outputQueue;
    }
}

