package org.apache.tajo.querymaster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.SessionVars;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.UniformRangePartition;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanRewriteUtil;
import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.exception.UndefinedTableException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.DistinctGroupbyNode;
import org.apache.tajo.plan.logical.GroupbyNode;
import org.apache.tajo.plan.logical.JoinNode;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.PartitionedTableScanNode;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.logical.SortNode;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.worker.FetchImpl;

/* loaded from: input_file:org/apache/tajo/querymaster/Repartitioner.class */
public class Repartitioner {
    /** Logger for the scheduling decisions made by this class. */
    private static final Log LOG = LogFactory.getLog(Repartitioner.class);
    // NOTE(review): not referenced in the visible part of this file; presumably an upper bound on
    // the length of generated fetch request URIs — confirm against its users.
    private static final int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
    /** Host name given to placeholder fragments whose location is not known. */
    private static final String UNKNOWN_HOST = "unknown";

    /**
     * Accumulates the fetches that belong to one shuffle group together with their combined
     * volume.
     *
     * <p>Fields are package-private so that other code in this package can read them directly.
     */
    @VisibleForTesting
    public static class FetchGroupMeta {
        /** Sum of the volumes added so far. */
        long totalVolume;
        /** Fetches in this group, in insertion order. */
        List<FetchImpl> fetchUrls;

        /**
         * Starts a group containing a single fetch.
         *
         * @param j initial volume of the group
         * @param fetchImpl first fetch of the group
         */
        public FetchGroupMeta(long j, FetchImpl fetchImpl) {
            List<FetchImpl> initial = new ArrayList<FetchImpl>();
            initial.add(fetchImpl);
            this.fetchUrls = initial;
            this.totalVolume = j;
        }

        /**
         * Appends a fetch to this group (the volume is not changed).
         *
         * @param fetchImpl fetch to append
         * @return this group, for call chaining
         */
        public FetchGroupMeta addFetche(FetchImpl fetchImpl) {
            fetchUrls.add(fetchImpl);
            return this;
        }

        /**
         * Adds {@code j} to the group's volume.
         *
         * @param j volume delta
         */
        public void increaseVolume(long j) {
            totalVolume = totalVolume + j;
        }

        /** @return the accumulated volume of this group */
        public long getVolume() {
            return totalVolume;
        }
    }

