package org.apache.iceberg.flink.sink.shuffle;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderComparators;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.class */
class MapRangePartitioner implements Partitioner<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(MapRangePartitioner.class);
    // Wraps each incoming RowData before it is handed to sortKey in partition().
    private final RowDataWrapper rowDataWrapper;
    // Single SortKey instance that is re-wrapped on every partition() call and then used as the lookup key.
    private final SortKey sortKey;
    // Ordering used for the sortedStatsWithCloseFileCost tree map.
    private final Comparator<StructLike> comparator;
    // Weight per sort key as supplied by MapDataStatistics; the constructor rejects any weight that is not > 0.
    private final Map<SortKey, Long> mapStatistics;
    // Close-file cost as a percentage (value is divided by 100) of the average weight per subtask.
    private final double closeFileCostInWeightPercentage;
    // Number of partition() calls whose key had no assignment; also drives the round-robin fallback.
    private long newSortKeyCounter;
    // Wall-clock time (ms) of construction or of the last INFO log about unknown sort keys.
    private long lastNewSortKeyLogTimeMilli;
    // Built lazily by the first assignment(int) call; null until then.
    private Map<SortKey, KeyAssignment> assignment;
    // Per-key weight plus estimated close-file cost, ordered by comparator; populated by assignment(int).
    private NavigableMap<SortKey, Long> sortedStatsWithCloseFileCost;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Subtasks assigned to a single sort key together with the weight routed to each of them.
     *
     * <p>Weights are stored with the close file cost already subtracted. {@link #select()} picks
     * one of the assigned subtasks at random, proportionally to those weights.
     */
    @VisibleForTesting
    public static class KeyAssignment {
        private final int[] assignedSubtasks;
        private final long[] subtaskWeightsExcludingCloseCost;
        // Sum of subtaskWeightsExcludingCloseCost; exclusive upper bound for the random draw in select().
        private final long keyWeight;
        // Running sum of subtaskWeightsExcludingCloseCost; strictly increasing because every weight is > 0.
        private final long[] cumulativeWeights;

        /**
         * @param list subtask ids assigned to the key; must be non-null and non-empty
         * @param list2 weight (including close file cost) for each entry of {@code list}; must have
         *     the same size and every value must be strictly larger than {@code j}
         * @param j close file cost in weight, subtracted from every entry of {@code list2}
         * @throws IllegalArgumentException if any of the constraints above is violated
         */
        KeyAssignment(List<Integer> list, List<Long> list2, long j) {
            Preconditions.checkArgument(list != null && !list.isEmpty(), "Invalid assigned subtasks: null or empty");
            Preconditions.checkArgument(list2 != null && !list2.isEmpty(), "Invalid assigned subtasks weights: null or empty");
            Preconditions.checkArgument(list.size() == list2.size(), "Invalid assignment: size mismatch (tasks length = %s, weights length = %s)", list.size(), list2.size());
            list2.forEach(weight ->
                    Preconditions.checkArgument(weight > j, "Invalid weight: should be larger than close file cost: weight = %s, close file cost = %s", weight, j));

            this.assignedSubtasks = list.stream().mapToInt(Integer::intValue).toArray();
            this.subtaskWeightsExcludingCloseCost = list2.stream().mapToLong(weight -> weight - j).toArray();
            this.keyWeight = Arrays.stream(this.subtaskWeightsExcludingCloseCost).sum();
            this.cumulativeWeights = new long[this.subtaskWeightsExcludingCloseCost.length];
            long runningTotal = 0L;
            for (int idx = 0; idx < this.subtaskWeightsExcludingCloseCost.length; idx++) {
                runningTotal += this.subtaskWeightsExcludingCloseCost[idx];
                this.cumulativeWeights[idx] = runningTotal;
            }
        }

        /**
         * Picks one of the assigned subtasks, weighted by the per-subtask weights.
         *
         * @return the selected subtask id
         * @throws IllegalStateException if the computed position falls outside the assigned subtasks
         */
        int select() {
            if (this.assignedSubtasks.length == 1) {
                // Only one choice: skip the random number generator.
                return this.assignedSubtasks[0];
            }

            long randomNumber = ThreadLocalRandom.current().nextLong(this.keyWeight);
            int index = Arrays.binarySearch(this.cumulativeWeights, randomNumber);
            // Select the first position whose cumulative weight is strictly greater than randomNumber.
            // Exact hit at i -> i + 1; miss -> insertion point, since index == -(insertionPoint) - 1.
            int position = Math.abs(index + 1);
            if (position >= this.assignedSubtasks.length) {
                // Explicit guard instead of Preconditions.checkState: nothing is boxed or formatted on
                // the per-record path, and the array is rendered by content rather than by identity.
                throw new IllegalStateException(String.format(
                        "Invalid selected position: out of range. key weight = %s, random number = %s, cumulative weights array = %s",
                        this.keyWeight, randomNumber, Arrays.toString(this.cumulativeWeights)));
            }
            return this.assignedSubtasks[position];
        }

        @Override
        public int hashCode() {
            return (31 * Arrays.hashCode(this.assignedSubtasks)) + Arrays.hashCode(this.subtaskWeightsExcludingCloseCost);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            KeyAssignment that = (KeyAssignment) obj;
            return Arrays.equals(this.assignedSubtasks, that.assignedSubtasks)
                    && Arrays.equals(this.subtaskWeightsExcludingCloseCost, that.subtaskWeightsExcludingCloseCost);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("assignedSubtasks", this.assignedSubtasks)
                    .add("subtaskWeightsExcludingCloseCost", this.subtaskWeightsExcludingCloseCost)
                    .toString();
        }
    }

    /**
     * Creates a partitioner from learned per-key statistics.
     *
     * @param schema table schema used to build the row wrapper, the sort key and the comparator
     * @param sortOrder sort order used to build the sort key and the comparator
     * @param mapDataStatistics per-key weights; every weight must be greater than 0
     * @param d close file cost, as a percentage of the average weight per subtask
     * @throws IllegalArgumentException if any key has a weight that is not greater than 0
     */
    MapRangePartitioner(Schema schema, SortOrder sortOrder, MapDataStatistics mapDataStatistics, double d) {
        // Validate all weights before any other state is set up.
        for (Map.Entry<SortKey, Long> keyWeight : mapDataStatistics.statistics().entrySet()) {
            Preconditions.checkArgument(keyWeight.getValue() > 0, "Invalid statistics: weight is 0 for key %s", keyWeight.getKey());
        }

        this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct());
        this.sortKey = new SortKey(schema, sortOrder);
        this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
        this.mapStatistics = mapDataStatistics.statistics();
        this.closeFileCostInWeightPercentage = d;
        this.newSortKeyCounter = 0L;
        this.lastNewSortKeyLogTimeMilli = System.currentTimeMillis();
    }

    /**
     * Routes a row to a subtask.
     *
     * <p>Keys with a known assignment are routed through {@link KeyAssignment#select()}. Keys
     * without one fall back to round robin driven by a running counter; the counter total is
     * logged at INFO level at most once per minute.
     *
     * @param rowData row to route
     * @param i number of partitions
     * @return the target partition
     */
    public int partition(RowData rowData, int i) {
        Map<SortKey, KeyAssignment> keyAssignments = assignment(i);
        this.sortKey.wrap(this.rowDataWrapper.wrap(rowData));
        KeyAssignment known = keyAssignments.get(this.sortKey);

        if (known == null) {
            LOG.trace("Encountered new sort key: {}. Fall back to round robin as statistics not learned yet.", this.sortKey);
            this.newSortKeyCounter += 1;

            long now = System.currentTimeMillis();
            long sinceLastLog = now - this.lastNewSortKeyLogTimeMilli;
            if (sinceLastLog > TimeUnit.MINUTES.toMillis(1L)) {
                LOG.info("Encounter new sort keys in total {} times", this.newSortKeyCounter);
                this.lastNewSortKeyLogTimeMilli = now;
            }

            return (int) (this.newSortKeyCounter % i);
        }

        return known.select();
    }

    /**
     * Returns the key-to-subtask assignment, building it on the first call.
     *
     * <p>The result is cached: later calls return the same map regardless of the value of
     * {@code i} passed to them.
     *
     * @param i number of partitions (subtasks) to spread the keys over
     * @return assignment for every key in the statistics
     * @throws IllegalStateException if the subtasks are exhausted while keys remain unassigned
     */
    @VisibleForTesting
    Map<SortKey, KeyAssignment> assignment(int i) {
        if (this.assignment == null) {
            long totalWeight = this.mapStatistics.values().stream().mapToLong(Long::longValue).sum();
            // Divide in floating point. A long / int division truncates the per-subtask target
            // before it is ever widened to double.
            double targetWeightPerSubtask = ((double) totalWeight) / i;
            long closeFileCostInWeight =
                    (long) Math.ceil((targetWeightPerSubtask * this.closeFileCostInWeightPercentage) / 100.0d);

            this.sortedStatsWithCloseFileCost = Maps.newTreeMap(this.comparator);
            this.mapStatistics.forEach((key, weight) -> {
                // A key is charged one close file cost per subtask it is estimated to span.
                int estimatedSplits = (int) Math.ceil(weight / targetWeightPerSubtask);
                long estimatedCloseFileCost = closeFileCostInWeight * estimatedSplits;
                this.sortedStatsWithCloseFileCost.put(key, weight + estimatedCloseFileCost);
            });

            long totalWeightWithCloseFileCost =
                    this.sortedStatsWithCloseFileCost.values().stream().mapToLong(Long::longValue).sum();
            // The ceil must see the fractional quotient. With integer division the target is rounded
            // down, so i * target can be smaller than the total and buildAssignment runs out of subtasks.
            long targetWeightPerSubtaskWithCloseFileCost =
                    (long) Math.ceil(((double) totalWeightWithCloseFileCost) / i);

            this.assignment = buildAssignment(
                    i,
                    this.sortedStatsWithCloseFileCost,
                    targetWeightPerSubtaskWithCloseFileCost,
                    closeFileCostInWeight);
        }
        return this.assignment;
    }

    /**
     * Exposes the per-key weights this partitioner was constructed with.
     *
     * @return the statistics map held by this instance (not a copy)
     */
    @VisibleForTesting
    Map<SortKey, Long> mapStatistics() {
        return mapStatistics;
    }

    /**
     * Summarizes the current assignment per subtask.
     *
     * <p>Reads the cached assignment directly, so {@code assignment(int)} must have been called
     * first; otherwise this throws a {@link NullPointerException}.
     *
     * @return map ordered by subtask id; each value pairs the total weight assigned to the
     *     subtask (excluding close file cost) with the number of keys routed to it
     */
    Map<Integer, Pair<Long, Integer>> assignmentInfo() {
        Map<Integer, Pair<Long, Integer>> infoBySubtask = Maps.newTreeMap();
        for (KeyAssignment keyAssignment : this.assignment.values()) {
            int[] subtasks = keyAssignment.assignedSubtasks;
            long[] weights = keyAssignment.subtaskWeightsExcludingCloseCost;
            for (int idx = 0; idx < subtasks.length; idx++) {
                int subtaskId = subtasks[idx];
                Pair<Long, Integer> soFar = infoBySubtask.getOrDefault(subtaskId, Pair.of(0L, 0));
                long accumulatedWeight = soFar.first() + weights[idx];
                int keyCount = soFar.second() + 1;
                infoBySubtask.put(subtaskId, Pair.of(accumulatedWeight, keyCount));
            }
        }
        return infoBySubtask;
    }

    /**
     * Greedily walks the keys in sorted order and fills subtasks one after another up to the
     * target weight, splitting a key across consecutive subtasks when it does not fit.
     *
     * @param i number of subtasks available
     * @param navigableMap per-key weight including estimated close file cost, in sort order
     * @param j target weight per subtask
     * @param j2 close file cost in weight
     * @return assignment for every key in {@code navigableMap}
     * @throws IllegalStateException if all subtasks are used up while keys remain unassigned
     */
    private Map<SortKey, KeyAssignment> buildAssignment(int i, NavigableMap<SortKey, Long> navigableMap, long j, long j2) {
        Map<SortKey, KeyAssignment> assignmentMap = Maps.newHashMapWithExpectedSize(navigableMap.size());
        Iterator<SortKey> keyIterator = navigableMap.keySet().iterator();
        List<Integer> subtasksForKey = Lists.newArrayList();
        List<Long> weightsForKey = Lists.newArrayList();

        int subtaskId = 0;
        SortKey currentKey = null;
        long keyRemainingWeight = 0L;
        long subtaskRemainingWeight = j;

        // Keep going while there are untouched keys or a key that is only partially assigned.
        while (keyIterator.hasNext() || currentKey != null) {
            if (subtaskId >= i) {
                LOG.error("Internal algorithm error: exhausted subtasks with unassigned keys left. number of partitions: {}, target weight per subtask: {}, close file cost in weight: {}, data statistics: {}", i, j, j2, navigableMap);
                throw new IllegalStateException("Internal algorithm error: exhausted subtasks with unassigned keys left");
            }

            if (currentKey == null) {
                currentKey = keyIterator.next();
                keyRemainingWeight = navigableMap.get(currentKey);
            }

            subtasksForKey.add(subtaskId);
            if (keyRemainingWeight >= subtaskRemainingWeight) {
                // The key fills up the current subtask.
                long assignedWeight = subtaskRemainingWeight;
                keyRemainingWeight -= subtaskRemainingWeight;

                // A slice no larger than the close file cost is topped up with at most one close
                // file cost taken from the rest of the key; KeyAssignment rejects smaller weights.
                if (assignedWeight <= j2) {
                    long padding = Math.min(keyRemainingWeight, j2);
                    keyRemainingWeight -= padding;
                    assignedWeight += padding;
                }

                weightsForKey.add(assignedWeight);
                // Move on to a fresh subtask.
                subtaskId++;
                subtaskRemainingWeight = j;
            } else {
                // The rest of the key fits into the current subtask with room to spare.
                weightsForKey.add(keyRemainingWeight);
                subtaskRemainingWeight -= keyRemainingWeight;
                keyRemainingWeight = 0L;
            }

            Preconditions.checkState(subtasksForKey.size() == weightsForKey.size(), "List size mismatch: assigned subtasks = %s, subtask weights = %s", subtasksForKey, weightsForKey);

            // A residual no larger than the close file cost is dropped rather than carried over
            // to the next subtask.
            boolean residualWithinCloseCost = keyRemainingWeight > 0 && keyRemainingWeight <= j2;
            if (residualWithinCloseCost) {
                keyRemainingWeight = 0L;
            }

            if (keyRemainingWeight == 0) {
                // The key is fully placed. KeyAssignment copies the lists into arrays, so they
                // can be cleared and reused for the next key.
                assignmentMap.put(currentKey, new KeyAssignment(subtasksForKey, weightsForKey, j2));
                subtasksForKey.clear();
                weightsForKey.clear();
                currentKey = null;
            }
        }

        return assignmentMap;
    }
}
