/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.batch.connectors.pulsar;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.flink.batch.connectors.pulsar.CachedClients;
import org.apache.flink.batch.connectors.pulsar.InputLedger;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SplitUtils {
    private static final Logger log = LoggerFactory.getLogger(SplitUtils.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Collection<InputLedger> getLedgersInBetween(String topic, MessageIdImpl start, MessageIdImpl end, CachedClients cachedClients) throws Exception {
        ReadOnlyCursorImpl readOnlyCursor = null;
        try {
            ManagedLedgerFactoryImpl mlFactory = cachedClients.getManagedLedgerFactory();
            readOnlyCursor = (ReadOnlyCursorImpl)mlFactory.openReadOnlyCursor(TopicName.get((String)topic).getPersistenceNamingEncoding(), (Position)PositionImpl.earliest, new ManagedLedgerConfig());
            ManagedLedgerImpl ml = (ManagedLedgerImpl)readOnlyCursor.getManagedLedger();
            ArrayList allLedgers = Lists.newArrayList(ml.getLedgersInfo().subMap(start.getLedgerId(), true, end.getLedgerId(), true).values());
            long actualStartLedger = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)allLedgers.get(0)).getLedgerId();
            long actualStartEntry = start.getEntryId() > 0L ? start.getEntryId() : 0L;
            MLDataFormats.ManagedLedgerInfo.LedgerInfo endLedger = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)allLedgers.get(allLedgers.size() - 1);
            long actualEndLedger = endLedger.getLedgerId();
            long lastEntry = endLedger.getEntries() - 1L;
            long actualEndEntry = end.getEntryId() >= lastEntry ? lastEntry : (end.getEntryId() >= 0L ? end.getEntryId() : 0L);
            HashMap<Long, InputLedger> ledgersToRead = new HashMap<Long, InputLedger>();
            for (MLDataFormats.ManagedLedgerInfo.LedgerInfo li : allLedgers) {
                if (li.hasOffloadContext() && li.getOffloadContext().getComplete()) {
                    UUID uid = new UUID(li.getOffloadContext().getUidMsb(), li.getOffloadContext().getUidLsb());
                    Map metadata = OffloadUtils.getOffloadDriverMetadata((MLDataFormats.ManagedLedgerInfo.LedgerInfo)li);
                    metadata.put("ManagedLedgerName", topic);
                    ledgersToRead.put(li.getLedgerId(), new InputLedger(topic, li.getLedgerId(), 0L, li.getEntries() - 1L, uid, metadata));
                    continue;
                }
                ledgersToRead.put(li.getLedgerId(), new InputLedger(topic, li.getLedgerId(), 0L, li.getEntries() - 1L, null, null));
            }
            ((InputLedger)ledgersToRead.get(actualStartLedger)).setStartEntryId(actualStartEntry);
            ((InputLedger)ledgersToRead.get(actualEndLedger)).setEndEntryId(actualEndEntry);
            Collection collection = ledgersToRead.values();
            return collection;
        }
        catch (Exception e) {
            e.printStackTrace();
            Collection<InputLedger> collection = null;
            return collection;
        }
        finally {
            if (readOnlyCursor != null) {
                try {
                    readOnlyCursor.close();
                }
                catch (Exception e) {
                    log.error("Failed to close readOnly cursor", (Throwable)e);
                }
            }
        }
    }

    public static List<List<InputLedger>> partitionToNSplits(List<InputLedger> ledgers, int parallelism) {
        if (ledgers.size() <= parallelism) {
            return ledgers.stream().map(l -> ImmutableList.of((Object)l)).collect(Collectors.toList());
        }
        long totalSize = SplitUtils.sizeOfLedgerList(ledgers);
        long avgSizePerSplit = totalSize / (long)parallelism;
        ledgers.sort(null);
        int li = ledgers.size() - 1;
        int k = parallelism;
        List[] outputGroups = new List[k];
        for (int i = 0; i < outputGroups.length; ++i) {
            outputGroups[i] = new ArrayList();
        }
        while (li >= 0 && ledgers.get(li).ledgerSize() >= avgSizePerSplit) {
            outputGroups[k - 1].add(ledgers.get(li));
            --li;
            --k;
        }
        SplitUtils.search(outputGroups, li, ledgers, avgSizePerSplit);
        return Arrays.asList(outputGroups);
    }

    private static boolean search(List<InputLedger>[] outputGroups, int li, List<InputLedger> ledgers, long avgSizePerSplit) {
        if (li < 0) {
            return true;
        }
        InputLedger currentLedger = ledgers.get(li);
        --li;
        long smallestGroupSize = Long.MAX_VALUE;
        int smallestGroupIndex = 0;
        for (int i = 0; i < outputGroups.length; ++i) {
            long groupISize = SplitUtils.sizeOfLedgerList(outputGroups[i]);
            if (groupISize + currentLedger.ledgerSize() <= avgSizePerSplit) {
                outputGroups[i].add(currentLedger);
                if (SplitUtils.search(outputGroups, li, ledgers, avgSizePerSplit)) {
                    return true;
                }
                outputGroups[i].remove(outputGroups[i].size() - 1);
            }
            if (groupISize >= smallestGroupSize) continue;
            smallestGroupSize = groupISize;
            smallestGroupIndex = i;
        }
        outputGroups[smallestGroupIndex].add(currentLedger);
        SplitUtils.search(outputGroups, li, ledgers, avgSizePerSplit);
        return true;
    }

    public static long sizeOfLedgerList(List<InputLedger> ledgers) {
        return ledgers.stream().mapToLong(InputLedger::ledgerSize).sum();
    }
}

