package org.apache.flink.connector.file.src;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.BiConsumer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.class */
public class PendingSplitsCheckpointSerializerTest {
    @Test
    public void serializeEmptyCheckpoint() throws Exception {
        PendingSplitsCheckpoint fromCollectionSnapshot = PendingSplitsCheckpoint.fromCollectionSnapshot(Collections.emptyList());
        assertCheckpointsEqual(fromCollectionSnapshot, serializeAndDeserialize(fromCollectionSnapshot));
    }

    @Test
    public void serializeSomeSplits() throws Exception {
        PendingSplitsCheckpoint fromCollectionSnapshot = PendingSplitsCheckpoint.fromCollectionSnapshot(Arrays.asList(testSplit1(), testSplit2(), testSplit3()));
        assertCheckpointsEqual(fromCollectionSnapshot, serializeAndDeserialize(fromCollectionSnapshot));
    }

    @Test
    public void serializeSplitsAndProcessedPaths() throws Exception {
        PendingSplitsCheckpoint fromCollectionSnapshot = PendingSplitsCheckpoint.fromCollectionSnapshot(Arrays.asList(testSplit1(), testSplit2(), testSplit3()), Arrays.asList(new Path("file:/some/path"), new Path("s3://bucket/key/and/path"), new Path("hdfs://namenode:12345/path")));
        assertCheckpointsEqual(fromCollectionSnapshot, serializeAndDeserialize(fromCollectionSnapshot));
    }

    @Test
    public void repeatedSerialization() throws Exception {
        PendingSplitsCheckpoint fromCollectionSnapshot = PendingSplitsCheckpoint.fromCollectionSnapshot(Arrays.asList(testSplit3(), testSplit1()));
        serializeAndDeserialize(fromCollectionSnapshot);
        serializeAndDeserialize(fromCollectionSnapshot);
        assertCheckpointsEqual(fromCollectionSnapshot, serializeAndDeserialize(fromCollectionSnapshot));
    }

    @Test
    public void repeatedSerializationCaches() throws Exception {
        PendingSplitsCheckpoint fromCollectionSnapshot = PendingSplitsCheckpoint.fromCollectionSnapshot(Collections.singletonList(testSplit2()));
        Assert.assertSame(new PendingSplitsCheckpointSerializer(FileSourceSplitSerializer.INSTANCE).serialize(fromCollectionSnapshot), new PendingSplitsCheckpointSerializer(FileSourceSplitSerializer.INSTANCE).serialize(fromCollectionSnapshot));
    }

    private static FileSourceSplit testSplit1() {
        return new FileSourceSplit("random-id", new Path("hdfs://namenode:14565/some/path/to/a/file"), 100000000L, 64000000L, 0L, 200000000L, new String[]{"host1", "host2", "host3"});
    }

    private static FileSourceSplit testSplit2() {
        return new FileSourceSplit("some-id", new Path("file:/some/path/to/a/file"), 0L, 0L, 0L, 0L);
    }

    private static FileSourceSplit testSplit3() {
        return new FileSourceSplit("an-id", new Path("s3://some-bucket/key/to/the/object"), 0L, 1234567L, 0L, 1234567L);
    }

    private static PendingSplitsCheckpoint<FileSourceSplit> serializeAndDeserialize(PendingSplitsCheckpoint<FileSourceSplit> pendingSplitsCheckpoint) throws IOException {
        PendingSplitsCheckpointSerializer pendingSplitsCheckpointSerializer = new PendingSplitsCheckpointSerializer(FileSourceSplitSerializer.INSTANCE);
        return (PendingSplitsCheckpoint) SimpleVersionedSerialization.readVersionAndDeSerialize(pendingSplitsCheckpointSerializer, SimpleVersionedSerialization.writeVersionAndSerialize(pendingSplitsCheckpointSerializer, pendingSplitsCheckpoint));
    }

    private static void assertCheckpointsEqual(PendingSplitsCheckpoint<FileSourceSplit> pendingSplitsCheckpoint, PendingSplitsCheckpoint<FileSourceSplit> pendingSplitsCheckpoint2) {
        assertOrderedCollectionEquals(pendingSplitsCheckpoint.getSplits(), pendingSplitsCheckpoint2.getSplits(), FileSourceSplitSerializerTest::assertSplitsEqual);
        assertOrderedCollectionEquals(pendingSplitsCheckpoint.getAlreadyProcessedPaths(), pendingSplitsCheckpoint2.getAlreadyProcessedPaths(), (v0, v1) -> {
            Assert.assertEquals(v0, v1);
        });
    }

    private static <E> void assertOrderedCollectionEquals(Collection<E> collection, Collection<E> collection2, BiConsumer<E, E> biConsumer) {
        Assert.assertEquals(collection.size(), collection2.size());
        Iterator<E> it = collection.iterator();
        Iterator<E> it2 = collection2.iterator();
        while (it.hasNext()) {
            biConsumer.accept(it.next(), it2.next());
        }
    }
}
