package org.apache.tajo.storage;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
import org.apache.tajo.tuple.memory.MemoryRowBlock;
import org.apache.tajo.util.Pair;

/* loaded from: input_file:org/apache/tajo/storage/HashShuffleAppenderManager.class */
public class HashShuffleAppenderManager {
    /** Logger shared by all instances of this class. */
    private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
    /** Configuration passed to the constructor; reused when resolving file systems and shuffle directories. */
    private TajoConf systemConf;
    /** File system of the Tajo root directory, resolved in the constructor. */
    private FileSystem defaultFS;
    /** Local file system; used to qualify the temporal directories and the shuffle data file paths. */
    private FileSystem localFS;
    /**
     * Page size handed to every appender wrapper: the SHUFFLE_HASH_APPENDER_PAGE_VOLUME setting
     * multiplied by 1048576 (presumably a MiB-to-bytes conversion -- confirm against the config docs).
     */
    private int pageSize;
    /** Open appenders, keyed by execution block id and then by partition id. */
    private ConcurrentMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap = Maps.newConcurrentMap();
    /** One single-thread executor per entry of {@link #temporalPaths}, keyed by that entry's list index. */
    private ConcurrentMap<Integer, ExecutorService> executors = Maps.newConcurrentMap();
    /** Qualified local temporal directories (as strings); the list index doubles as the volume id. */
    private List<String> temporalPaths = Lists.newArrayList();
    /** Allocator configured with the WORKER_TEMPORAL_DIR configuration key. */
    private LocalDirAllocator lDirAllocator = new LocalDirAllocator(TajoConf.ConfVars.WORKER_TEMPORAL_DIR.varname);

    /* loaded from: input_file:org/apache/tajo/storage/HashShuffleAppenderManager$HashShuffleIntermediate.class */
    /**
     * Read-only summary of one hash-shuffle partition.
     *
     * <p>Instances are created by {@link HashShuffleAppenderManager#close(ExecutionBlockId)} from the
     * state of a closed appender. The collections passed to the constructor are stored by reference
     * (no defensive copy), so callers that keep a handle on them can still mutate the contents.
     */
    public static class HashShuffleIntermediate {
        /** Partition id this summary belongs to. */
        private final int partId;
        /** Value the creator reported as the partition's volume (the appender offset when built by close()). */
        private final long volume;
        /** Tuple-index information reported by the appender; the meaning of the pair members is defined by the appender. */
        private final Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
        /** Page descriptors reported by the appender; the meaning of the pair members is defined by the appender. */
        private final List<Pair<Long, Integer>> pages;

        /**
         * @param i          partition id
         * @param j          partition volume
         * @param list       page descriptors, stored as-is
         * @param collection tuple-index information, stored as-is
         */
        public HashShuffleIntermediate(int i, long j, List<Pair<Long, Integer>> list, Collection<Pair<Long, Pair<Integer, Integer>>> collection) {
            // No placeholder list is allocated for pages: the supplied list is assigned directly.
            this.partId = i;
            this.volume = j;
            this.pages = list;
            this.failureTskTupleIndexes = collection;
        }

        /** @return the partition id */
        public int getPartId() {
            return this.partId;
        }

        /** @return the partition volume given at construction */
        public long getVolume() {
            return this.volume;
        }

        /** @return the tuple-index collection given at construction (same instance, not a copy) */
        public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
            return this.failureTskTupleIndexes;
        }

