/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.internal;

import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscription;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Read-only serializer used to migrate Pulsar source state written in older layouts into the
 * {@code Tuple2<TopicSubscription, MessageId>} form.
 *
 * <p>Three legacy layouts are understood:
 * <ul>
 *   <li>version 0: {@code Tuple2<String, MessageId>} (topic name, message id); the resulting
 *       subscription gets {@link SerializableRange#ofFullRange()};</li>
 *   <li>version 1: {@code Tuple2<TopicRange, MessageId>};</li>
 *   <li>version 2: {@code Tuple3<TopicRange, MessageId, String>} where the third field is used
 *       as the subscription name.</li>
 * </ul>
 *
 * <p>{@link #serialize(Tuple2)} always throws: this class only restores state.
 */
public class PulsarSourceStateSerializer
implements SimpleVersionedSerializer<Tuple2<TopicSubscription, MessageId>>,
Serializable {
    private static final Logger log = LoggerFactory.getLogger(PulsarSourceStateSerializer.class);
    private static final int CURRENT_VERSION = 4;
    private final ExecutionConfig executionConfig;
    /** Legacy deserializers keyed by state version; iteration order is insertion order (0, 1, 2). */
    private Map<Integer, SerializableFunction<byte[], Tuple2<TopicSubscription, MessageId>>> oldStateSerializer;

    /**
     * Creates the serializer and registers one deserialization function per legacy version.
     *
     * @param executionConfig configuration handed to every {@link KryoSerializer} this class builds
     */
    public PulsarSourceStateSerializer(ExecutionConfig executionConfig) {
        this.executionConfig = executionConfig;
        this.oldStateSerializer = new LinkedHashMap<>();
        this.oldStateSerializer.put(0, bytes -> {
            DataInputDeserializer input = new DataInputDeserializer(bytes);
            Tuple2<String, MessageId> restored = this.getV0Serializer().deserialize(input);
            // Version 0 stored only the topic name, so the subscription covers the full range.
            TopicSubscription topicSubscription = TopicSubscription.builder()
                    .topic(restored.f0)
                    .range(SerializableRange.ofFullRange())
                    .build();
            return Tuple2.of(topicSubscription, restored.f1);
        });
        this.oldStateSerializer.put(1, bytes -> {
            DataInputDeserializer input = new DataInputDeserializer(bytes);
            Tuple2<TopicRange, MessageId> restored = this.getV1Serializer().deserialize(input);
            TopicSubscription topicSubscription = TopicSubscription.builder()
                    .topic(restored.f0.getTopic())
                    .range(restored.f0.getRange())
                    .build();
            return Tuple2.of(topicSubscription, restored.f1);
        });
        this.oldStateSerializer.put(2, bytes -> {
            DataInputDeserializer input = new DataInputDeserializer(bytes);
            Tuple3<TopicRange, MessageId, String> restored = this.getV2Serializer().deserialize(input);
            TopicSubscription topicSubscription = TopicSubscription.builder()
                    .topic(restored.f0.getTopic())
                    .range(restored.f0.getRange())
                    .subscriptionName(restored.f2)
                    .build();
            return Tuple2.of(topicSubscription, restored.f1);
        });
    }

    /**
     * @return the current state version, {@value #CURRENT_VERSION}
     */
    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    /**
     * Not supported; this serializer is used for state migration only.
     *
     * @param obj ignored
     * @return never returns normally
     * @throws IOException always, as an {@link UnsupportedEncodingException}
     */
    @Override
    public byte[] serialize(Tuple2<TopicSubscription, MessageId> obj) throws IOException {
        throw new UnsupportedEncodingException("for Pulsar source state migration only");
    }

    /**
     * Restores state from legacy bytes by trying every registered legacy deserializer in
     * registration order (0, 1, 2) and returning the first result that does not throw.
     *
     * <p>The {@code version} argument is not consulted; the byte layout is probed instead.
     *
     * @param version    ignored by this implementation
     * @param serialized bytes written with one of the legacy tuple serializers
     * @return the restored subscription / message id pair
     * @throws IllegalArgumentException if no legacy deserializer can read {@code serialized};
     *         the cause is the last failure, with earlier failures chained as suppressed
     */
    @Override
    public Tuple2<TopicSubscription, MessageId> deserialize(int version, byte[] serialized) throws IOException {
        Exception lastFailure = null;
        for (Map.Entry<Integer, SerializableFunction<byte[], Tuple2<TopicSubscription, MessageId>>> entry : this.oldStateSerializer.entrySet()) {
            try {
                Tuple2<TopicSubscription, MessageId> tuple2 = entry.getValue().apply(serialized);
                log.info("pulsar deser old state {}", tuple2);
                return tuple2;
            }
            catch (Exception e) {
                // Keep earlier failures reachable from the final cause instead of dropping them.
                if (lastFailure != null && lastFailure != e) {
                    e.addSuppressed(lastFailure);
                }
                lastFailure = e;
            }
        }
        throw new IllegalArgumentException("not restore Pulsar state", lastFailure);
    }

    /**
     * Converts an in-memory legacy state object by serializing it with the tuple serializer of
     * {@code version} and then running the bytes through {@link #deserialize(int, byte[])}.
     *
     * @param version        legacy version of {@code oldStateObject}: 0, 1 or 2
     * @param oldStateObject a {@code Tuple2} (versions 0 and 1) or {@code Tuple3} (version 2)
     * @return the restored subscription / message id pair
     * @throws IOException              if writing the legacy object fails
     * @throws IllegalArgumentException if {@code version} is not 0, 1 or 2, or the bytes cannot
     *                                  be read back
     * @throws ClassCastException       if {@code oldStateObject} is not the tuple type of
     *                                  {@code version}
     */
    @SuppressWarnings("unchecked")
    public Tuple2<TopicSubscription, MessageId> deserialize(int version, Object oldStateObject) throws IOException {
        DataOutputSerializer target = new DataOutputSerializer(8192);
        switch (version) {
            case 0: {
                this.getV0Serializer().serialize((Tuple2<String, MessageId>) oldStateObject, target);
                break;
            }
            case 1: {
                this.getV1Serializer().serialize((Tuple2<TopicRange, MessageId>) oldStateObject, target);
                break;
            }
            case 2: {
                this.getV2Serializer().serialize((Tuple3<TopicRange, MessageId, String>) oldStateObject, target);
                break;
            }
            default: {
                throw new IllegalArgumentException("unsupport old pulsar state version");
            }
        }
        // Copy only the bytes actually written; the shared buffer is the whole backing array
        // and would carry unwritten trailing bytes into the deserializers.
        return this.deserialize(version, target.getCopyOfBuffer());
    }

    /**
     * @return a new serializer for the version 0 layout, {@code Tuple2<String, MessageId>}
     */
    public TupleSerializer<Tuple2<String, MessageId>> getV0Serializer() {
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[]{
                StringSerializer.INSTANCE,
                new KryoSerializer<>(MessageId.class, this.executionConfig)
        };
        // Class literals cannot carry type arguments, hence the double cast.
        @SuppressWarnings("unchecked")
        Class<Tuple2<String, MessageId>> tupleClass = (Class<Tuple2<String, MessageId>>) (Class<?>) Tuple2.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }

    /**
     * @return a new serializer for the version 1 layout, {@code Tuple2<TopicRange, MessageId>}
     */
    public TupleSerializer<Tuple2<TopicRange, MessageId>> getV1Serializer() {
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[]{
                new KryoSerializer<>(TopicRange.class, this.executionConfig),
                new KryoSerializer<>(MessageId.class, this.executionConfig)
        };
        // Class literals cannot carry type arguments, hence the double cast.
        @SuppressWarnings("unchecked")
        Class<Tuple2<TopicRange, MessageId>> tupleClass = (Class<Tuple2<TopicRange, MessageId>>) (Class<?>) Tuple2.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }

    /**
     * @return a new serializer for the version 2 layout,
     *         {@code Tuple3<TopicRange, MessageId, String>}
     */
    public TupleSerializer<Tuple3<TopicRange, MessageId, String>> getV2Serializer() {
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[]{
                new KryoSerializer<>(TopicRange.class, this.executionConfig),
                new KryoSerializer<>(MessageId.class, this.executionConfig),
                StringSerializer.INSTANCE
        };
        // Class literals cannot carry type arguments, hence the double cast.
        @SuppressWarnings("unchecked")
        Class<Tuple3<TopicRange, MessageId, String>> tupleClass = (Class<Tuple3<TopicRange, MessageId, String>>) (Class<?>) Tuple3.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }

    /**
     * Looks up the legacy tuple serializer for a state version.
     *
     * @param oldStateVersion legacy version: 0, 1 or 2
     * @return a new serializer for that version's tuple layout
     * @throws IllegalArgumentException if {@code oldStateVersion} is not 0, 1 or 2
     */
    public TupleSerializer<?> getSerializer(int oldStateVersion) {
        switch (oldStateVersion) {
            case 0: {
                return this.getV0Serializer();
            }
            case 1: {
                return this.getV1Serializer();
            }
            case 2: {
                return this.getV2Serializer();
            }
            default: {
                throw new IllegalArgumentException("unsupport old pulsar state version");
            }
        }
    }

    /**
     * A {@link Serializable} single-argument function that may throw a checked exception.
     *
     * @param <T> input type
     * @param <R> result type
     */
    @FunctionalInterface
    public static interface SerializableFunction<T, R>
    extends Serializable {
        /**
         * Applies this function.
         *
         * @param var1 the input
         * @return the result
         * @throws Exception if the function fails
         */
        public R apply(T var1) throws Exception;
    }
}

