/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.pulsar.source.BrokerPartition;
import org.apache.flink.connector.pulsar.source.SplitSchedulingStrategy;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;

/**
 * A {@link SplitSchedulingStrategy} that assigns reader indices per key range.
 *
 * <p>Every distinct {@link SerializableRange} seen by {@link #getIndexOfReader} is given the
 * next unused integer id (starting at 0). Later splits carrying an equal range are mapped to
 * the same id.
 *
 * <p>The instance keeps mutable state ({@code rangeToReaders}, {@code nextId}) and performs no
 * synchronization of its own.
 */
public class KeySharedSplitSchedulingStrategy
implements SplitSchedulingStrategy {
    /** Remembers which reader index was handed out for each range. */
    Map<SerializableRange, Integer> rangeToReaders = new HashMap<SerializableRange, Integer>();
    /** The index that will be handed out to the next previously unseen range. */
    int nextId = 0;

    /**
     * Returns the reader index associated with the range of the given split, allocating a new
     * index the first time a range is encountered.
     *
     * @param numReaders the number of readers; not consulted by this implementation
     * @param split the split whose partition is expected to be a {@link BrokerPartition}
     * @return the index previously assigned to the split's range, or a freshly allocated one
     * @throws ClassCastException if the split's partition is not a {@link BrokerPartition}
     */
    @Override
    public int getIndexOfReader(int numReaders, PulsarPartitionSplit split) {
        BrokerPartition partition = (BrokerPartition)split.getPartition();
        SerializableRange pulsarRange = partition.getTopicRange().getRange();
        // NOTE(review): the allocated index is never bounded by numReaders, so it can reach or
        // exceed numReaders once more distinct ranges than readers have been seen -- confirm
        // that callers guarantee at most numReaders distinct ranges.
        //
        // The mapping function must not modify the map itself: computeIfAbsent already stores
        // the returned value, and HashMap throws ConcurrentModificationException (Java 9+) when
        // the function mutates the map during the computation.
        return this.rangeToReaders.computeIfAbsent(pulsarRange, range -> this.nextId++);
    }

    /**
     * Appends the given splits to the pending assignment list of the given subtask, creating
     * that list if the subtask has no pending entry yet.
     *
     * @param pendingPartitionSplitAssignment pending splits keyed by subtask id; modified in place
     * @param splits the splits to add back
     * @param subtaskId the subtask whose pending list receives the splits
     * @param numReaders the number of readers; not consulted by this implementation
     */
    @Override
    public void addSplitsBack(Map<Integer, List<PulsarPartitionSplit>> pendingPartitionSplitAssignment, List<PulsarPartitionSplit> splits, int subtaskId, int numReaders) {
        pendingPartitionSplitAssignment.computeIfAbsent(subtaskId, r -> new ArrayList<>()).addAll(splits);
    }
}

