package org.apache.flink.streaming.api.operators.collect;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectResultBufferTest.class */
public class CollectResultBufferTest {
    private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;

    @Test
    public void testUncheckpointedValidResponse() throws Exception {
        UncheckpointedCollectResultBuffer uncheckpointedCollectResultBuffer = new UncheckpointedCollectResultBuffer(serializer, false);
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, Collections.emptyList()), 0L);
        List<Integer> asList = Arrays.asList(1, 2, 3);
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, createSerializedResults(asList)), 0L);
        Iterator<Integer> it = asList.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(it.next(), uncheckpointedCollectResultBuffer.next());
        }
        List asList2 = Arrays.asList(4, 5);
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, createSerializedResults(Arrays.asList(3, 4, 5))), 2L);
        Iterator it2 = asList2.iterator();
        while (it2.hasNext()) {
            Assert.assertEquals((Integer) it2.next(), uncheckpointedCollectResultBuffer.next());
        }
        Assert.assertNull(uncheckpointedCollectResultBuffer.next());
    }

    @Test
    public void testUncheckpointedFaultTolerance() throws Exception {
        UncheckpointedCollectResultBuffer uncheckpointedCollectResultBuffer = new UncheckpointedCollectResultBuffer(serializer, true);
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, Collections.emptyList()), 0L);
        List<Integer> asList = Arrays.asList(1, 2, 3);
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, createSerializedResults(asList)), 0L);
        Iterator<Integer> it = asList.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(it.next(), uncheckpointedCollectResultBuffer.next());
        }
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("another", 0L, Collections.emptyList()), 0L);
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("another", 0L, createSerializedResults(asList)), 0L);
        Iterator<Integer> it2 = asList.iterator();
        while (it2.hasNext()) {
            Assert.assertEquals(it2.next(), uncheckpointedCollectResultBuffer.next());
        }
    }

    @Test(expected = RuntimeException.class)
    public void testUncheckpointedNotFaultTolerance() throws Exception {
        UncheckpointedCollectResultBuffer uncheckpointedCollectResultBuffer = new UncheckpointedCollectResultBuffer(serializer, false);
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, Collections.emptyList()), 0L);
        List<Integer> asList = Arrays.asList(1, 2, 3);
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, createSerializedResults(asList)), 0L);
        Iterator<Integer> it = asList.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(it.next(), uncheckpointedCollectResultBuffer.next());
        }
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("another", 0L, Collections.emptyList()), 0L);
    }

    @Test
    public void testCheckpointedValidResponse() throws Exception {
        CheckpointedCollectResultBuffer checkpointedCollectResultBuffer = new CheckpointedCollectResultBuffer(serializer);
        checkpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, Collections.emptyList()), 0L);
        List<Integer> asList = Arrays.asList(1, 2, 3);
        checkpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, createSerializedResults(asList)), 0L);
        Assert.assertNull(checkpointedCollectResultBuffer.next());
        checkpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 3L, createSerializedResults(Arrays.asList(4, 5, 6))), 3L);
        Iterator<Integer> it = asList.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(it.next(), checkpointedCollectResultBuffer.next());
        }
        List asList2 = Arrays.asList(4, 5, 6);
        checkpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 6L, createSerializedResults(Arrays.asList(6, 7))), 5L);
        Iterator it2 = asList2.iterator();
        while (it2.hasNext()) {
            Assert.assertEquals((Integer) it2.next(), checkpointedCollectResultBuffer.next());
        }
        checkpointedCollectResultBuffer.complete();
        Assert.assertEquals(7, checkpointedCollectResultBuffer.next());
        Assert.assertNull(checkpointedCollectResultBuffer.next());
    }

    @Test
    public void testCheckpointedRestart() throws Exception {
        CheckpointedCollectResultBuffer checkpointedCollectResultBuffer = new CheckpointedCollectResultBuffer(serializer);
        checkpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, Collections.emptyList()), 0L);
        checkpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, createSerializedResults(Arrays.asList(1, 2, 3))), 0L);
        Assert.assertNull(checkpointedCollectResultBuffer.next());
        checkpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("another", 0L, Collections.emptyList()), 0L);
        List<Integer> asList = Arrays.asList(4, 5, 6);
        checkpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("another", 0L, createSerializedResults(asList)), 0L);
        Assert.assertNull(checkpointedCollectResultBuffer.next());
        checkpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("another", 3L, Collections.emptyList()), 0L);
        Iterator<Integer> it = asList.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(it.next(), checkpointedCollectResultBuffer.next());
        }
        Assert.assertNull(checkpointedCollectResultBuffer.next());
    }

    @Test
    public void testImmediateAccumulatorResult() throws Exception {
        UncheckpointedCollectResultBuffer uncheckpointedCollectResultBuffer = new UncheckpointedCollectResultBuffer(serializer, false);
        List<Integer> asList = Arrays.asList(1, 2, 3);
        uncheckpointedCollectResultBuffer.dealWithResponse(new CollectCoordinationResponse("version", 0L, createSerializedResults(asList)), 0L);
        uncheckpointedCollectResultBuffer.complete();
        Iterator<Integer> it = asList.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(it.next(), uncheckpointedCollectResultBuffer.next());
        }
        Assert.assertNull(uncheckpointedCollectResultBuffer.next());
    }

    private List<byte[]> createSerializedResults(List<Integer> list) throws Exception {
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            serializer.serialize(Integer.valueOf(intValue), new DataOutputViewStreamWrapper(byteArrayOutputStream));
            arrayList.add(byteArrayOutputStream.toByteArray());
        }
        return arrayList;
    }
}