        /** @return the page list given at construction (same instance, not a copy) */
        public List<Pair<Long, Integer>> getPages() {
            return this.pages;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tajo/storage/HashShuffleAppenderManager$PartitionAppenderMeta.class */
    /**
     * Mutable record tying a partition id to its appender and the data file the appender writes.
     *
     * <p>The fields are package-private and are filled in by the enclosing manager right after
     * construction; outside code only sees the getters.
     */
    public static class PartitionAppenderMeta {
        /** Partition id. */
        int partId;
        /** Path of the shuffle data file for this partition. */
        Path dataFile;
        /** Appender wrapper that writes to {@link #dataFile}. */
        HashShuffleAppenderWrapper appender;

        /** Package-private on purpose: instances are only created inside this package. */
        PartitionAppenderMeta() {
        }

        /** @return the partition id */
        public int getPartId() {
            return partId;
        }

        /** @return the appender wrapper, or {@code null} if it has not been assigned yet */
        public HashShuffleAppenderWrapper getAppender() {
            return appender;
        }

        /** @return the data file path, or {@code null} if it has not been assigned yet */
        public Path getDataFile() {
            return dataFile;
        }
    }

    /**
     * Creates a manager bound to the given configuration.
     *
     * <p>Resolves the Tajo root and local file systems, computes the appender page size, and
     * registers every local directory returned by the directory allocator: each directory is stored
     * (qualified, as a string) in {@code temporalPaths}, and a dedicated single-thread executor is
     * registered under the directory's list index.
     *
     * @param tajoConf configuration to read settings and local directories from
     * @throws IOException if a file system cannot be resolved or the local directories cannot be listed
     */
    public HashShuffleAppenderManager(TajoConf tajoConf) throws IOException {
        this.systemConf = tajoConf;
        this.defaultFS = TajoConf.getTajoRootDir(tajoConf).getFileSystem(tajoConf);
        this.localFS = FileSystem.getLocal(tajoConf);
        this.pageSize = tajoConf.getIntVar(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * (1024 * 1024);

        // Typed iteration instead of a raw Iterator plus cast.
        for (Path localDir : this.lDirAllocator.getAllLocalPathsToRead(".", tajoConf)) {
            // The index the directory is about to occupy in temporalPaths is its volume id.
            int volumeId = this.temporalPaths.size();
            this.temporalPaths.add(this.localFS.makeQualified(localDir).toString());
            this.executors.put(Integer.valueOf(volumeId), Executors.newSingleThreadExecutor());
        }
    }

    /**
     * Returns the index in {@code temporalPaths} of the first temporal directory that contains the
     * given path.
     *
     * <p>Matching is done on the string forms. A directory matches when the path is that directory
     * itself or lies below it, i.e. the directory string followed by {@code '/'} is a prefix of the
     * path. Requiring the separator stops a directory such as {@code .../tajo1} from being mistaken
     * for the parent of a file under {@code .../tajo10}.
     *
     * @param path path to locate; must not be {@code null}
     * @return the index of the first matching temporal directory
     * @throws IndexOutOfBoundsException if no temporal directory matches
     * @throws IllegalArgumentException  if no temporal directory is registered at all
     */
    protected int getVolumeId(Path path) {
        String target = path.toString();
        int i = 0;
        for (String root : this.temporalPaths) {
            String rootWithSeparator = root.endsWith("/") ? root : root + "/";
            if (target.equals(root) || target.startsWith(rootWithSeparator)) {
                break;
            }
            i++;
        }
        // i equals temporalPaths.size() when nothing matched, which this check rejects.
        Preconditions.checkPositionIndex(i, this.temporalPaths.size() - 1);
        return i;
    }

    /**
     * Returns the appender for the given execution block and partition, creating and registering it
     * on first use.
     *
     * <p>On creation the data file location is resolved, its parent directory is created when
     * missing, a {@link DirectRawFileWriter} with statistics enabled is initialised on that file, and
     * the writer is wrapped in a {@link HashShuffleAppenderWrapper} tagged with the volume id of the
     * file. An already existing data file is only reported in the log.
     *
     * @param memoryRowBlock   row block handed to the raw file writer
     * @param executionBlockId execution block the partition belongs to
     * @param i                partition id
     * @param tableMeta        table metadata handed to the raw file writer
     * @param schema           schema handed to the raw file writer
     * @return the existing or newly created appender wrapper
     * @throws IOException if the data file location cannot be resolved or the writer cannot be set up
     */
    public synchronized HashShuffleAppenderWrapper getAppender(MemoryRowBlock memoryRowBlock, ExecutionBlockId executionBlockId, int i, TableMeta tableMeta, Schema schema) throws IOException {
        Map<Integer, PartitionAppenderMeta> partitions = this.appenderMap.get(executionBlockId);
        if (partitions == null) {
            partitions = Maps.newConcurrentMap();
            this.appenderMap.put(executionBlockId, partitions);
        }

        Integer partKey = Integer.valueOf(i);
        PartitionAppenderMeta existing = partitions.get(partKey);
        if (existing != null) {
            return existing.appender;
        }

        // First request for this partition: prepare the file location.
        Path dataFile = getDataFile(executionBlockId, i);
        FileSystem fs = dataFile.getFileSystem(this.systemConf);
        if (fs.exists(dataFile)) {
            LOG.info("File " + dataFile + " already exists, size=" + fs.getFileStatus(dataFile).getLen());
        }
        Path parentDir = dataFile.getParent();
        if (!fs.exists(parentDir)) {
            fs.mkdirs(parentDir);
        }

        // Set up the underlying writer before wrapping it.
        DirectRawFileWriter writer = new DirectRawFileWriter(this.systemConf, null, schema, tableMeta, dataFile, memoryRowBlock);
        writer.enableStats();
        writer.init();

        PartitionAppenderMeta created = new PartitionAppenderMeta();
        created.partId = i;
        created.dataFile = dataFile;
        created.appender = new HashShuffleAppenderWrapper(executionBlockId, i, this.pageSize, writer, getVolumeId(dataFile));
        created.appender.init();
        partitions.put(partKey, created);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Create Hash shuffle file(partId=" + i + "): " + dataFile);
        }
        return created.appender;
    }

    /**
     * Maps a partition id to the id of its parent directory.
     *
     * @param i        partition id
     * @param tajoConf configuration providing the SHUFFLE_HASH_PARENT_DIRS setting
     * @return {@code i} modulo the configured SHUFFLE_HASH_PARENT_DIRS value
     */
    public static int getPartParentId(int i, TajoConf tajoConf) {
        int parentDirCount = tajoConf.getIntVar(TajoConf.ConfVars.SHUFFLE_HASH_PARENT_DIRS);
        return i % parentDirCount;
    }

    /**
     * Builds the qualified local path of the shuffle data file for one partition.
     *
     * <p>The file lives at
     * {@code <allocated dir for "<queryId>/output/<ebId>/hash-shuffle">/<parentId>/<partId>}, where
     * the parent id comes from {@link #getPartParentId(int, TajoConf)}.
     *
     * @param executionBlockId execution block the partition belongs to
     * @param i                partition id
     * @return the qualified data file path
     * @throws IOException wrapping any exception raised while resolving the path
     */
    private Path getDataFile(ExecutionBlockId executionBlockId, int i) throws IOException {
        try {
            String shuffleDir = executionBlockId.getQueryId().toString() + "/output/" + executionBlockId.getId() + "/hash-shuffle";
            Path baseDir = this.lDirAllocator.getLocalPathForWrite(shuffleDir, this.systemConf);
            String[] children = new String[]{
                String.valueOf(getPartParentId(i, this.systemConf)),
                String.valueOf(i)
            };
            Path unqualified = StorageUtil.concatPath(baseDir, children);
            return this.localFS.makeQualified(unqualified);
        } catch (Exception e) {
            // Any failure is logged here and surfaced to the caller as an IOException.
            LOG.error(e.getMessage(), e);
            throw new IOException(e);
        }
    }

    /**
     * Closes every appender registered for the given execution block and summarises the partitions.
     *
     * <p>The execution block's entry is removed from the appender map first. Every appender is then
     * closed, even if an earlier one fails: once the entry is removed, nothing else holds a
     * reference to the remaining appenders, so stopping at the first failure would leave them open.
     * Each failure is logged; after all appenders have been attempted, the first failure is rethrown.
     *
     * @param executionBlockId execution block whose appenders are to be closed
     * @return one {@link HashShuffleIntermediate} per partition, or {@code null} when no appenders
     *         are registered for the execution block
     * @throws IOException the first failure raised while closing or summarising an appender
     */
    public List<HashShuffleIntermediate> close(ExecutionBlockId executionBlockId) throws IOException {
        Map<Integer, PartitionAppenderMeta> remove = this.appenderMap.remove(executionBlockId);
        if (remove == null) {
            LOG.info("Close HashShuffleAppenderWrapper:" + executionBlockId + ", not a hash shuffle");
            return null;
        }

        List<HashShuffleIntermediate> intermediates = new ArrayList<HashShuffleIntermediate>(remove.size());
        IOException firstFailure = null;
        for (PartitionAppenderMeta meta : remove.values()) {
            try {
                meta.appender.close();
                intermediates.add(new HashShuffleIntermediate(meta.partId, meta.appender.getOffset(), meta.appender.getPages(), meta.appender.getMergedTupleIndexes()));
            } catch (IOException e) {
                LOG.error(e.getMessage(), e);
                // Keep going so the remaining appenders are still closed.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }

        LOG.info("Close HashShuffleAppenderWrapper:" + executionBlockId + ", intermediates=" + intermediates.size());
        return intermediates;
    }

    /**
     * Notifies every appender of the task's execution block that the task attempt has finished.
     *
     * <p>Does nothing when no appenders are registered for that execution block.
     *
     * @param taskAttemptId the finished task attempt
     */
    public void finalizeTask(TaskAttemptId taskAttemptId) {
        ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId();
        Map<Integer, PartitionAppenderMeta> partitions = this.appenderMap.get(ebId);
        if (partitions != null) {
            for (PartitionAppenderMeta meta : partitions.values()) {
                meta.appender.taskFinished(taskAttemptId);
            }
        }
    }

    /**
     * Schedules the given row block to be written to the partition's appender.
     *
     * <p>The appender is looked up (or created) synchronously on the calling thread. The write itself
     * is submitted to the executor registered for the appender's volume id. After a successful write
     * the row block is released when {@code z} is {@code true} and cleared otherwise.
     *
     * @param tableMeta      table metadata used if the appender has to be created
     * @param schema         schema used if the appender has to be created
     * @param taskAttemptId  task attempt the rows belong to
     * @param i              partition id
     * @param memoryRowBlock rows to write
     * @param z              {@code true} to release the row block after writing, {@code false} to clear it
     * @return a future that completes with the same row block once the write has run
     * @throws IOException if the appender cannot be obtained
     */
    public Future<MemoryRowBlock> writePartitions(TableMeta tableMeta, Schema schema, final TaskAttemptId taskAttemptId, int i, final MemoryRowBlock memoryRowBlock, final boolean z) throws IOException {
        ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId();
        final HashShuffleAppenderWrapper target = getAppender(memoryRowBlock, ebId, i, tableMeta, schema);
        ExecutorService volumeExecutor = this.executors.get(Integer.valueOf(target.getVolumeId()));

        Callable<MemoryRowBlock> writeJob = new Callable<MemoryRowBlock>() {
            @Override
            public MemoryRowBlock call() throws Exception {
                target.writeRowBlock(taskAttemptId, memoryRowBlock);
                if (!z) {
                    memoryRowBlock.clear();
                } else {
                    memoryRowBlock.release();
                }
                return memoryRowBlock;
            }
        };
        return volumeExecutor.submit(writeJob);
    }

    /**
     * Calls {@link ExecutorService#shutdownNow()} on every per-volume executor.
     *
     * <p>Does not wait for running write jobs to terminate.
     */
    public void shutdown() {
        for (ExecutorService executor : this.executors.values()) {
            executor.shutdownNow();
        }
    }
}
