package org.apache.flink.connector.kinesis.source.enumerator;

import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
import org.apache.flink.connector.kinesis.source.util.TestUtil;
import org.apache.flink.core.io.VersionMismatchException;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializerTest.class */
class KinesisStreamsSourceEnumeratorStateSerializerTest {

    /* loaded from: input_file:org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializerTest$WrongVersionSplitSerializer.class */
    private static class WrongVersionSplitSerializer extends KinesisShardSplitSerializer {
        private WrongVersionSplitSerializer() {
        }

        public int getVersion() {
            return -1;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorStateSerializerTest$WrongVersionStateSerializer.class */
    private static class WrongVersionStateSerializer extends KinesisStreamsSourceEnumeratorStateSerializer {
        public WrongVersionStateSerializer(KinesisShardSplitSerializer kinesisShardSplitSerializer) {
            super(kinesisShardSplitSerializer);
        }

        public int getVersion() {
            return -1;
        }
    }

    KinesisStreamsSourceEnumeratorStateSerializerTest() {
    }

    @Test
    void testSerializeAndDeserializeEverythingSpecified() throws Exception {
        KinesisStreamsSourceEnumeratorState kinesisStreamsSourceEnumeratorState = new KinesisStreamsSourceEnumeratorState((Set) Stream.of((Object[]) new KinesisShardSplit[]{TestUtil.getTestSplit(TestUtil.generateShardId(1)), TestUtil.getTestSplit(TestUtil.generateShardId(2))}).collect(Collectors.toSet()), TestUtil.SHARD_ID);
        KinesisStreamsSourceEnumeratorStateSerializer kinesisStreamsSourceEnumeratorStateSerializer = new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
        AssertionsForClassTypes.assertThat(kinesisStreamsSourceEnumeratorStateSerializer.deserialize(kinesisStreamsSourceEnumeratorStateSerializer.getVersion(), kinesisStreamsSourceEnumeratorStateSerializer.serialize(kinesisStreamsSourceEnumeratorState))).usingRecursiveComparison().isEqualTo(kinesisStreamsSourceEnumeratorState);
    }

    @Test
    void testDeserializeWithWrongVersionStateSerializer() throws Exception {
        KinesisStreamsSourceEnumeratorState kinesisStreamsSourceEnumeratorState = new KinesisStreamsSourceEnumeratorState((Set) Stream.of((Object[]) new KinesisShardSplit[]{TestUtil.getTestSplit(TestUtil.generateShardId(1)), TestUtil.getTestSplit(TestUtil.generateShardId(2))}).collect(Collectors.toSet()), TestUtil.SHARD_ID);
        KinesisShardSplitSerializer kinesisShardSplitSerializer = new KinesisShardSplitSerializer();
        KinesisStreamsSourceEnumeratorStateSerializer kinesisStreamsSourceEnumeratorStateSerializer = new KinesisStreamsSourceEnumeratorStateSerializer(kinesisShardSplitSerializer);
        WrongVersionStateSerializer wrongVersionStateSerializer = new WrongVersionStateSerializer(kinesisShardSplitSerializer);
        byte[] serialize = wrongVersionStateSerializer.serialize(kinesisStreamsSourceEnumeratorState);
        AssertionsForClassTypes.assertThatExceptionOfType(VersionMismatchException.class).isThrownBy(() -> {
            kinesisStreamsSourceEnumeratorStateSerializer.deserialize(wrongVersionStateSerializer.getVersion(), serialize);
        }).withMessageContaining("Trying to deserialize KinesisStreamsSourceEnumeratorState serialized with unsupported version").withMessageContaining(String.valueOf(kinesisStreamsSourceEnumeratorStateSerializer.getVersion())).withMessageContaining(String.valueOf(wrongVersionStateSerializer.getVersion()));
    }

    @Test
    void testDeserializeWithWrongVersionSplitSerializer() throws Exception {
        KinesisStreamsSourceEnumeratorState kinesisStreamsSourceEnumeratorState = new KinesisStreamsSourceEnumeratorState((Set) Stream.of((Object[]) new KinesisShardSplit[]{TestUtil.getTestSplit(TestUtil.generateShardId(1)), TestUtil.getTestSplit(TestUtil.generateShardId(2))}).collect(Collectors.toSet()), TestUtil.SHARD_ID);
        KinesisStreamsSourceEnumeratorStateSerializer kinesisStreamsSourceEnumeratorStateSerializer = new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
        KinesisStreamsSourceEnumeratorStateSerializer kinesisStreamsSourceEnumeratorStateSerializer2 = new KinesisStreamsSourceEnumeratorStateSerializer(new WrongVersionSplitSerializer());
        byte[] serialize = kinesisStreamsSourceEnumeratorStateSerializer2.serialize(kinesisStreamsSourceEnumeratorState);
        AssertionsForClassTypes.assertThatExceptionOfType(VersionMismatchException.class).isThrownBy(() -> {
            kinesisStreamsSourceEnumeratorStateSerializer.deserialize(kinesisStreamsSourceEnumeratorStateSerializer.getVersion(), serialize);
        }).withMessageContaining("Trying to deserialize KinesisShardSplit serialized with unsupported version").withMessageContaining(String.valueOf(kinesisStreamsSourceEnumeratorStateSerializer.getVersion())).withMessageContaining(String.valueOf(kinesisStreamsSourceEnumeratorStateSerializer2.getVersion()));
    }

    @Test
    void testSerializeWithTrailingBytes() throws Exception {
        KinesisStreamsSourceEnumeratorState kinesisStreamsSourceEnumeratorState = new KinesisStreamsSourceEnumeratorState((Set) Stream.of((Object[]) new KinesisShardSplit[]{TestUtil.getTestSplit(TestUtil.generateShardId(1)), TestUtil.getTestSplit(TestUtil.generateShardId(2))}).collect(Collectors.toSet()), TestUtil.SHARD_ID);
        KinesisStreamsSourceEnumeratorStateSerializer kinesisStreamsSourceEnumeratorStateSerializer = new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
        byte[] serialize = kinesisStreamsSourceEnumeratorStateSerializer.serialize(kinesisStreamsSourceEnumeratorState);
        byte[] copyOf = Arrays.copyOf(serialize, serialize.length + 1);
        AssertionsForClassTypes.assertThatExceptionOfType(IOException.class).isThrownBy(() -> {
            kinesisStreamsSourceEnumeratorStateSerializer.deserialize(kinesisStreamsSourceEnumeratorStateSerializer.getVersion(), copyOf);
        }).withMessageContaining("Unexpected trailing bytes when deserializing.");
    }
}