    /**
     * Schedules the leaf fragments of a join execution block.
     *
     * <p>Estimates an input volume for every scan node and returns without scheduling anything
     * when the join is known to produce no rows. Otherwise it chooses between a symmetric
     * repartition join and a broadcast join. The choice depends on
     * {@code block.hasBroadcastRelation()} and on whether any scan's data format is "RAW".
     *
     * @param taskSchedulerContext scheduler context handed to the chosen scheduling routine
     * @param stage the join stage to schedule
     * @throws IOException when split computation fails, or when the broadcast layout is not
     *     supported (more than two "RAW" scans)
     * @throws TajoException declared; presumably propagated from the scheduling helpers
     */
    public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext taskSchedulerContext, Stage stage) throws IOException, TajoException {
        ExecutionBlock block = stage.getBlock();
        QueryMasterTask.QueryMasterTaskContext context = stage.getContext();
        ScanNode[] scanNodes = block.getScanNodes();
        // Per scan node: one representative fragment and an estimated input volume in bytes.
        Fragment[] fragmentArr = new Fragment[scanNodes.length];
        long[] jArr = new long[scanNodes.length];
        for (int i = 0; i < scanNodes.length; i++) {
            TableDesc tableDesc = context.getTableDesc(scanNodes[i]);
            if (tableDesc == null) {
                // No table descriptor (presumably a scan over another block's output). The volume
                // comes from the result stats of the stage whose id is parsed from the table name,
                // or from the union-mapped child stages.
                if (block.getUnionScanMap() == null || block.getUnionScanMap().isEmpty()) {
                    jArr[i] = context.getStage(TajoIdUtils.createExecutionBlockId(scanNodes[i].getTableName())).getResultStats().getNumBytes().longValue();
                } else {
                    // NOTE(review): every union-map key is added to this scan's volume, whatever
                    // the key maps to. scheduleSymmetricRepartitionJoin resolves a child's target
                    // scan via unionScanMap.get(childId). Confirm whether entries should be
                    // filtered by value (== this scan's block id) before summing.
                    Iterator<Map.Entry<ExecutionBlockId, ExecutionBlockId>> it = block.getUnionScanMap().entrySet().iterator();
                    while (it.hasNext()) {
                        int i2 = i;
                        jArr[i2] = jArr[i2] + context.getStage(it.next().getKey()).getResultStats().getNumBytes().longValue();
                    }
                }
                // Placeholder fragment: dummy path, zero length, unknown host.
                fragmentArr[i] = new FileFragment(scanNodes[i].getCanonicalName(), new Path("/dummy"), 0L, 0L, new String[]{UNKNOWN_HOST});
            } else {
                // Stored table: the volume comes from the plan. The first split (or an empty
                // fragment on the table URI when there are no splits) is the representative.
                jArr[i] = GlobalPlanRewriteUtil.computeDescendentVolume(scanNodes[i]);
                List splits = TablespaceManager.get(tableDesc.getUri()).getSplits(scanNodes[i].getCanonicalName(), tableDesc, (EvalNode) null);
                if (splits.size() > 0) {
                    fragmentArr[i] = (Fragment) splits.get(0);
                } else {
                    fragmentArr[i] = new FileFragment(scanNodes[i].getCanonicalName(), new Path(tableDesc.getUri()), 0L, 0L, new String[]{UNKNOWN_HOST});
                }
            }
        }
        // An inner bottom-most join with a zero-volume direct input scan yields no rows: schedule nothing.
        JoinNode findMostBottomNode = PlannerUtil.findMostBottomNode(block.getPlan(), NodeType.JOIN);
        if (findMostBottomNode != null && findMostBottomNode.getJoinType() == JoinType.INNER) {
            LogicalNode leftChild = findMostBottomNode.getLeftChild();
            LogicalNode rightChild = findMostBottomNode.getRightChild();
            for (int i3 = 0; i3 < jArr.length; i3++) {
                if ((scanNodes[i3].getPID() == leftChild.getPID() || scanNodes[i3].getPID() == rightChild.getPID()) && jArr[i3] == 0) {
                    LOG.info(scanNodes[i3] + " 's input data is zero. Inner join's result is empty.");
                    return;
                }
            }
        }
        // Also schedule nothing when every input is empty, or (with exactly two inputs) when the
        // preserved side of a LEFT/RIGHT OUTER top join is empty.
        JoinNode findTopNode = PlannerUtil.findTopNode(block.getPlan(), NodeType.JOIN);
        if (findTopNode != null) {
            boolean z = true;
            int i4 = 0;
            while (true) {
                if (i4 >= jArr.length) {
                    break;
                }
                if (jArr[i4] > 0) {
                    z = false;
                    break;
                }
                i4++;
            }
            if (z) {
                LOG.info("All input join tables are empty.");
                return;
            }
            ScanNode findTopNode2 = PlannerUtil.findTopNode(findTopNode.getLeftChild(), NodeType.SCAN);
            ScanNode findTopNode3 = PlannerUtil.findTopNode(findTopNode.getRightChild(), NodeType.SCAN);
            long j = -1;
            long j2 = -1;
            if (jArr.length == 2) {
                for (int i5 = 0; i5 < jArr.length; i5++) {
                    if (scanNodes[i5].equals(findTopNode2)) {
                        j = jArr[i5];
                    } else if (scanNodes[i5].equals(findTopNode3)) {
                        j2 = jArr[i5];
                    }
                }
                if (findTopNode.getJoinType() == JoinType.LEFT_OUTER && j == 0) {
                    return;
                }
                if (findTopNode.getJoinType() == JoinType.RIGHT_OUTER && j2 == 0) {
                    return;
                }
            }
        }
        // Without a broadcast relation, fall back to a symmetric repartition join over all inputs.
        if (!block.hasBroadcastRelation()) {
            LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
            scheduleSymmetricRepartitionJoin(context, taskSchedulerContext, stage, jArr, fragmentArr, null, null, null);
            return;
        }
        // Broadcast join. Scans are classified as follows:
        //  - "RAW"-format scans: z2 is set, indices go to arrayList, names go to sb.
        //  - broadcast relations: indices go to arrayList2.
        //  - everything else: the largest positive-volume scan becomes the base candidate (i6).
        boolean z2 = false;
        ArrayList<Integer> arrayList = new ArrayList();
        ArrayList<Integer> arrayList2 = new ArrayList();
        String str = "";
        long j3 = Long.MIN_VALUE;
        int i6 = -1;
        StringBuilder sb = new StringBuilder();
        for (int i7 = 0; i7 < scanNodes.length; i7++) {
            if (scanNodes[i7].getTableDesc().getMeta().getDataFormat().equalsIgnoreCase("RAW")) {
                z2 = true;
                arrayList.add(Integer.valueOf(i7));
                sb.append(str).append(scanNodes[i7].getCanonicalName());
                str = ",";
            }
            if (block.isBroadcastRelation(scanNodes[i7])) {
                arrayList2.add(Integer.valueOf(i7));
            } else if (jArr[i7] > 0 && jArr[i7] > j3) {
                j3 = jArr[i7];
                i6 = i7;
            }
        }
        String sb2 = sb.toString();
        // No non-empty base candidate: default to the first scan.
        if (i6 == -1) {
            i6 = 0;
        }
        // No "RAW" scan: broadcast join driven by a single base scan.
        if (!z2) {
            // NOTE: arrayList only receives entries when z2 is set, so it is always empty here.
            // The size check therefore always passes and the ternary always selects i6.
            if (arrayList.size() <= 1) {
                int intValue = arrayList.isEmpty() ? i6 : ((Integer) arrayList.get(0)).intValue();
                LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d", scanNodes[intValue].getCanonicalName(), Long.valueOf(jArr[intValue])));
                scheduleLeafTasksWithBroadcastTable(taskSchedulerContext, stage, intValue, fragmentArr);
                return;
            } else {
                StringBuilder sb3 = new StringBuilder();
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    sb3.append(scanNodes[((Integer) it2.next()).intValue()].getTableName()).append(',');
                }
                throw new IOException("Broadcast join with leaf node should have only one large table, but " + arrayList.size() + ", tables=" + sb3.toString());
            }
        }
        // "RAW" scans (at most two) are repartitioned. Broadcast scans are marked as such and
        // passed along with their volumes and fragments.
        if (arrayList.size() > 2) {
            throw new IOException("Symmetric Repartition Join should have two scan node, but " + sb2);
        }
        long[] jArr2 = new long[arrayList.size()];
        Fragment[] fragmentArr2 = new Fragment[arrayList.size()];
        int i8 = 0;
        for (Integer num : arrayList) {
            jArr2[i8] = jArr[num.intValue()];
            int i9 = i8;
            i8++;
            fragmentArr2[i9] = fragmentArr[num.intValue()];
        }
        Fragment[] fragmentArr3 = new Fragment[arrayList2.size()];
        ScanNode[] scanNodeArr = new ScanNode[arrayList2.size()];
        long[] jArr3 = new long[arrayList2.size()];
        int i10 = 0;
        for (Integer num2 : arrayList2) {
            scanNodes[num2.intValue()].setBroadcastTable(true);
            scanNodeArr[i10] = scanNodes[num2.intValue()];
            jArr3[i10] = jArr[num2.intValue()];
            fragmentArr3[i10] = fragmentArr[num2.intValue()];
            i10++;
        }
        LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, join_node=%s", sb2));
        scheduleSymmetricRepartitionJoin(context, taskSchedulerContext, stage, jArr2, fragmentArr2, scanNodeArr, jArr3, fragmentArr3);
    }

    /**
     * Schedules a symmetric repartition join.
     *
     * <p>Intermediate entries of all child blocks are grouped by hash partition id and by the block
     * id that the join scans refer to. Union children are mapped through the block's union scan
     * map. Each partition id becomes one join fetch group. The number of join tasks is derived from
     * the larger input volume and the {@code JOIN_TASK_INPUT_SIZE} session value (compared in MB),
     * and is capped by the number of partition ids.
     *
     * @param queryMasterTaskContext context used to resolve the broadcast scans' table descriptors
     * @param taskSchedulerContext receives the estimated task number and the per-task fetch size
     * @param stage the join stage being scheduled
     * @param jArr estimated volumes (bytes) of the repartitioned inputs; one or two entries
     * @param fragmentArr representative fragments of the repartitioned inputs, parallel to {@code jArr}
     * @param scanNodeArr broadcast scan nodes; only read when {@code fragmentArr2} is non-null
     * @param jArr2 broadcast volumes; {@code jArr2[0]} is used when {@code jArr} has one entry
     * @param fragmentArr2 broadcast fragments, or {@code null} when there is no broadcast input
     * @throws IOException if computing broadcast splits fails
     * @throws TajoException declared for callers; presumably propagated from helpers
     */
    private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMasterTaskContext queryMasterTaskContext, TaskSchedulerContext taskSchedulerContext, Stage stage, long[] jArr, Fragment[] fragmentArr, ScanNode[] scanNodeArr, long[] jArr2, Fragment[] fragmentArr2) throws IOException, TajoException {
        MasterPlan masterPlan = stage.getMasterPlan();
        ExecutionBlock block = stage.getBlock();
        Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = block.getUnionScanMap();

        // partition id -> (block id referenced by the join scan -> intermediate entries)
        Map<Integer, Map<ExecutionBlockId, List<Task.IntermediateEntry>>> hashEntries =
            new HashMap<Integer, Map<ExecutionBlockId, List<Task.IntermediateEntry>>>();
        for (ExecutionBlock child : masterPlan.getChilds(stage.getId())) {
            ExecutionBlockId childId = child.getId();
            // A union child is registered under the block id its scan refers to.
            ExecutionBlockId scanBlockId = unionScanMap == null ? null : unionScanMap.get(childId);
            if (scanBlockId == null) {
                scanBlockId = childId;
            }
            Collection<Task.IntermediateEntry> entries =
                stage.getContext().getStage(childId).getHashShuffleIntermediateEntries();
            if (entries == null || entries.isEmpty()) {
                // A child without hash-shuffle output is still registered (with no entries) under partition 0.
                getOrCreateEntryList(hashEntries, 0, scanBlockId);
                continue;
            }
            for (Task.IntermediateEntry entry : entries) {
                entry.setEbId(childId);
                getOrCreateEntryList(hashEntries, entry.getPartId(), scanBlockId).add(entry);
            }
        }

        int partitionCount = hashEntries.size();
        long leftVolume = jArr[0];
        long rightVolume = jArr.length == 2 ? jArr[1] : jArr2[0];
        // Average fetch size per partition. With a single repartitioned input, the right
        // (broadcast) volume is not divided by the partition count.
        int leftFetchSize = partitionCount == 0 ? 0 : (int) (leftVolume / partitionCount);
        int rightFetchSize = partitionCount == 0 ? 0 : (int) (jArr.length == 2 ? rightVolume / partitionCount : rightVolume);
        int bothFetchSize = leftFetchSize + rightFetchSize;

        long largerVolume = Math.max(leftVolume, rightVolume);
        int desiredTaskVolumeMb = stage.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE);
        int largerVolumeMb = (int) Math.ceil(largerVolume / 1048576.0d);
        LOG.info("Larger intermediate data is approximately " + largerVolumeMb + " MB");
        // Use floating-point division. Integer division truncates before Math.ceil can round up,
        // which under-counts tasks, and it throws when the session value is 0.
        int maxTaskNum = (int) Math.ceil((double) largerVolumeMb / desiredTaskVolumeMb);
        LOG.info("The calculated number of tasks is " + maxTaskNum);
        LOG.info("The number of total shuffle keys is " + partitionCount);
        // There cannot be more join tasks than distinct partition ids.
        int joinTaskNum = Math.min(maxTaskNum, partitionCount);
        LOG.info("The determined number of join tasks is " + joinTaskNum);

        List<Fragment> rightFragments = new ArrayList<Fragment>();
        if (fragmentArr.length == 2) {
            rightFragments.add(fragmentArr[1]);
        }
        if (fragmentArr2 != null) {
            for (ScanNode scanNode : scanNodeArr) {
                TableDesc tableDesc = queryMasterTaskContext.getTableDesc(scanNode);
                // Typed as Tablespace: only the partitioned branch needs a FileTablespace, and
                // getFragmentsFromPartitionedTable checks that itself.
                Tablespace tablespace = TablespaceManager.get(tableDesc.getUri());
                if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
                    PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scanNode;
                    // getFragmentsFromPartitionedTable clears the input paths; restore them afterwards.
                    Path[] inputPaths = partitionScan.getInputPaths();
                    getFragmentsFromPartitionedTable(tablespace, scanNode, tableDesc);
                    partitionScan.setInputPaths(inputPaths);
                } else {
                    List<Fragment> splits = tablespace.getSplits(scanNode.getCanonicalName(), tableDesc, scanNode.getQual());
                    if (splits != null) {
                        rightFragments.addAll(splits);
                    }
                }
            }
        }
        Stage.scheduleFragment(stage, fragmentArr[0], rightFragments);
        for (Map.Entry<Integer, Map<ExecutionBlockId, List<Task.IntermediateEntry>>> entry : hashEntries.entrySet()) {
            addJoinShuffle(stage, entry.getKey(), entry.getValue());
        }
        // Explicit zero-task guard instead of dividing by zero.
        int taskSize = joinTaskNum == 0 ? 0 : (int) Math.ceil((double) bothFetchSize / joinTaskNum);
        taskSchedulerContext.setTaskSize(taskSize);
        taskSchedulerContext.setEstimatedTaskNum(joinTaskNum);
    }

    /** Returns the entry list stored under (partId, scanBlockId), creating empty containers on demand. */
    private static List<Task.IntermediateEntry> getOrCreateEntryList(Map<Integer, Map<ExecutionBlockId, List<Task.IntermediateEntry>>> hashEntries, int partId, ExecutionBlockId scanBlockId) {
        Map<ExecutionBlockId, List<Task.IntermediateEntry>> byScanBlock = hashEntries.get(partId);
        if (byScanBlock == null) {
            byScanBlock = new HashMap<ExecutionBlockId, List<Task.IntermediateEntry>>();
            hashEntries.put(partId, byScanBlock);
        }
        List<Task.IntermediateEntry> entries = byScanBlock.get(scanBlockId);
        if (entries == null) {
            entries = new ArrayList<Task.IntermediateEntry>();
            byScanBlock.put(scanBlockId, entries);
        }
        return entries;
    }

    /**
     * Collapses intermediate entries that share the same source block and pull host.
     *
     * <p>Merging happens separately for each partition id and each block id. All entries with the
     * same (source block, pull address) pair are replaced by one entry whose volume is the sum of
     * the merged volumes. Null or empty entry lists are dropped.
     *
     * @param map partition id -> (block id -> intermediate entries); not modified
     * @return a new map with the same nesting that holds only merged entries. Merged entries have
     *     task id and attempt id set to -1.
     */
    public static Map<Integer, Map<ExecutionBlockId, List<Task.IntermediateEntry>>> mergeIntermediateByPullHost(Map<Integer, Map<ExecutionBlockId, List<Task.IntermediateEntry>>> map) {
        Map<Integer, Map<ExecutionBlockId, List<Task.IntermediateEntry>>> merged =
            new HashMap<Integer, Map<ExecutionBlockId, List<Task.IntermediateEntry>>>();
        for (Map.Entry<Integer, Map<ExecutionBlockId, List<Task.IntermediateEntry>>> partEntry : map.entrySet()) {
            Integer partId = partEntry.getKey();
            for (Map.Entry<ExecutionBlockId, List<Task.IntermediateEntry>> blockEntry : partEntry.getValue().entrySet()) {
                List<Task.IntermediateEntry> entries = blockEntry.getValue();
                if (entries == null || entries.isEmpty()) {
                    continue;
                }
                Map<String, Task.IntermediateEntry> byHost = new HashMap<String, Task.IntermediateEntry>();
                for (Task.IntermediateEntry entry : entries) {
                    // The separator keeps the key unambiguous: without it "<ebId><address>" pairs
                    // can collide, e.g. when the address starts with digits.
                    String key = entry.getEbId().toString() + "," + entry.getPullHost().getPullAddress();
                    Task.IntermediateEntry mergedEntry = byHost.get(key);
                    if (mergedEntry == null) {
                        mergedEntry = new Task.IntermediateEntry(-1, -1, partId.intValue(), entry.getPullHost());
                        mergedEntry.setEbId(entry.getEbId());
                        byHost.put(key, mergedEntry);
                    }
                    mergedEntry.setVolume(mergedEntry.getVolume() + entry.getVolume());
                }
                Map<ExecutionBlockId, List<Task.IntermediateEntry>> byBlock = merged.get(partId);
                if (byBlock == null) {
                    byBlock = new HashMap<ExecutionBlockId, List<Task.IntermediateEntry>>();
                    merged.put(partId, byBlock);
                }
                byBlock.put(blockEntry.getKey(), new ArrayList<Task.IntermediateEntry>(byHost.values()));
            }
        }
        return merged;
    }

    /**
     * Computes the fragments of a partitioned table from the scan node's input paths.
     *
     * <p>Side effect: the scan node's input paths are set to {@code null} once they have been
     * read. Callers that still need the paths must save and restore them.
     *
     * @param tablespace must be a {@link FileTablespace}
     * @param scanNode must be a {@link PartitionedTableScanNode}
     * @param tableDesc descriptor supplying the table meta and schema
     * @return a new mutable list of the computed fragments
     * @throws IllegalArgumentException if either argument has the wrong type
     * @throws IOException if split computation fails
     */
    public static List<Fragment> getFragmentsFromPartitionedTable(Tablespace tablespace, ScanNode scanNode, TableDesc tableDesc) throws IOException {
        Preconditions.checkArgument(tablespace instanceof FileTablespace, "tsHandler must be FileTablespace");
        boolean isPartitionScan = scanNode instanceof PartitionedTableScanNode;
        if (!isPartitionScan) {
            throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type.");
        }
        FileTablespace fileTablespace = (FileTablespace) tablespace;
        PartitionedTableScanNode partitionScan = (PartitionedTableScanNode) scanNode;

        List<Fragment> fragments = new ArrayList<Fragment>();
        Path[] partitionPaths = partitionScan.getInputPaths();
        fragments.addAll(fileTablespace.getSplits(scanNode.getCanonicalName(), tableDesc.getMeta(), tableDesc.getSchema(), partitionPaths));

        // The paths have been consumed; clear them on the node.
        partitionScan.setInputPaths((Path[]) null);
        return fragments;
    }

    /**
     * Schedules a broadcast join whose base input is a leaf scan.
     *
     * <p>Every scan other than the base one (index {@code i}) is marked as a broadcast table. The
     * base scan's fragments become the scheduled fragments. Broadcast fragments of non-partitioned
     * scans are attached to them. Broadcast partitioned scans keep their input paths instead. The
     * {@code fragmentArr} parameter is not read.
     *
     * @param taskSchedulerContext receives the estimated task number (one per base fragment)
     * @param stage the stage to schedule
     * @param i index of the base scan node
     * @param fragmentArr unused
     * @throws IOException if the base scan yields no fragment list or split computation fails
     * @throws TajoException declared for callers
     */
    private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext taskSchedulerContext, Stage stage, int i, Fragment[] fragmentArr) throws IOException, TajoException {
        ScanNode[] scans = stage.getBlock().getScanNodes();
        for (int idx = 0; idx < scans.length; idx++) {
            if (idx == i) {
                continue;
            }
            scans[idx].setBroadcastTable(true);
        }

        List<Fragment> baseFragments = null;
        List<Fragment> broadcastFragments = new ArrayList<Fragment>();
        for (int idx = 0; idx < scans.length; idx++) {
            ScanNode scan = scans[idx];
            TableDesc desc = stage.getContext().getTableDesc(scan);
            Tablespace space = TablespaceManager.get(desc.getUri());
            boolean partitioned = scan.getType() == NodeType.PARTITIONS_SCAN;

            Path[] savedPaths = null;
            List<Fragment> scanFragments;
            if (partitioned) {
                // getFragmentsFromPartitionedTable clears the input paths, so remember them.
                savedPaths = ((PartitionedTableScanNode) scan).getInputPaths();
                scanFragments = getFragmentsFromPartitionedTable(space, scan, desc);
            } else {
                scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan.getQual());
            }
            if (scanFragments == null) {
                continue;
            }

            if (idx == i) {
                baseFragments = scanFragments;
            } else if (partitioned) {
                // A broadcast partitioned scan is handed its paths back instead of fragments.
                ((PartitionedTableScanNode) scan).setInputPaths(savedPaths);
            } else {
                broadcastFragments.addAll(scanFragments);
            }
        }

        if (baseFragments == null) {
            throw new IOException("No fragments for " + scans[i].getTableName());
        }
        Stage.scheduleFragments(stage, baseFragments, broadcastFragments);
        taskSchedulerContext.setEstimatedTaskNum(baseFragments.size());
    }

    /**
     * Schedules the hash-shuffle fetches of one join partition.
     *
     * <p>For each child block of the stage that has entries in {@code map}, the entries are merged
     * into fetches keyed by the block id's string form. When no child contributes, only a log line
     * is written.
     *
     * @param stage the join stage
     * @param i the partition id
     * @param map block id -> intermediate entries for this partition
     */
    private static void addJoinShuffle(Stage stage, int i, Map<ExecutionBlockId, List<Task.IntermediateEntry>> map) {
        Map<String, List<FetchImpl>> fetchesByBlock = new HashMap<String, List<FetchImpl>>();
        for (ExecutionBlock child : stage.getMasterPlan().getChilds(stage.getId())) {
            ExecutionBlockId childId = child.getId();
            if (!map.containsKey(childId)) {
                continue;
            }
            Collection<FetchImpl> mergedFetches = mergeShuffleRequest(i, PlanProto.ShuffleType.HASH_SHUFFLE, map.get(childId));
            fetchesByBlock.put(childId.toString(), new ArrayList<FetchImpl>(mergedFetches));
        }

        if (!fetchesByBlock.isEmpty()) {
            Stage.scheduleFetches(stage, fetchesByBlock);
            return;
        }
        LOG.info(stage.getId() + "'s " + i + " partition has empty result.");
    }

    /**
     * Builds one fetch per (source block, pull host) pair and adds each entry's task/attempt as a
     * part of that fetch.
     *
     * @param i partition id assigned to every created fetch
     * @param shuffleType shuffle type assigned to every created fetch
     * @param list intermediate entries to merge
     * @return the merged fetches (a view over the values of an internal map)
     */
    private static Collection<FetchImpl> mergeShuffleRequest(int i, PlanProto.ShuffleType shuffleType, List<Task.IntermediateEntry> list) {
        Map<String, FetchImpl> fetchByHost = new HashMap<String, FetchImpl>();
        for (Task.IntermediateEntry entry : list) {
            String hostKey = entry.getEbId().toString() + "," + entry.getPullHost();
            FetchImpl fetch = fetchByHost.get(hostKey);
            if (fetch == null) {
                fetch = new FetchImpl(entry.getPullHost(), shuffleType, entry.getEbId(), i);
                fetchByHost.put(hostKey, fetch);
            }
            fetch.addPart(entry.getTaskId(), entry.getAttemptId());
        }
        return fetchByHost.values();
    }

    /**
     * Dispatches scheduling of a non-leaf stage on the shuffle type of its first incoming channel.
     *
     * @param taskSchedulerContext scheduler context forwarded to the chosen routine
     * @param masterPlan the master plan
     * @param stage the stage to schedule
     * @param i the maximum/desired number of tasks, forwarded unchanged
     * @throws IOException propagated from the chosen routine
     * @throws TajoInternalError if the shuffle type is neither hash-based nor range-based
     */
    public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext taskSchedulerContext, MasterPlan masterPlan, Stage stage, int i) throws IOException {
        DataChannel channel = masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0);
        PlanProto.ShuffleType type = channel.getShuffleType();

        boolean hashBased = type == PlanProto.ShuffleType.HASH_SHUFFLE
            || type == PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
        if (hashBased) {
            scheduleHashShuffledFetches(taskSchedulerContext, masterPlan, stage, channel, i);
            return;
        }
        if (type == PlanProto.ShuffleType.RANGE_SHUFFLE) {
            scheduleRangeShuffledFetches(taskSchedulerContext, masterPlan, stage, channel, i);
            return;
        }
        throw new TajoInternalError("Cannot support partition type");
    }

    /**
     * Aggregates the result statistics of every child block of the given block.
     *
     * @param queryMasterTaskContext context used to look up each child's stage
     * @param masterPlan plan that defines the parent/child relationship
     * @param executionBlockId the parent block
     * @return the statistics aggregated by {@code StatisticsUtil.aggregateTableStat}
     */
    private static TableStats computeChildBlocksStats(QueryMasterTask.QueryMasterTaskContext queryMasterTaskContext, MasterPlan masterPlan, ExecutionBlockId executionBlockId) {
        List<TableStats> childStats = new ArrayList<TableStats>();
        for (ExecutionBlock child : masterPlan.getChilds(executionBlockId)) {
            Stage childStage = queryMasterTaskContext.getStage(child.getId());
            childStats.add(childStage.getResultStats());
        }
        return StatisticsUtil.aggregateTableStat(childStats);
    }

    /**
     * Schedules a range-shuffled stage.
     *
     * <p>Key ranges are derived either from the tablespace (storage-specified sorts) or by
     * uniformly partitioning the range built from the child blocks' column statistics. For every
     * range, each child fetch is cloned and restricted to that range. The per-range fetch sets are
     * then distributed round-robin over the tasks. Nothing is scheduled when the children report
     * zero bytes and no column statistics.
     *
     * @param taskSchedulerContext receives the determined number of tasks
     * @param masterPlan the master plan
     * @param stage the stage to schedule
     * @param dataChannel incoming range-shuffle channel; its shuffle keys define the range schema
     * @param i desired maximum number of tasks (used by the uniform partitioning path)
     * @throws IOException if the target table is not in the catalog, if a range cannot be encoded
     *     (previously this was only logged and ranges were silently dropped), or on tablespace
     *     failures
     */
    public static void scheduleRangeShuffledFetches(TaskSchedulerContext taskSchedulerContext, MasterPlan masterPlan, Stage stage, DataChannel dataChannel, int i) throws IOException {
        ScanNode scanNode = stage.getBlock().getScanNodes()[0];
        SortNode sortNode = PlannerUtil.findTopNode(masterPlan.getChild(stage.getId(), 0).getPlan(), NodeType.SORT);
        SortSpec[] sortKeys = sortNode.getSortKeys();
        Schema schema = new Schema(dataChannel.getShuffleKeys());
        TableStats childStats = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId());
        // The children produced nothing, so there is nothing to fetch.
        if (childStats.getNumBytes().longValue() == 0 && childStats.getColumnStats().size() == 0) {
            return;
        }
        TupleRange mergedRange = TupleUtil.columnStatToRange(sortKeys, schema, childStats.getColumnStats(), false);

        TupleRange[] ranges;
        int determinedTaskNum;
        if (sortNode.getSortPurpose() == SortNode.SortPurpose.STORAGE_SPECIFIED) {
            try {
                Tablespace tablespace = (Tablespace) TablespaceManager.getAnyByScheme(PlannerUtil.getDataFormat(masterPlan.getLogicalPlan())).get();
                ranges = tablespace.getInsertSortRanges(stage.getContext().getQueryContext(), PlannerUtil.getTableDesc(stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(), masterPlan.getLogicalPlan().getRootBlock().getRoot().getChild()), sortNode.getInSchema(), sortKeys, mergedRange);
                determinedTaskNum = ranges.length;
            } catch (UndefinedTableException e) {
                // Keep the cause so the missing table is diagnosable.
                throw new IOException("Can't get table meta data from catalog: " + PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()), e);
            }
        } else {
            UniformRangePartition partitioner = new UniformRangePartition(mergedRange, sortKeys);
            BigInteger cardinality = partitioner.getTotalCardinality();
            if (cardinality.compareTo(BigInteger.valueOf(i)) < 0) {
                LOG.info(stage.getId() + ", The range cardinality (" + cardinality + ") is less then the desired number of tasks (" + i + ")");
                determinedTaskNum = cardinality.intValue();
            } else {
                determinedTaskNum = i;
            }
            LOG.info(stage.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + " sub ranges (total units: " + determinedTaskNum + ")");
            ranges = partitioner.partition(determinedTaskNum);
            if (ranges == null) {
                throw new NullPointerException("ranges is null on " + stage.getId() + " stage.");
            }
            if (ranges.length == 0) {
                LOG.warn(stage.getId() + " no range infos.");
            }
            TupleUtil.setMaxRangeIfNull(sortKeys, schema, childStats.getColumnStats(), ranges);
            if (LOG.isDebugEnabled()) {
                for (TupleRange range : ranges) {
                    LOG.debug(stage.getId() + " range: " + range.getStart() + " ~ " + range.getEnd());
                }
            }
        }

        Stage.scheduleFragment(stage, new FileFragment(scanNode.getTableName(), new Path("/dummy"), 0L, 0L, new String[]{UNKNOWN_HOST}));

        // One unrestricted range-shuffle fetch per intermediate entry of every child task.
        List<FetchImpl> baseFetches = new ArrayList<FetchImpl>();
        for (ExecutionBlock child : masterPlan.getChilds(stage.getId())) {
            for (Task task : stage.getContext().getStage(child.getId()).getTasks()) {
                for (Task.IntermediateEntry entry : task.getIntermediateData()) {
                    FetchImpl fetch = new FetchImpl(entry.getPullHost(), PlanProto.ShuffleType.RANGE_SHUFFLE, child.getId(), 0);
                    fetch.addPart(entry.getTaskId(), entry.getAttemptId());
                    baseFetches.add(fetch);
                }
            }
        }

        // Any UnsupportedEncodingException (an IOException) now propagates. Swallowing it used to
        // schedule an incomplete set of ranges, i.e. silently drop data.
        Map<TupleRange, Collection<FetchImpl>> fetchesPerRange = new TreeMap<TupleRange, Collection<FetchImpl>>();
        RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
        for (int r = 0; r < ranges.length; r++) {
            // The range parameter depends only on the range, so build it once per range.
            String rangeParam = TupleUtil.rangeToQuery(ranges[r], r == ranges.length - 1, encoder);
            Collection<FetchImpl> rangeFetches = new HashSet<FetchImpl>();
            for (FetchImpl base : baseFetches) {
                try {
                    FetchImpl copy = base.m1490clone();
                    copy.setRangeParams(rangeParam);
                    rangeFetches.add(copy);
                } catch (CloneNotSupportedException e) {
                    throw new RuntimeException(e);
                }
            }
            fetchesPerRange.put(ranges[r], rangeFetches);
        }
        scheduleFetchesByRoundRobin(stage, fetchesPerRange, scanNode.getTableName(), determinedTaskNum);
        taskSchedulerContext.setEstimatedTaskNum(determinedTaskNum);
    }

    /**
     * Distributes fetch groups over {@code i} task slots in round-robin order.
     *
     * <p>The n-th value of {@code map} (in the map's iteration order) goes to slot {@code n % i}
     * under key {@code str}. Afterwards every slot, including empty ones, is passed to
     * {@code Stage.scheduleFetches}.
     *
     * @param stage the stage to schedule
     * @param map fetch groups to distribute; only the values are used
     * @param str key under which the fetches are stored in each slot
     * @param i number of task slots
     */
    public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchImpl>> map, String str, int i) {
        @SuppressWarnings("unchecked") // generic array creation; every slot holds the same map type
        Map<String, List<FetchImpl>>[] slots = new Map[i];
        for (int s = 0; s < slots.length; s++) {
            slots[s] = new HashMap<String, List<FetchImpl>>();
        }

        int slot = 0;
        for (Collection<FetchImpl> fetches : map.values()) {
            TUtil.putCollectionToNestedList(slots[slot], str, fetches);
            slot = (slot + 1) % i;
        }

        for (Map<String, List<FetchImpl>> slotFetches : slots) {
            Stage.scheduleFetches(stage, slotFetches);
        }
    }

    public static void scheduleHashShuffledFetches(TaskSchedulerContext taskSchedulerContext, MasterPlan masterPlan, Stage stage, DataChannel dataChannel, int i) throws IOException {
        ExecutionBlock block = stage.getBlock();
        ScanNode scanNode = block.getScanNodes()[0];
        FileFragment fileFragment = new FileFragment(scanNode.getCanonicalName(), new Path("/dummy"), 0L, 0L, new String[]{UNKNOWN_HOST});
        ArrayList arrayList = new ArrayList();
        arrayList.add(fileFragment);
        Stage.scheduleFragments(stage, arrayList);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (ExecutionBlock executionBlock : masterPlan.getChilds(block)) {
            ArrayList arrayList2 = new ArrayList();
            arrayList2.addAll(stage.getContext().getStage(executionBlock.getId()).getHashShuffleIntermediateEntries());
            if (dataChannel.getShuffleType() == PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE) {
                if (hashMap2.containsKey(executionBlock.getId())) {
                    ((List) hashMap2.get(executionBlock.getId())).addAll(arrayList2);
                } else {
                    hashMap2.put(executionBlock.getId(), arrayList2);
                }
            }
            for (Map.Entry<Integer, List<Task.IntermediateEntry>> entry : hashByKey(arrayList2).entrySet()) {
                for (Map.Entry<Task.PullHost, List<Task.IntermediateEntry>> entry2 : hashByHost(entry.getValue()).entrySet()) {
                    FetchImpl fetchImpl = new FetchImpl(entry2.getKey(), dataChannel.getShuffleType(), executionBlock.getId(), entry.getKey().intValue(), entry2.getValue());
                    long j = 0;
                    Iterator<Task.IntermediateEntry> it = entry2.getValue().iterator();
                    while (it.hasNext()) {
                        j += it.next().getVolume();
                    }
                    if (hashMap.containsKey(entry.getKey())) {
                        ((FetchGroupMeta) hashMap.get(entry.getKey())).addFetche(fetchImpl).increaseVolume(j);
                    } else {
                        hashMap.put(entry.getKey(), new FetchGroupMeta(j, fetchImpl));
                    }
                }
            }
        }
        int i2 = 0;
        GroupbyNode[] findAllNodes = PlannerUtil.findAllNodes(stage.getBlock().getPlan(), new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY});
        if (findAllNodes != null && findAllNodes.length > 0) {
            GroupbyNode groupbyNode = findAllNodes[0];
            if (groupbyNode.getType() == NodeType.GROUP_BY) {
                i2 = groupbyNode.getGroupingColumns().length;
            } else if (groupbyNode.getType() == NodeType.DISTINCT_GROUP_BY) {
                DistinctGroupbyNode findMostBottomNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
                if (findMostBottomNode == null) {
                    LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode");
                    findMostBottomNode = (DistinctGroupbyNode) groupbyNode;
                }
                i2 = findMostBottomNode.getGroupingColumns().length;
                PlanProto.EnforceProperty algorithmEnforceProperty = PhysicalPlannerImpl.getAlgorithmEnforceProperty(block.getEnforcer(), findMostBottomNode);
                if (algorithmEnforceProperty != null && algorithmEnforceProperty.getDistinct().getIsMultipleAggregation() && algorithmEnforceProperty.getDistinct().getMultipleAggregationStage() != PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage.THRID_STAGE) {
                    i2 = findMostBottomNode.getOutSchema().size();
                }
            }
        }
        int min = Math.min(i, hashMap.size());
        LOG.info(stage.getId() + ", ScheduleHashShuffledFetches - Max num=" + i + ", finalFetchURI=" + hashMap.size());
        if (i2 == 0) {
            min = 1;
            LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
        } else if (computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId()).getNumRows().longValue() == 0) {
            min = 1;
        }
        if (dataChannel.getShuffleType() == PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE) {
            scheduleScatteredHashShuffleFetches(taskSchedulerContext, stage, hashMap2, scanNode.getTableName());
            return;
        }
        taskSchedulerContext.setEstimatedTaskNum(min);
        scheduleFetchesByEvenDistributedVolumes(stage, hashMap, scanNode.getTableName(), min);
        LOG.info(stage.getId() + ", DeterminedTaskNum : " + min);
    }

    /**
     * Distributes fetch groups over {@code i} tasks so that per-task data volumes are roughly even.
     *
     * <p>Groups are sorted by descending volume and dealt out in snake order
     * (task 0 .. i-1, then i-1 .. 0, then 0 .. i-1, ...). On the way back down, a task whose
     * accumulated volume is still below that of the task at the next lower index keeps receiving
     * further groups before the walk moves on.
     *
     * @param map fetch groups keyed by partition id; only the values are distributed
     * @param str table name under which every task's fetches are registered
     * @param i   number of tasks to distribute over
     * @return pair of (volume assigned to each task, fetches assigned to each task keyed by {@code str})
     * @throws IllegalArgumentException if {@code i} is negative, or zero while {@code map} is non-empty
     */
    public static Pair<Long[], Map<String, List<FetchImpl>>[]> makeEvenDistributedFetchImpl(Map<Integer, FetchGroupMeta> map, String str, int i) {
        if (i < 0 || (i == 0 && !map.isEmpty())) {
            // With zero tasks neither pass below consumes an element, so the outer loop would spin forever.
            throw new IllegalArgumentException("Cannot distribute " + map.size() + " fetch groups over " + i + " tasks");
        }

        // Largest groups first: the greedy snake assignment balances best in that order.
        List<FetchGroupMeta> groups = new ArrayList<>(map.values());
        Collections.sort(groups, new Comparator<FetchGroupMeta>() {
            @Override
            public int compare(FetchGroupMeta a, FetchGroupMeta b) {
                return Long.compare(b.getVolume(), a.getVolume());
            }
        });

        @SuppressWarnings("unchecked")
        Map<String, List<FetchImpl>>[] fetchesPerTask = new Map[i];
        Long[] volumePerTask = new Long[i];
        for (int task = 0; task < i; task++) {
            fetchesPerTask[task] = new HashMap<>();
            volumePerTask[task] = 0L;
        }

        Iterator<FetchGroupMeta> it = groups.iterator();
        while (it.hasNext()) {
            // Forward pass: one group per task, from task 0 up to task i-1.
            for (int task = 0; task < i && it.hasNext(); task++) {
                assignFetchGroupToTask(fetchesPerTask, volumePerTask, task, str, it.next());
            }
            // Backward pass: from task i-1 down to task 0, topping a task up while it still
            // trails the task at the next lower index.
            for (int task = i - 1; task >= 0 && it.hasNext(); task--) {
                assignFetchGroupToTask(fetchesPerTask, volumePerTask, task, str, it.next());
                while (it.hasNext() && task > 0 && volumePerTask[task - 1] > volumePerTask[task]) {
                    assignFetchGroupToTask(fetchesPerTask, volumePerTask, task, str, it.next());
                }
            }
        }
        return new Pair<>(volumePerTask, fetchesPerTask);
    }

    /** Adds {@code group}'s volume to task {@code task} and registers its fetches under {@code tableName}. */
    private static void assignFetchGroupToTask(Map<String, List<FetchImpl>>[] fetchesPerTask, Long[] volumePerTask, int task, String tableName, FetchGroupMeta group) {
        volumePerTask[task] = volumePerTask[task] + group.getVolume();
        TUtil.putCollectionToNestedList(fetchesPerTask[task], tableName, group.fetchUrls);
    }

    /**
     * Splits the fetch groups into {@code i} volume-balanced task assignments and schedules
     * one task per assignment on {@code stage}.
     *
     * @param stage the stage receiving the scheduled fetches
     * @param map   fetch groups keyed by partition id
     * @param str   table name the fetches are registered under
     * @param i     number of tasks to create
     */
    public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map<Integer, FetchGroupMeta> map, String str, int i) {
        Pair<Long[], Map<String, List<FetchImpl>>[]> distribution = makeEvenDistributedFetchImpl(map, str, i);
        Map<String, List<FetchImpl>>[] perTaskFetches = distribution.getSecond();
        for (int task = 0; task < perTaskFetches.length; task++) {
            Stage.scheduleFetches(stage, perTaskFetches[task]);
        }
    }

    /**
     * Schedules the fetches of a scattered hash shuffle.
     *
     * <p>For every child block, intermediate entries are grouped by partition id. Each partition is
     * then split or merged into fetch lists of about the configured per-task volume. One task is
     * scheduled per resulting fetch list.
     *
     * @param taskSchedulerContext receives the estimated task number (the number of fetch lists)
     * @param stage                the stage receiving the scheduled fetches
     * @param map                  intermediate entries keyed by the child execution block that produced them
     * @param str                  table name the fetches are registered under
     * @throws RuntimeException if the hash appender page volume is not smaller than the per-task volume
     */
    public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext taskSchedulerContext, Stage stage, Map<ExecutionBlockId, List<Task.IntermediateEntry>> map, String str) {
        // Multiply as long: an int product overflows once a setting reaches 2048 MB.
        long splitVolume = 1048576L * stage.getMasterPlan().getContext().getInt(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
        long pageSize = 1048576L * stage.getContext().getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME);
        if (pageSize >= splitVolume) {
            throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be greater than tajo.shuffle.hash.appender.page.volumn-mb");
        }

        List<List<FetchImpl>> fetchGroups = new ArrayList<>();
        long totalIntermediateSize = 0;
        for (Map.Entry<ExecutionBlockId, List<Task.IntermediateEntry>> entry : map.entrySet()) {
            // Group this child block's intermediate entries by partition id.
            Map<Integer, List<Task.IntermediateEntry>> entriesByPartition = new HashMap<>();
            for (Task.IntermediateEntry intermediate : entry.getValue()) {
                totalIntermediateSize += intermediate.getVolume();
                Integer partId = intermediate.getPartId();
                List<Task.IntermediateEntry> bucket = entriesByPartition.get(partId);
                if (bucket == null) {
                    bucket = new ArrayList<>();
                    entriesByPartition.put(partId, bucket);
                }
                bucket.add(intermediate);
            }
            for (List<Task.IntermediateEntry> partitionEntries : entriesByPartition.values()) {
                List<List<FetchImpl>> split = splitOrMergeIntermediates(entry.getKey(), partitionEntries, splitVolume, pageSize);
                if (split != null) {
                    fetchGroups.addAll(split);
                }
            }
        }

        taskSchedulerContext.setEstimatedTaskNum(fetchGroups.size());
        for (List<FetchImpl> group : fetchGroups) {
            Map<String, List<FetchImpl>> fetches = new HashMap<>();
            fetches.put(str, group);
            Stage.scheduleFetches(stage, fetches);
        }
        LOG.info(stage.getId() + ", ShuffleType:" + PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE.name() + ", Intermediate Size: " + totalIntermediateSize + ", splitSize: " + splitVolume + ", DeterminedTaskNum: " + fetchGroups.size());
    }

    /**
     * Packs the intermediate entries of one partition into fetch lists of roughly {@code j} bytes.
     *
     * <p>Each entry is cut into (offset, length) pieces by {@code IntermediateEntry.split}. The first
     * piece is sized to fill the space left in the current list, unless that space is smaller than
     * {@code j2}, in which case a full {@code j}-sized piece is requested. A list is closed as soon as
     * adding the next piece would bring a non-empty list to {@code j} bytes or more.
     *
     * @param executionBlockId block that produced the intermediate entries
     * @param list             intermediate entries of a single partition
     * @param j                target volume (bytes) of one fetch list
     * @param j2               page size (bytes); remaining space below this is not worth filling
     * @return the fetch lists, or {@code null} if {@code list} is empty
     */
    public static List<List<FetchImpl>> splitOrMergeIntermediates(ExecutionBlockId executionBlockId, List<Task.IntermediateEntry> list, long j, long j2) {
        if (list.isEmpty()) {
            return null;
        }
        List<List<FetchImpl>> result = new ArrayList<>();
        List<FetchImpl> current = new ArrayList<>();
        long currentVolume = 0;
        for (Task.IntermediateEntry entry : list) {
            long firstSplitVolume = j - currentVolume;
            if (firstSplitVolume < j2) {
                firstSplitVolume = j;
            }
            List<Pair<Long, Long>> splits = entry.split(firstSplitVolume, j);
            if (splits == null || splits.isEmpty()) {
                // This entry contributes nothing; still process the partition's remaining entries
                // (a `break` here would silently drop them).
                continue;
            }
            for (Pair<Long, Long> piece : splits) {
                long offset = piece.getFirst();
                long length = piece.getSecond();
                if (currentVolume > 0 && currentVolume + length >= j) {
                    if (!current.isEmpty()) {
                        result.add(current);
                    }
                    current = new ArrayList<>();
                    currentVolume = 0;
                }
                FetchImpl fetchImpl = new FetchImpl(entry.getPullHost(), PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE, executionBlockId, entry.getPartId(), TUtil.newList(new Task.IntermediateEntry[]{entry}));
                fetchImpl.setOffset(offset);
                fetchImpl.setLength(length);
                current.add(fetchImpl);
                currentVolume += length;
            }
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    /**
     * Builds the pull-server URI(s) for a fetch.
     *
     * <p>The base URI carries host, port, {@code qid}, {@code sid}, partition {@code p}, the
     * shuffle type flag ({@code h}, {@code r} plus range params, or {@code s}), and
     * {@code offset}/{@code length} when the length is non-negative.
     *
     * <p>If {@code z} is false, or the shuffle is a (scattered) hash shuffle, exactly one URI is
     * returned. Otherwise the non-negative task ids are appended as a comma-separated
     * {@code ta=taskId_attemptId} list. A new URI is started whenever appending an id would push the
     * list past {@code HTTP_REQUEST_MAXIMUM_LENGTH} characters. The result is empty if no task id is
     * non-negative.
     *
     * @param fetchImpl the fetch to describe
     * @param z         whether task attempt ids should be appended for non-hash shuffles
     * @return the fetch URIs
     */
    public static List<URI> createFetchURL(FetchImpl fetchImpl, boolean z) {
        StringBuilder sb = new StringBuilder("http://");
        sb.append(fetchImpl.getPullHost().getHost()).append(":").append(fetchImpl.getPullHost().getPort()).append("/?").append("qid=").append(fetchImpl.getExecutionBlockId().getQueryId().toString()).append("&sid=").append(fetchImpl.getExecutionBlockId().getId()).append("&p=").append(fetchImpl.getPartitionId()).append("&type=");
        if (fetchImpl.getType() == PlanProto.ShuffleType.HASH_SHUFFLE) {
            sb.append("h");
        } else if (fetchImpl.getType() == PlanProto.ShuffleType.RANGE_SHUFFLE) {
            sb.append("r").append("&").append(fetchImpl.getRangeParams());
        } else if (fetchImpl.getType() == PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE) {
            sb.append("s");
        }
        if (fetchImpl.getLength() >= 0) {
            sb.append("&offset=").append(fetchImpl.getOffset()).append("&length=").append(fetchImpl.getLength());
        }

        List<URI> uris = new ArrayList<>();
        if (!z || fetchImpl.getType() == PlanProto.ShuffleType.HASH_SHUFFLE || fetchImpl.getType() == PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE) {
            uris.add(URI.create(sb.toString()));
            return uris;
        }

        List<String> taskAttemptChunks = new ArrayList<>();
        StringBuilder chunk = new StringBuilder();
        List<Integer> taskIds = fetchImpl.getTaskIds();
        List<Integer> attemptIds = fetchImpl.getAttemptIds();
        for (int idx = 0; idx < taskIds.size(); idx++) {
            int taskId = taskIds.get(idx);
            if (taskId < 0) {
                continue;
            }
            String taskAttempt = taskId + "_" + attemptIds.get(idx);
            // Only separate from a previous id that is actually in this chunk; deciding this from a
            // "first iteration" flag produced a leading comma after a skipped negative id.
            String piece = chunk.length() == 0 ? taskAttempt : "," + taskAttempt;
            if (chunk.length() > 0 && chunk.length() + piece.length() > HTTP_REQUEST_MAXIMUM_LENGTH) {
                taskAttemptChunks.add(chunk.toString());
                chunk = new StringBuilder(taskAttempt);
            } else {
                chunk.append(piece);
            }
        }
        if (chunk.length() > 0) {
            taskAttemptChunks.add(chunk.toString());
        }

        sb.append("&ta=");
        String base = sb.toString();
        for (String taskAttemptChunk : taskAttemptChunks) {
            uris.add(URI.create(base + taskAttemptChunk));
        }
        return uris;
    }

    /**
     * Groups intermediate entries by partition id.
     *
     * @param list entries to group
     * @return map from partition id to the entries with that id, in the order they appear in {@code list}
     */
    public static Map<Integer, List<Task.IntermediateEntry>> hashByKey(List<Task.IntermediateEntry> list) {
        Map<Integer, List<Task.IntermediateEntry>> byPartition = new HashMap<Integer, List<Task.IntermediateEntry>>();
        for (Task.IntermediateEntry entry : list) {
            Integer partId = entry.getPartId();
            List<Task.IntermediateEntry> group = byPartition.get(partId);
            if (group != null) {
                group.add(entry);
            } else {
                byPartition.put(partId, TUtil.newList(new Task.IntermediateEntry[]{entry}));
            }
        }
        return byPartition;
    }

    /**
     * Groups intermediate entries by the host they must be pulled from.
     *
     * @param list entries to group
     * @return map from pull host to the entries served by that host, in the order they appear in {@code list}
     */
    public static Map<Task.PullHost, List<Task.IntermediateEntry>> hashByHost(List<Task.IntermediateEntry> list) {
        Map<Task.PullHost, List<Task.IntermediateEntry>> byHost = new HashMap<Task.PullHost, List<Task.IntermediateEntry>>();
        for (Task.IntermediateEntry entry : list) {
            Task.PullHost host = entry.getPullHost();
            if (!byHost.containsKey(host)) {
                byHost.put(host, TUtil.newList(new Task.IntermediateEntry[]{entry}));
                continue;
            }
            byHost.get(host).add(entry);
        }
        return byHost;
    }

    /**
     * Sets the shuffle keys and the number of shuffle output partitions on {@code dataChannel}.
     *
     * <p>If the parent block is a join, the output number is set to {@code i} up front. The
     * channel's existing keys are used, except for a range shuffle out of a sort block, where the
     * sort keys become the shuffle keys. With keys present, an empty key set forces a single output
     * partition; otherwise {@code i} partitions are used.
     *
     * @param stage       stage whose output channel is configured
     * @param i           desired number of output partitions
     * @param dataChannel the stage's outgoing data channel (mutated)
     * @return {@code stage}, unchanged
     */
    public static Stage setShuffleOutputNumForTwoPhase(Stage stage, int i, DataChannel dataChannel) {
        ExecutionBlock block = stage.getBlock();
        MasterPlan masterPlan = stage.getMasterPlan();
        // Default: the channel's own keys. This also covers hash-shuffled (distinct) group-bys,
        // for which the original code merely re-read the same keys.
        Column[] shuffleKeys = dataChannel.getShuffleKeys();
        if (!masterPlan.isRoot(block) && masterPlan.getParent(block).getPlan().getType() == NodeType.JOIN) {
            dataChannel.setShuffleOutputNum(i);
        }
        if (dataChannel.getShuffleType() == PlanProto.ShuffleType.RANGE_SHUFFLE && block.getPlan().getType() == NodeType.SORT) {
            SortSpec[] sortSpecs = ((SortNode) block.getPlan()).getSortKeys();
            shuffleKeys = new Column[sortSpecs.length];
            for (int k = 0; k < sortSpecs.length; k++) {
                shuffleKeys[k] = sortSpecs[k].getSortKey();
            }
        }
        if (shuffleKeys != null) {
            if (shuffleKeys.length == 0) {
                dataChannel.setShuffleKeys(new Column[0]);
                dataChannel.setShuffleOutputNum(1);
            } else {
                dataChannel.setShuffleKeys(shuffleKeys);
                dataChannel.setShuffleOutputNum(i);
            }
        }
        return stage;
    }
}
