package org.apache.giraph.block_app.framework.output;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.giraph.block_app.framework.api.BlockOutputApi;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;

/* loaded from: input_file:org/apache/giraph/block_app/framework/output/BlockOutputHandle.class */
/**
 * Hands out {@link BlockOutputWriter}s for named outputs and keeps track of
 * which writers are currently free and which have been handed out.
 *
 * <p>An instance created through the no-argument constructor has no output
 * descriptions; {@link #getOutputDesc(String)} and {@link #getWriter(String)}
 * on such an instance fail with an {@link IllegalArgumentException}.
 *
 * <p>The per-output pools are {@link ConcurrentLinkedQueue}s. The maps that
 * hold them are plain {@link HashMap}s which are populated only in the
 * constructor.
 */
public class BlockOutputHandle implements BlockOutputApi {
    /** Message used when this handle was built without output descriptions. */
    private static final String NO_OUTPUT_MESSAGE = "Output cannot be used with checkpointing";

    /** Configuration passed to output descriptions when creating writers. */
    private transient Configuration conf;
    /** Progress reporter passed to newly created writers and to the closing threads. */
    private transient Progressable progressable;
    /** Output descriptions keyed by output name; {@code null} when output is unavailable. */
    private final Map<String, BlockOutputDesc> outputDescMap;
    /** Writers available for reuse, keyed by output name. */
    private final Map<String, Queue<BlockOutputWriter>> freeWriters = new HashMap<>();
    /** Writers handed out by {@link #getWriter(String)} and not yet returned, keyed by output name. */
    private final Map<String, Queue<BlockOutputWriter>> occupiedWriters = new HashMap<>();

    /**
     * Creates a handle without any output descriptions. Requesting an output
     * description or a writer from such a handle throws.
     */
    public BlockOutputHandle() {
        this.outputDescMap = null;
    }

    /**
     * Creates a handle whose output descriptions are obtained from
     * {@code BlockOutputFormat.createInitAndCheckOutputDescsMap(configuration, str)},
     * and prepares an empty free pool and occupied pool for every output name.
     *
     * @param str identifier forwarded to {@code createInitAndCheckOutputDescsMap}
     * @param configuration configuration used to build descriptions and writers
     * @param progressable progress reporter handed to writers
     */
    public BlockOutputHandle(String str, Configuration configuration, Progressable progressable) {
        this.outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap(configuration, str);
        for (String outputName : this.outputDescMap.keySet()) {
            this.freeWriters.put(outputName, new ConcurrentLinkedQueue<BlockOutputWriter>());
            this.occupiedWriters.put(outputName, new ConcurrentLinkedQueue<BlockOutputWriter>());
        }
        initialize(configuration, progressable);
    }

    /**
     * Sets (or replaces) the configuration and progress reporter used by this handle.
     *
     * @param configuration configuration to use from now on
     * @param progressable progress reporter to use from now on
     */
    public void initialize(Configuration configuration, Progressable progressable) {
        this.conf = configuration;
        this.progressable = progressable;
    }

    /**
     * Looks up the output description registered under the given name.
     *
     * @param str output name
     * @return the description stored under {@code str}, or {@code null} if there is none
     * @throws IllegalArgumentException if this handle has no output descriptions
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses OD; the map stores raw descriptions
    public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>> OD getOutputDesc(String str) {
        if (this.outputDescMap == null) {
            throw new IllegalArgumentException(NO_OUTPUT_MESSAGE);
        }
        return (OD) this.outputDescMap.get(str);
    }

    /**
     * Returns a writer for the given output, reusing a free one when available and
     * otherwise creating a new one through the output description. The writer is
     * recorded as occupied until {@link #returnAllWriters()} is called.
     *
     * @param str output name
     * @return a writer for the output
     * @throws IllegalArgumentException if this handle has no output descriptions,
     *         or if {@code str} is not a known output name
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses OW; the pools store the base writer type
    public <OW extends BlockOutputWriter> OW getWriter(String str) {
        if (this.outputDescMap == null) {
            throw new IllegalArgumentException(NO_OUTPUT_MESSAGE);
        }
        Queue<BlockOutputWriter> free = this.freeWriters.get(str);
        if (free == null) {
            // Pools exist exactly for the names in outputDescMap, so a missing pool
            // means the name is unknown; fail clearly instead of with a bare NPE.
            throw new IllegalArgumentException("Unknown output: " + str);
        }
        BlockOutputWriter writer = free.poll();
        if (writer == null) {
            writer = this.outputDescMap.get(str).createOutputWriter(this.conf, this.progressable);
        }
        this.occupiedWriters.get(str).add(writer);
        return (OW) writer;
    }

    /**
     * Moves every occupied writer back into the free pool of its output.
     */
    public void returnAllWriters() {
        for (Map.Entry<String, Queue<BlockOutputWriter>> entry : this.occupiedWriters.entrySet()) {
            Queue<BlockOutputWriter> occupied = entry.getValue();
            this.freeWriters.get(entry.getKey()).addAll(occupied);
            occupied.clear();
        }
    }

    /**
     * Closes all free writers, using up to {@code GiraphConstants.NUM_OUTPUT_THREADS}
     * callables. Only writers in the free pools are closed; writers still recorded as
     * occupied are not touched, so {@link #returnAllWriters()} must be called first
     * for them to be included.
     *
     * <p>Closed writers are removed from the free pools so they are never handed out
     * again and are not closed a second time by a later call.
     */
    public void closeAllWriters() {
        final Queue<BlockOutputWriter> pending = new ConcurrentLinkedQueue<>();
        for (Queue<BlockOutputWriter> free : this.freeWriters.values()) {
            // Drain rather than copy: a closed writer must not stay available for reuse.
            BlockOutputWriter writer;
            while ((writer = free.poll()) != null) {
                pending.add(writer);
            }
        }
        if (pending.isEmpty()) {
            return;
        }
        // Computed before any callable runs, while the queue still holds every writer.
        int numThreads = Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(this.conf), pending.size());
        CallableFactory<Void> closerFactory = new CallableFactory<Void>() {
            @Override
            public Callable<Void> newCallable(int i) {
                return new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        // Each callable keeps taking writers from the shared queue until it is empty.
                        BlockOutputWriter writer;
                        while ((writer = pending.poll()) != null) {
                            writer.close();
                        }
                        return null;
                    }
                };
            }
        };
        ProgressableUtils.getResultsWithNCallables(closerFactory, numThreads, "close-writers-%d", this.progressable);
    }
}
