/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.enumerator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.utils.SerdeUtils;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;

@Internal
public class KafkaSourceEnumStateSerializer
implements SimpleVersionedSerializer<KafkaSourceEnumState> {
    private static final int VERSION_0 = 0;
    private static final int VERSION_1 = 1;
    private static final int CURRENT_VERSION = 1;

    public int getVersion() {
        return 1;
    }

    public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
        return KafkaSourceEnumStateSerializer.serializeTopicPartitions(enumState.assignedPartitions());
    }

    public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
        if (version == 1) {
            Set<TopicPartition> assignedPartitions = KafkaSourceEnumStateSerializer.deserializeTopicPartitions(serialized);
            return new KafkaSourceEnumState(assignedPartitions);
        }
        if (version == 0) {
            Map<Integer, Set> currentPartitionAssignment = SerdeUtils.deserializeSplitAssignments(serialized, new KafkaPartitionSplitSerializer(), HashSet::new);
            HashSet<TopicPartition> currentAssignedSplits = new HashSet<TopicPartition>();
            currentPartitionAssignment.forEach((reader, splits) -> splits.forEach(split -> currentAssignedSplits.add(split.getTopicPartition())));
            return new KafkaSourceEnumState(currentAssignedSplits);
        }
        throw new IOException(String.format("The bytes are serialized with version %d, while this deserializer only supports version up to %d", version, 1));
    }

    private static byte[] serializeTopicPartitions(Collection<TopicPartition> topicPartitions) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            Object object;
            try (DataOutputStream out = new DataOutputStream(baos);){
                out.writeInt(topicPartitions.size());
                for (TopicPartition tp : topicPartitions) {
                    out.writeUTF(tp.topic());
                    out.writeInt(tp.partition());
                }
                out.flush();
                object = baos.toByteArray();
            }
            return object;
        }
    }

    private static Set<TopicPartition> deserializeTopicPartitions(byte[] serializedTopicPartitions) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions);){
            HashSet<TopicPartition> hashSet;
            try (DataInputStream in = new DataInputStream(bais);){
                int numPartitions = in.readInt();
                HashSet<TopicPartition> topicPartitions = new HashSet<TopicPartition>(numPartitions);
                for (int i = 0; i < numPartitions; ++i) {
                    String topic = in.readUTF();
                    int partition = in.readInt();
                    topicPartitions.add(new TopicPartition(topic, partition));
                }
                if (in.available() > 0) {
                    throw new IOException("Unexpected trailing bytes in serialized topic partitions");
                }
                hashSet = topicPartitions;
            }
            return hashSet;
        }
    }
}

