package org.apache.pinot.plugin.minion.tasks.mergerollup;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.I0Itec.zkclient.exception.ZkException;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.ZNRecord;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@TaskGenerator
/* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.class */
public class MergeRollupTaskGenerator implements PinotTaskGenerator {
    /** Logger shared by all instances of this generator. */
    private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
    /** Record cap per task used when a merge level does not configure "maxNumRecordsPerTask". */
    private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50000000;
    /** Number of buckets scheduled per run when a merge level does not configure "maxNumParallelBuckets". */
    private static final int DEFAULT_NUM_PARALLEL_BUCKETS = 1;
    /** Batch ingestion type for which task generation is skipped (compared case-insensitively in validate). */
    private static final String REFRESH = "REFRESH";
    /** Same text as the gauge-name prefix built in getMetricNameForTaskDelay (without the trailing dot). */
    private static final String MERGE_ROLLUP_TASK_DELAY_IN_NUM_BUCKETS = "mergeRollupTaskDelayInNumBuckets";
    /** table name -> merge level -> watermark (ms) last recorded by createOrUpdateDelayMetrics. */
    private Map<String, Map<String, Long>> _mergeRollupWatermarks;
    /** table name -> max segment end time (ms) last passed to createOrUpdateDelayMetrics. */
    private Map<String, Long> _tableMaxEndTimeMs;
    /** Accessor for cluster state (segment metadata, lineage, task state, metrics); assigned in init. */
    private ClusterInfoAccessor _clusterInfoAccessor;

    /**
     * Stores the cluster accessor and starts with empty in-memory delay-metric state.
     *
     * @param clusterInfoAccessor accessor used by every other method of this generator
     */
    @Override
    public void init(ClusterInfoAccessor clusterInfoAccessor) {
        _clusterInfoAccessor = clusterInfoAccessor;
        // Diamond instead of raw types so the maps keep their declared element types.
        _mergeRollupWatermarks = new HashMap<>();
        _tableMaxEndTimeMs = new HashMap<>();
    }

    /**
     * Returns the task type handled by this generator.
     *
     * @return the constant string {@code "MergeRollupTask"}
     */
    @Override
    public String getTaskType() {
        return "MergeRollupTask";
    }

    /**
     * Generates merge/roll-up task configs for every table that passes {@link #validate}, then drops delay
     * metrics that no longer apply via {@link #cleanUpDelayMetrics}.
     *
     * @param list table configs to consider
     * @return task configs for all tables whose task metadata could be persisted
     */
    @Override
    public List<PinotTaskConfig> generateTasks(List<TableConfig> list) {
        List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
        for (TableConfig tableConfig : list) {
            if (validate(tableConfig, "MergeRollupTask")) {
                pinotTaskConfigs.addAll(generateTasksForTable(tableConfig));
            }
        }
        cleanUpDelayMetrics(list);
        return pinotTaskConfigs;
    }

    /**
     * Generates the task configs of a single table, walking its merge levels from the smallest to the largest
     * bucket time period. Returns an empty list when there is nothing to schedule or when the updated task
     * metadata cannot be written back.
     */
    private List<PinotTaskConfig> generateTasksForTable(TableConfig tableConfig) {
        final String taskType = "MergeRollupTask";
        String tableName = tableConfig.getTableName();
        LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);

