/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.enumerator.assigner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;

/**
 * A {@link SplitAssigner} that hands every pending split to exactly one reader.
 *
 * <p>Newly discovered partitions are turned into {@link PulsarPartitionSplit}s and queued in
 * {@code pendingPartitionSplits}. When an assignment is requested, all queued splits are
 * distributed over the given readers in round-robin fashion and the queue is emptied.
 */
public class NormalSplitAssigner implements SplitAssigner {
    private static final long serialVersionUID = 8412586087991597092L;

    private final StopCursor stopCursor;
    private final SourceConfiguration sourceConfiguration;

    /** Every partition for which a split has already been created by this assigner. */
    private final Set<TopicPartition> appendedPartitions;

    /** Splits that were created (or handed back) but are not assigned to a reader yet. */
    private final Set<PulsarPartitionSplit> pendingPartitionSplits;

    /** Set to {@code true} once {@link #registerTopicPartitions(Set)} has run at least once. */
    private boolean initialized;

    /**
     * Creates an assigner with no known partitions and no pending splits.
     *
     * @param stopCursor the stop cursor passed to every split created by this assigner
     * @param sourceConfiguration the configuration consulted by {@link #noMoreSplits(Integer)}
     */
    public NormalSplitAssigner(StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
        this.stopCursor = stopCursor;
        this.sourceConfiguration = sourceConfiguration;
        this.appendedPartitions = new HashSet<>();
        this.pendingPartitionSplits = new HashSet<>();
        this.initialized = false;
    }

    /**
     * Creates an assigner from a previously captured enumerator state.
     *
     * <p>The collections returned by the state's getters are used directly; no copy is made
     * here.
     *
     * @param stopCursor the stop cursor passed to every split created by this assigner
     * @param sourceConfiguration the configuration consulted by {@link #noMoreSplits(Integer)}
     * @param sourceEnumState the state supplying the appended partitions, the pending splits and
     *     the initialized flag
     */
    public NormalSplitAssigner(
            StopCursor stopCursor,
            SourceConfiguration sourceConfiguration,
            PulsarSourceEnumState sourceEnumState) {
        this.stopCursor = stopCursor;
        this.sourceConfiguration = sourceConfiguration;
        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
        this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits();
        this.initialized = sourceEnumState.isInitialized();
    }

    /**
     * Records the partitions that are not known yet and queues one split for each of them.
     *
     * @param fetchedPartitions the partitions to register
     * @return the partitions from {@code fetchedPartitions} that had not been registered before
     */
    @Override
    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
        List<TopicPartition> newPartitions = new ArrayList<>(fetchedPartitions.size());
        for (TopicPartition partition : fetchedPartitions) {
            if (this.appendedPartitions.contains(partition)) {
                // A split was already created for this partition; do not queue it twice.
                continue;
            }
            this.pendingPartitionSplits.add(new PulsarPartitionSplit(partition, this.stopCursor));
            this.appendedPartitions.add(partition);
            newPartitions.add(partition);
        }
        // The flag is raised on the first call, even when no partition was fetched.
        this.initialized = true;
        return newPartitions;
    }

    /**
     * Puts the given splits back into the pending queue so that they are assigned again.
     *
     * @param splits the splits to re-queue
     * @param subtaskId not used by this implementation
     */
    @Override
    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
        this.pendingPartitionSplits.addAll(splits);
    }

    /**
     * Distributes all pending splits over the given readers in round-robin fashion and clears the
     * pending queue.
     *
     * <p>The pending splits are kept in a {@link Set}, so the order in which they are dealt out
     * follows that set's iteration order.
     *
     * @param readers the ids of the readers that may receive splits
     * @return the assignment, or {@link Optional#empty()} when there is no pending split or no
     *     reader
     */
    @Override
    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(List<Integer> readers) {
        if (this.pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
            return Optional.empty();
        }

        HashMap<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();

        // Take a stable copy to index into, then drain the pending queue.
        List<PulsarPartitionSplit> partitionSplits = new ArrayList<>(this.pendingPartitionSplits);
        this.pendingPartitionSplits.clear();

        int readerCount = readers.size();
        for (int i = 0; i < partitionSplits.size(); ++i) {
            Integer readerId = readers.get(i % readerCount);
            PulsarPartitionSplit split = partitionSplits.get(i);
            assignMap.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split);
        }
        return Optional.of(new SplitsAssignment<PulsarPartitionSplit>(assignMap));
    }

    /**
     * Tells whether no further split will be produced.
     *
     * @param reader not used by this implementation
     * @return {@code true} only when partition discovery is disabled, partitions have been
     *     registered at least once and no split is pending
     */
    @Override
    public boolean noMoreSplits(Integer reader) {
        return !this.sourceConfiguration.isEnablePartitionDiscovery()
                && this.initialized
                && this.pendingPartitionSplits.isEmpty();
    }

    /**
     * Captures the current assigner state.
     *
     * <p>The appended-partition and pending-split sets are passed to the state as they are (not
     * copied here); the two map arguments are always fresh, empty maps.
     *
     * @return a state object built from the current fields
     */
    @Override
    public PulsarSourceEnumState snapshotState() {
        return new PulsarSourceEnumState(
                this.appendedPartitions,
                this.pendingPartitionSplits,
                new HashMap<Integer, Set<PulsarPartitionSplit>>(),
                new HashMap<Integer, Set<String>>(),
                this.initialized);
    }
}

