/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.enumerator.assigner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;

/**
 * A {@link SplitAssigner} that hands every registered topic partition to every reader.
 *
 * <p>For each reader the assigner remembers, by partition name, which partitions it has already
 * turned into splits, so a partition is offered to a given reader at most once. Splits returned
 * by a reader through {@link #addSplitsBack(List, int)} are kept per subtask and are handed out
 * again on the next {@link #createAssignment(List)} call for that same reader.
 */
public class SharedSplitAssigner implements SplitAssigner {
    private static final long serialVersionUID = 8468503133499402491L;

    private final StopCursor stopCursor;
    private final SourceConfiguration sourceConfiguration;

    /** Every partition that has been registered so far. */
    private final Set<TopicPartition> appendedPartitions;

    /** Splits waiting to be (re-)assigned, keyed by reader/subtask id. */
    private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;

    /** Names ({@code TopicPartition#toString()}) of the partitions already handed out, per reader. */
    private final Map<Integer, Set<String>> readerAssignedSplits;

    /** Set to {@code true} after the first {@link #registerTopicPartitions(Set)} call. */
    private boolean initialized;

    /**
     * Creates an assigner with empty state.
     *
     * @param stopCursor the stop cursor attached to every split this assigner creates
     * @param sourceConfiguration the configuration consulted for the partition discovery flag
     */
    public SharedSplitAssigner(StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
        this.stopCursor = stopCursor;
        this.sourceConfiguration = sourceConfiguration;
        this.appendedPartitions = new HashSet<>();
        this.sharedPendingPartitionSplits = new HashMap<>();
        this.readerAssignedSplits = new HashMap<>();
        this.initialized = false;
    }

    /**
     * Creates an assigner whose state is taken from a previously captured enumerator state.
     *
     * <p>The collections of {@code sourceEnumState} are used directly, without copying, so this
     * assigner mutates the very instances returned by the state's getters.
     *
     * @param stopCursor the stop cursor attached to every split this assigner creates
     * @param sourceConfiguration the configuration consulted for the partition discovery flag
     * @param sourceEnumState the state supplying the registered partitions, the pending splits,
     *     the per-reader assignments and the initialized flag
     */
    public SharedSplitAssigner(StopCursor stopCursor, SourceConfiguration sourceConfiguration, PulsarSourceEnumState sourceEnumState) {
        this.stopCursor = stopCursor;
        this.sourceConfiguration = sourceConfiguration;
        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
        this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits();
        this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
        this.initialized = sourceEnumState.isInitialized();
    }

    /**
     * Records the given partitions and reports which of them were not known before.
     *
     * <p>Calling this method marks the assigner as initialized, even when no new partition is
     * found.
     *
     * @param fetchedPartitions the partitions to register
     * @return the subset of {@code fetchedPartitions} that had not been registered yet
     */
    @Override
    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
        List<TopicPartition> newPartitions = new ArrayList<>(fetchedPartitions.size());
        for (TopicPartition fetchedPartition : fetchedPartitions) {
            // Set#add returns true only when the element was absent, so a single call both
            // registers the partition and tells us whether it is new.
            if (this.appendedPartitions.add(fetchedPartition)) {
                newPartitions.add(fetchedPartition);
            }
        }
        this.initialized = true;
        return newPartitions;
    }

    /**
     * Stores splits that were handed back so they can be assigned to the same subtask again.
     *
     * @param splits the splits being returned
     * @param subtaskId the id of the subtask the splits are queued for
     */
    @Override
    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
        Set<PulsarPartitionSplit> pending =
                this.sharedPendingPartitionSplits.computeIfAbsent(subtaskId, id -> new HashSet<>());
        pending.addAll(splits);
    }

    /**
     * Builds the assignment for the given readers.
     *
     * <p>Each reader receives its pending (handed back) splits plus one new split for every
     * registered partition whose name is not yet in that reader's assigned set. The pending
     * entry of each listed reader is removed and its assigned set is updated as a side effect.
     *
     * @param readers the ids of the readers to create splits for
     * @return the assignment, or {@link Optional#empty()} when {@code readers} is empty or no
     *     reader has anything to receive
     */
    @Override
    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(List<Integer> readers) {
        if (readers.isEmpty()) {
            return Optional.empty();
        }
        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
        for (Integer reader : readers) {
            // Take ownership of whatever was handed back for this reader.
            Set<PulsarPartitionSplit> pendingSplits = this.sharedPendingPartitionSplits.remove(reader);
            if (pendingSplits == null) {
                pendingSplits = new HashSet<>();
            }
            Set<String> assignedSplits =
                    this.readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
            for (TopicPartition partition : this.appendedPartitions) {
                String partitionName = partition.toString();
                if (assignedSplits.contains(partitionName)) {
                    continue;
                }
                pendingSplits.add(new PulsarPartitionSplit(partition, this.stopCursor));
                assignedSplits.add(partitionName);
            }
            if (!pendingSplits.isEmpty()) {
                assignMap.put(reader, new ArrayList<>(pendingSplits));
            }
        }
        if (assignMap.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new SplitsAssignment<>(assignMap));
    }

    /**
     * Tells whether the given reader will receive no further splits from this assigner.
     *
     * @param reader the id of the reader to check
     * @return {@code true} only when partition discovery is disabled, the assigner has been
     *     initialized, nothing is pending for the reader, and the number of partition names
     *     recorded for the reader equals the number of registered partitions
     */
    @Override
    public boolean noMoreSplits(Integer reader) {
        Set<PulsarPartitionSplit> pendingSplits = this.sharedPendingPartitionSplits.get(reader);
        Set<String> assignedSplits = this.readerAssignedSplits.get(reader);
        return !this.sourceConfiguration.isEnablePartitionDiscovery()
                && this.initialized
                && (pendingSplits == null || pendingSplits.isEmpty())
                && assignedSplits != null
                && assignedSplits.size() == this.appendedPartitions.size();
    }

    /**
     * Captures the current assigner state.
     *
     * <p>The live collections are passed to the state object as-is (no copies are made here);
     * an empty set is supplied for the state's second constructor argument.
     *
     * @return a state object built from the registered partitions, the pending splits, the
     *     per-reader assignments and the initialized flag
     */
    @Override
    public PulsarSourceEnumState snapshotState() {
        return new PulsarSourceEnumState(
                this.appendedPartitions,
                new HashSet<>(),
                this.sharedPendingPartitionSplits,
                this.readerAssignedSplits,
                this.initialized);
    }
}