        // Keep only segments that survive the lineage filter and contain at least one document.
        List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableName);
        SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(tableName);
        HashSet<String> selectedSegmentNames = new HashSet<>();
        for (SegmentZKMetadata segment : allSegments) {
            selectedSegmentNames.add(segment.getSegmentName());
        }
        SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(selectedSegmentNames, segmentLineage);
        List<SegmentZKMetadata> candidateSegments = new ArrayList<>();
        for (SegmentZKMetadata segment : allSegments) {
            if (selectedSegmentNames.contains(segment.getSegmentName()) && segment.getTotalDocs() > 0) {
                candidateSegments.add(segment);
            }
        }
        if (candidateSegments.isEmpty()) {
            resetDelayMetrics(tableName);
            LOGGER.info("Skip generating task: {} for table: {}, no segment is found.", taskType, tableName);
            return new ArrayList<>();
        }

        // Order by start time, then end time, then segment name (all ascending).
        candidateSegments.sort(Comparator.comparingLong(SegmentZKMetadata::getStartTimeMs)
                .thenComparingLong(SegmentZKMetadata::getEndTimeMs)
                .thenComparing(SegmentZKMetadata::getSegmentName));

        // Order merge levels by bucket time period, smallest first.
        Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
        List<Map.Entry<String, Map<String, String>>> sortedMergeLevels =
                new ArrayList<>(MergeRollupTaskUtils.getLevelToConfigMap(taskConfigs).entrySet());
        sortedMergeLevels.sort(Comparator.comparingLong(
                entry -> TimeUtils.convertPeriodToMillis(entry.getValue().get("bucketTimePeriod"))));

        // Merge levels that still have an incomplete task are not scheduled again.
        HashSet<String> incompleteMergeLevels = new HashSet<>();
        for (String taskName
                : TaskGeneratorUtils.getIncompleteTasks(taskType, tableName, _clusterInfoAccessor).keySet()) {
            for (PinotTaskConfig incompleteTaskConfig : _clusterInfoAccessor.getTaskConfigs(taskName)) {
                incompleteMergeLevels.add(incompleteTaskConfig.getConfigs().get("mergeLevel"));
            }
        }

        ZNRecord metadataZNRecord = _clusterInfoAccessor.getMinionTaskMetadataZNRecord(taskType, tableName);
        int expectedVersion = metadataZNRecord != null ? metadataZNRecord.getVersion() : -1;
        MergeRollupTaskMetadata taskMetadata = metadataZNRecord != null
                ? MergeRollupTaskMetadata.fromZNRecord(metadataZNRecord)
                : new MergeRollupTaskMetadata(tableName, new TreeMap<>());

        // The latest segment end time does not depend on the merge level, so compute it once.
        long maxEndTimeMs = Long.MIN_VALUE;
        for (SegmentZKMetadata segment : candidateSegments) {
            maxEndTimeMs = Math.max(maxEndTimeMs, segment.getEndTimeMs());
        }

        List<PinotTaskConfig> tasksForTable = new ArrayList<>();
        String mergeLevel = null;
        for (Map.Entry<String, Map<String, String>> mergeLevelEntry : sortedMergeLevels) {
            // The previous level in sorted order bounds how far this level may advance.
            String lowerMergeLevel = mergeLevel;
            mergeLevel = mergeLevelEntry.getKey();
            Map<String, String> mergeConfigs = mergeLevelEntry.getValue();

            if (incompleteMergeLevels.contains(mergeLevel)) {
                LOGGER.info("Found incomplete task of merge level: {} for the same table: {}, Skipping task generation: {}", mergeLevel, tableName, taskType);
                continue;
            }

            String bucketPeriod = mergeConfigs.get("bucketTimePeriod");
            long bucketMs = TimeUtils.convertPeriodToMillis(bucketPeriod);
            if (bucketMs <= 0) {
                LOGGER.error("Bucket time period: {} (table : {}, mergeLevel : {}) must be larger than 0", bucketPeriod, tableName, mergeLevel);
                continue;
            }
            String bufferPeriod = mergeConfigs.get("bufferTimePeriod");
            long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
            if (bufferMs < 0) {
                LOGGER.error("Buffer time period: {} (table : {}, mergeLevel : {}) must be larger or equal to 0", bufferPeriod, tableName, mergeLevel);
                continue;
            }
            String maxNumParallelBucketsStr = mergeConfigs.get("maxNumParallelBuckets");
            int maxNumParallelBuckets = maxNumParallelBucketsStr != null
                    ? Integer.parseInt(maxNumParallelBucketsStr) : DEFAULT_NUM_PARALLEL_BUCKETS;
            if (maxNumParallelBuckets <= 0) {
                LOGGER.error("Maximum number of parallel buckets: {} (table : {}, mergeLevel : {}) must be larger than 0", maxNumParallelBuckets, tableName, mergeLevel);
                continue;
            }

            long watermarkMs =
                    getWatermarkMs(candidateSegments.get(0).getStartTimeMs(), bucketMs, mergeLevel, taskMetadata);
            long firstBucketEndMs = watermarkMs + bucketMs;
            // Record the delay metric even when nothing ends up being scheduled for this level.
            createOrUpdateDelayMetrics(tableName, mergeLevel, null, watermarkMs, maxEndTimeMs, bufferMs, bucketMs);
            if (!isValidBucketEndTime(firstBucketEndMs, bufferMs, lowerMergeLevel, taskMetadata)) {
                LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}) cannot be merged yet", watermarkMs, firstBucketEndMs, tableName, mergeLevel);
                continue;
            }

            List<List<SegmentZKMetadata>> segmentsPerBucket = selectSegmentsForBuckets(candidateSegments,
                    watermarkMs, bucketMs, bufferMs, maxNumParallelBuckets, mergeLevel, lowerMergeLevel, taskMetadata);
            if (segmentsPerBucket.isEmpty()) {
                LOGGER.info("No unmerged segment found for table: {}, mergeLevel: {}", tableName, mergeLevel);
                continue;
            }

            // Advance the watermark to the bucket boundary at or before the first selected segment.
            long newWatermarkMs = (segmentsPerBucket.get(0).get(0).getStartTimeMs() / bucketMs) * bucketMs;
            taskMetadata.getWatermarkMap().put(mergeLevel, newWatermarkMs);
            LOGGER.info("Update watermark for table: {}, mergeLevel: {} from: {} to: {}", tableName, mergeLevel, watermarkMs, newWatermarkMs);
            createOrUpdateDelayMetrics(tableName, mergeLevel, lowerMergeLevel, newWatermarkMs, maxEndTimeMs, bufferMs,
                    bucketMs);

            String maxNumRecordsPerTaskStr = mergeConfigs.get("maxNumRecordsPerTask");
            int maxNumRecordsPerTask = maxNumRecordsPerTaskStr != null
                    ? Integer.parseInt(maxNumRecordsPerTaskStr) : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
            tasksForTable.addAll(createTaskConfigsForBuckets(tableConfig, segmentsPerBucket, maxNumRecordsPerTask,
                    mergeLevel, mergeConfigs, taskConfigs));
        }

        // Tasks are only handed out when the updated watermarks were persisted against the expected version.
        try {
            _clusterInfoAccessor.setMinionTaskMetadata(taskMetadata, taskType, expectedVersion);
        } catch (ZkException e) {
            LOGGER.error("Version changed while updating merge/rollup task metadata for table: {}, skip scheduling. There are multiple task schedulers for the same table, need to investigate!", tableName, e);
            return new ArrayList<>();
        }
        LOGGER.info("Finished generating task configs for table: {} for task: {}, numTasks: {}", tableName, taskType, tasksForTable.size());
        return tasksForTable;
    }

    /**
     * Walks the time-sorted segments and groups the ones overlapping each bucket, starting at
     * {@code firstBucketStartMs}. A bucket is kept only if it has at least one segment not yet merged at
     * {@code mergeLevel}. The walk stops once {@code maxNumParallelBuckets} buckets are collected, once a kept
     * or skipped bucket contained a segment spanning more than one bucket, or once the next bucket fails
     * {@link #isValidBucketEndTime}.
     */
    private List<List<SegmentZKMetadata>> selectSegmentsForBuckets(List<SegmentZKMetadata> sortedSegments,
            long firstBucketStartMs, long bucketMs, long bufferMs, int maxNumParallelBuckets, String mergeLevel,
            @Nullable String lowerMergeLevel, MergeRollupTaskMetadata taskMetadata) {
        List<List<SegmentZKMetadata>> selectedForAllBuckets = new ArrayList<>(maxNumParallelBuckets);
        List<SegmentZKMetadata> selectedForBucket = new ArrayList<>();
        boolean hasUnmergedSegments = false;
        boolean foundSpilledOverData = false;
        long bucketStartMs = firstBucketStartMs;
        long bucketEndMs = bucketStartMs + bucketMs;

        for (SegmentZKMetadata segment : sortedSegments) {
            long startTimeMs = segment.getStartTimeMs();
            if (startTimeMs >= bucketEndMs) {
                // Every segment overlapping the current bucket has been seen; close the bucket.
                if (hasUnmergedSegments) {
                    selectedForAllBuckets.add(selectedForBucket);
                }
                if (selectedForAllBuckets.size() == maxNumParallelBuckets || foundSpilledOverData) {
                    break;
                }
                // Open the bucket that contains this segment's start time.
                selectedForBucket = new ArrayList<>();
                // Reset before the validity check so an early exit cannot append the still-empty bucket below.
                hasUnmergedSegments = false;
                bucketStartMs = (startTimeMs / bucketMs) * bucketMs;
                bucketEndMs = bucketStartMs + bucketMs;
                if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, taskMetadata)) {
                    break;
                }
                hasUnmergedSegments = !isMergedSegment(segment, mergeLevel);
                if (hasSpilledOverData(segment, bucketMs)) {
                    foundSpilledOverData = true;
                }
                selectedForBucket.add(segment);
            } else if (segment.getEndTimeMs() >= bucketStartMs) {
                // Segment overlaps the current bucket.
                if (!isMergedSegment(segment, mergeLevel)) {
                    hasUnmergedSegments = true;
                }
                if (hasSpilledOverData(segment, bucketMs)) {
                    foundSpilledOverData = true;
                }
                selectedForBucket.add(segment);
            }
            // Otherwise the segment ends before the bucket starts and is ignored.
        }

        // Add the bucket that was still open, unless it is the very list instance added last inside the loop.
        if (hasUnmergedSegments && (selectedForAllBuckets.isEmpty()
                || selectedForAllBuckets.get(selectedForAllBuckets.size() - 1) != selectedForBucket)) {
            selectedForAllBuckets.add(selectedForBucket);
        }
        return selectedForAllBuckets;
    }

    /**
     * Creates the task configs for the selected buckets. Without a segment partition config every bucket is
     * passed to {@link #createPinotTaskConfigs} as a whole. With one, the segments of a bucket are first grouped
     * by their single partition id on the partition column; segments without exactly one partition id are
     * grouped together separately.
     */
    private List<PinotTaskConfig> createTaskConfigsForBuckets(TableConfig tableConfig,
            List<List<SegmentZKMetadata>> segmentsPerBucket, int maxNumRecordsPerTask, String mergeLevel,
            Map<String, String> mergeConfigs, Map<String, String> taskConfigs) {
        String tableName = tableConfig.getTableName();
        List<PinotTaskConfig> taskConfigsForBuckets = new ArrayList<>();
        SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
        if (segmentPartitionConfig == null) {
            for (List<SegmentZKMetadata> bucketSegments : segmentsPerBucket) {
                taskConfigsForBuckets.addAll(createPinotTaskConfigs(bucketSegments, tableName, maxNumRecordsPerTask,
                        mergeLevel, mergeConfigs, taskConfigs));
            }
            return taskConfigsForBuckets;
        }

        Map<String, ?> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
        Preconditions.checkState(columnPartitionMap.size() == 1,
                "Cannot partition on multiple columns for table: %s", tableName);
        String partitionColumn = columnPartitionMap.keySet().iterator().next();
        for (List<SegmentZKMetadata> bucketSegments : segmentsPerBucket) {
            Map<Integer, List<SegmentZKMetadata>> segmentsByPartition = new HashMap<>();
            List<SegmentZKMetadata> unpartitionedSegments = new ArrayList<>();
            for (SegmentZKMetadata segment : bucketSegments) {
                SegmentPartitionMetadata partitionMetadata = segment.getPartitionMetadata();
                // NOTE(review): the null check on the partition set is defensive; confirm whether
                // getPartitions can return null for a column missing from the segment metadata.
                java.util.Collection<Integer> partitions =
                        partitionMetadata != null ? partitionMetadata.getPartitions(partitionColumn) : null;
                if (partitions == null || partitions.size() != 1) {
                    unpartitionedSegments.add(segment);
                } else {
                    segmentsByPartition.computeIfAbsent(partitions.iterator().next(), k -> new ArrayList<>())
                            .add(segment);
                }
            }
            for (List<SegmentZKMetadata> partitionSegments : segmentsByPartition.values()) {
                taskConfigsForBuckets.addAll(createPinotTaskConfigs(partitionSegments, tableName,
                        maxNumRecordsPerTask, mergeLevel, mergeConfigs, taskConfigs));
            }
            if (!unpartitionedSegments.isEmpty()) {
                taskConfigsForBuckets.addAll(createPinotTaskConfigs(unpartitionedSegments, tableName,
                        maxNumRecordsPerTask, mergeLevel, mergeConfigs, taskConfigs));
            }
        }
        return taskConfigsForBuckets;
    }

    /**
     * Decides whether tasks may be generated for a table: it must be an OFFLINE table whose batch segment
     * ingestion type is not REFRESH. A warning is logged for every rejected table.
     *
     * @param tableConfig table to check
     * @param str task type, used only in the log messages
     * @return {@code true} when the table is eligible
     */
    private boolean validate(TableConfig tableConfig, String str) {
        final String nameOfTable = tableConfig.getTableName();

        boolean isOffline = tableConfig.getTableType() == TableType.OFFLINE;
        if (!isOffline) {
            LOGGER.warn("Skip generating task: {} for non-OFFLINE table: {}, REALTIME table is not supported yet", str, nameOfTable);
            return false;
        }

        String ingestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
        if (REFRESH.equalsIgnoreCase(ingestionType)) {
            LOGGER.warn("Skip generating task: {} for non-APPEND table: {}, REFRESH table is not supported", str, nameOfTable);
            return false;
        }

        return true;
    }

    /**
     * Tells whether a segment's start and end time fall into different buckets of width {@code j}
     * (compared by integer division of both timestamps).
     */
    private boolean hasSpilledOverData(SegmentZKMetadata segmentZKMetadata, long j) {
        long bucketOfStart = segmentZKMetadata.getStartTimeMs() / j;
        long bucketOfEnd = segmentZKMetadata.getEndTimeMs() / j;
        return bucketOfEnd > bucketOfStart;
    }

    /**
     * Tells whether the segment's custom map records {@code str} (case-insensitively) under the key
     * "MergeRollupTask.mergeLevel". A segment without a custom map is never considered merged.
     */
    private boolean isMergedSegment(SegmentZKMetadata segmentZKMetadata, String str) {
        Map<?, ?> customEntries = segmentZKMetadata.getCustomMap();
        if (customEntries == null) {
            return false;
        }
        Object recordedLevel = customEntries.get("MergeRollupTask.mergeLevel");
        return str.equalsIgnoreCase((String) recordedLevel);
    }

    /**
     * Checks whether a bucket ending at {@code j} may be processed.
     *
     * <p>The bucket end must not be later than "now minus {@code j2}". When a lower merge level {@code str} is
     * given, that level must have a watermark in the metadata and the bucket end must not exceed it.
     */
    private boolean isValidBucketEndTime(long j, long j2, @Nullable String str, MergeRollupTaskMetadata mergeRollupTaskMetadata) {
        long latestAllowedEndMs = System.currentTimeMillis() - j2;
        if (j > latestAllowedEndMs) {
            return false;
        }
        if (str != null) {
            Long lowerLevelWatermark = (Long) mergeRollupTaskMetadata.getWatermarkMap().get(str);
            if (lowerLevelWatermark == null) {
                return false;
            }
            return lowerLevelWatermark.longValue() >= j;
        }
        return true;
    }

    /**
     * Returns the watermark stored in the metadata for merge level {@code str}; when none is stored, returns
     * {@code j} rounded down to a multiple of {@code j2}.
     */
    private long getWatermarkMs(long j, long j2, String str, MergeRollupTaskMetadata mergeRollupTaskMetadata) {
        Long storedWatermark = (Long) mergeRollupTaskMetadata.getWatermarkMap().get(str);
        if (storedWatermark != null) {
            return storedWatermark.longValue();
        }
        return (j / j2) * j2;
    }

    /**
     * Splits the given segments into tasks and builds one {@link PinotTaskConfig} per task.
     *
     * <p>Segments are taken in list order; a task is closed as soon as its accumulated document count reaches
     * {@code i}, or when the last segment has been added.
     *
     * @param list segments to merge, in the order they should be packed into tasks
     * @param str table name (with type) the tasks belong to
     * @param i maximum number of records per task
     * @param str2 merge level
     * @param map configs of the merge level (merge type, bucket/round periods, max records per segment)
     * @param map2 all configs of the task type; entries ending in ".aggregationType" are copied to each task
     * @return the task configs, empty when {@code list} is empty
     */
    private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> list, String str, int i, String str2, Map<String, String> map, Map<String, String> map2) {
        List<List<String>> segmentNamesPerTask = new ArrayList<>();
        List<List<String>> downloadUrlsPerTask = new ArrayList<>();
        List<String> segmentNames = new ArrayList<>();
        List<String> downloadUrls = new ArrayList<>();
        // Accumulate in a long: the per-segment doc count is a long and an int sum could silently overflow.
        long numRecordsInTask = 0L;
        int numSegments = list.size();
        for (int segmentIdx = 0; segmentIdx < numSegments; segmentIdx++) {
            SegmentZKMetadata segment = list.get(segmentIdx);
            segmentNames.add(segment.getSegmentName());
            downloadUrls.add(segment.getDownloadUrl());
            numRecordsInTask += segment.getTotalDocs();
            if (numRecordsInTask >= i || segmentIdx == numSegments - 1) {
                segmentNamesPerTask.add(segmentNames);
                downloadUrlsPerTask.add(downloadUrls);
                numRecordsInTask = 0L;
                segmentNames = new ArrayList<>();
                downloadUrls = new ArrayList<>();
            }
        }

        String rawTableName = TableNameBuilder.extractRawTableName(str);
        List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(segmentNamesPerTask.size());
        for (int taskIdx = 0; taskIdx < segmentNamesPerTask.size(); taskIdx++) {
            Map<String, String> configs = new HashMap<>();
            configs.put("tableName", str);
            configs.put("segmentName", StringUtils.join(segmentNamesPerTask.get(taskIdx), ","));
            configs.put("downloadURL", StringUtils.join(downloadUrlsPerTask.get(taskIdx), ","));
            configs.put("uploadURL", _clusterInfoAccessor.getVipUrl() + "/segments");
            configs.put("enableReplaceSegments", "true");
            for (Map.Entry<String, String> entry : map2.entrySet()) {
                if (entry.getKey().endsWith(".aggregationType")) {
                    configs.put(entry.getKey(), entry.getValue());
                }
            }
            configs.put("mergeType", map.get("mergeType"));
            configs.put("mergeLevel", str2);
            configs.put("partitionBucketTimePeriod", map.get("bucketTimePeriod"));
            configs.put("roundBucketTimePeriod", map.get("roundBucketTimePeriod"));
            configs.put("maxNumRecordsPerSegment", map.get("maxNumRecordsPerSegment"));
            // The prefix carries the raw table name; previously the whole config map was concatenated here
            // and the extracted raw table name was thrown away.
            configs.put("segmentNamePrefix",
                    "merged_" + str2 + "_" + System.currentTimeMillis() + "_" + rawTableName + "_" + taskIdx);
            pinotTaskConfigs.add(new PinotTaskConfig("MergeRollupTask", configs));
        }
        return pinotTaskConfigs;
    }

    /**
     * Computes how many whole buckets of width {@code j4} lie between the watermark {@code j} and the smaller of
     * {@code j2} and "now minus {@code j3}". A watermark of {@code -1} yields {@code 0}.
     */
    private long getMergeRollupTaskDelayInNumTimeBuckets(long j, long j2, long j3, long j4) {
        if (j == -1) {
            return 0L;
        }
        long bufferedNowMs = System.currentTimeMillis() - j3;
        long upperBoundMs = Math.min(bufferedNowMs, j2);
        long lagMs = upperBoundMs - j;
        return lagMs / j4;
    }

    /**
     * Records the watermark of a table's merge level and, the first time that level is seen, registers a
     * callback gauge reporting the task delay in number of buckets. Does nothing when no controller metrics
     * are available.
     *
     * @param str table name
     * @param str2 merge level whose watermark is recorded
     * @param str3 lower merge level whose recorded watermark bounds the delay, or {@code null} to use the
     *     table's max segment end time instead
     * @param j watermark in milliseconds
     * @param j2 max segment end time of the table in milliseconds
     * @param j3 buffer time in milliseconds
     * @param j4 bucket time in milliseconds
     */
    private void createOrUpdateDelayMetrics(String str, String str2, String str3, long j, long j2, long j3, long j4) {
        ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics();
        if (controllerMetrics == null) {
            return;
        }
        final String tableName = str;
        final String mergeLevel = str2;
        final String lowerMergeLevel = str3;
        final long watermarkMs = j;
        final long bufferTimeMs = j3;
        final long bucketTimeMs = j4;

        Map<String, Long> watermarksForTable =
                _mergeRollupWatermarks.computeIfAbsent(tableName, k -> new ConcurrentHashMap<>());
        _tableMaxEndTimeMs.put(tableName, j2);
        watermarksForTable.compute(mergeLevel, (level, previousWatermarkMs) -> {
            if (previousWatermarkMs == null) {
                // First sighting of this level: log the actual buffer time (the bucket time used to be logged
                // in its place) and register the gauge.
                LOGGER.info("Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}.(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})",
                        tableName, mergeLevel, watermarkMs, bufferTimeMs, bucketTimeMs,
                        getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs,
                                lowerMergeLevel == null ? _tableMaxEndTimeMs.get(tableName)
                                        : watermarksForTable.get(lowerMergeLevel),
                                bufferTimeMs, bucketTimeMs));
                // The gauge reads the maps at call time, so later watermark updates are reflected.
                controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableName, mergeLevel),
                        () -> getMergeRollupTaskDelayInNumTimeBuckets(watermarksForTable.getOrDefault(level, -1L),
                                lowerMergeLevel == null ? _tableMaxEndTimeMs.get(tableName)
                                        : watermarksForTable.get(lowerMergeLevel),
                                bufferTimeMs, bucketTimeMs));
            }
            return watermarkMs;
        });
    }

    /**
     * Forgets all recorded watermarks of a table and removes the gauge of each of its merge levels. Does nothing
     * when no controller metrics are available or the table has no recorded watermarks.
     */
    private void resetDelayMetrics(String str) {
        ControllerMetrics metrics = this._clusterInfoAccessor.getControllerMetrics();
        if (metrics == null) {
            return;
        }
        Map<String, Long> droppedWatermarks = this._mergeRollupWatermarks.remove(str);
        if (droppedWatermarks == null) {
            return;
        }
        for (String level : droppedWatermarks.keySet()) {
            metrics.removeGauge(getMetricNameForTaskDelay(str, level));
        }
    }

    /**
     * Forgets the recorded watermark of one merge level ({@code str2}) of table {@code str} and removes its
     * gauge. Does nothing when no controller metrics are available or no watermark was recorded for it.
     */
    private void resetDelayMetrics(String str, String str2) {
        ControllerMetrics metrics = this._clusterInfoAccessor.getControllerMetrics();
        if (metrics == null) {
            return;
        }
        Map<String, Long> watermarksOfTable = this._mergeRollupWatermarks.get(str);
        if (watermarksOfTable == null) {
            return;
        }
        if (watermarksOfTable.remove(str2) != null) {
            metrics.removeGauge(getMetricNameForTaskDelay(str, str2));
        }
    }

    /**
     * Drops delay metrics that no longer apply: all metrics of a tracked table that is absent from {@code list}
     * or for which this controller is not the leader, and the metrics of merge levels that are no longer part
     * of a table's task config.
     */
    private void cleanUpDelayMetrics(List<TableConfig> list) {
        Map<String, TableConfig> configByTableName = new HashMap<>();
        for (TableConfig config : list) {
            configByTableName.put(config.getTableName(), config);
        }

        // Iterate over a snapshot because resetDelayMetrics removes entries from the tracked map.
        List<String> trackedTables = new ArrayList<>(this._mergeRollupWatermarks.keySet());
        for (String trackedTable : trackedTables) {
            TableConfig currentConfig = configByTableName.get(trackedTable);
            if (currentConfig == null
                    || !this._clusterInfoAccessor.getLeaderControllerManager().isLeaderForTable(trackedTable)) {
                resetDelayMetrics(trackedTable);
                continue;
            }
            Map<String, Map<String, String>> configuredLevels = MergeRollupTaskUtils.getLevelToConfigMap(
                    currentConfig.getTaskConfig().getConfigsForTaskType(getTaskType()));
            for (String trackedLevel : this._mergeRollupWatermarks.get(trackedTable).keySet()) {
                if (!configuredLevels.containsKey(trackedLevel)) {
                    resetDelayMetrics(trackedTable, trackedLevel);
                }
            }
        }
    }

    /**
     * Builds the gauge name for a table ({@code str}) and merge level ({@code str2}):
     * {@code mergeRollupTaskDelayInNumBuckets.<table>.<level>}.
     */
    private String getMetricNameForTaskDelay(String str, String str2) {
        StringBuilder metricName = new StringBuilder("mergeRollupTaskDelayInNumBuckets.");
        metricName.append(str).append('.').append(str2);
        return metricName.toString();
    }
}
