/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.operators.sink.committables.SubtaskCommittableManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class CommittableCollectorSerializerTest {
    private static final SimpleVersionedSerializer<Integer> COMMITTABLE_SERIALIZER = new IntegerSerializer();
    private static final CommittableCollectorSerializer<Integer> SERIALIZER = new CommittableCollectorSerializer(COMMITTABLE_SERIALIZER, 1, 1);

    CommittableCollectorSerializerTest() {
    }

    @Test
    void testCommittableCollectorV1SerDe() throws IOException {
        List<Integer> legacyState = Arrays.asList(1, 2, 3);
        DataOutputSerializer out = new DataOutputSerializer(256);
        out.writeInt(-1189141204);
        SimpleVersionedSerialization.writeVersionAndSerializeList(COMMITTABLE_SERIALIZER, legacyState, (DataOutputView)out);
        byte[] serialized = out.getCopyOfBuffer();
        CommittableCollector committableCollector = SERIALIZER.deserialize(1, serialized);
        Assertions.assertThat((int)committableCollector.getNumberOfSubtasks()).isEqualTo(1);
        Assertions.assertThat((boolean)committableCollector.isFinished()).isFalse();
        Assertions.assertThat((int)committableCollector.getSubtaskId()).isEqualTo(0);
        Collection checkpointCommittables = committableCollector.getCheckpointCommittables();
        Assertions.assertThat((Collection)checkpointCommittables).hasSize(1);
        SubtaskCommittableManager subtaskCommittableManager = ((CheckpointCommittableManagerImpl)checkpointCommittables.iterator().next()).getSubtaskCommittableManager(0);
        Assertions.assertThat(subtaskCommittableManager.getPendingRequests().map(CommitRequestImpl::getCommittable).collect(Collectors.toList())).containsExactly((Object[])new Integer[]{1, 2, 3});
    }

    @Test
    void testCommittableCollectorV2SerDe() throws IOException {
        CommittableCollector committableCollector = new CommittableCollector(2, 3);
        committableCollector.addMessage((CommittableMessage)new CommittableSummary(2, 3, Long.valueOf(1L), 1, 1, 0));
        committableCollector.addMessage((CommittableMessage)new CommittableSummary(2, 3, Long.valueOf(2L), 1, 1, 0));
        committableCollector.addMessage((CommittableMessage)new CommittableWithLineage((Object)1, Long.valueOf(1L), 2));
        committableCollector.addMessage((CommittableMessage)new CommittableWithLineage((Object)2, Long.valueOf(2L), 2));
        CommittableCollector copy = SERIALIZER.deserialize(2, SERIALIZER.serialize(committableCollector));
        Assertions.assertThat((int)copy.getSubtaskId()).isEqualTo(1);
        Assertions.assertThat((boolean)copy.isFinished()).isFalse();
        Assertions.assertThat((int)copy.getNumberOfSubtasks()).isEqualTo(1);
        Collection checkpointCommittables = committableCollector.getCheckpointCommittables();
        Assertions.assertThat((Collection)checkpointCommittables).hasSize(2);
        Iterator committablesIterator = checkpointCommittables.iterator();
        SubtaskCommittableManager subtaskCommittableManagerCheckpoint1 = ((CheckpointCommittableManagerImpl)committablesIterator.next()).getSubtaskCommittableManager(2);
        Assertions.assertThat(subtaskCommittableManagerCheckpoint1.getPendingRequests().map(CommitRequestImpl::getCommittable).collect(Collectors.toList())).containsExactly((Object[])new Integer[]{1});
        SubtaskCommittableManager subtaskCommittableManagerCheckpoint2 = ((CheckpointCommittableManagerImpl)committablesIterator.next()).getSubtaskCommittableManager(2);
        Assertions.assertThat(subtaskCommittableManagerCheckpoint2.getPendingRequests().map(CommitRequestImpl::getCommittable).collect(Collectors.toList())).containsExactly((Object[])new Integer[]{2});
    }
}

