package com.datatorrent.stram.stream;

import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.bufferserver.packet.PayloadTuple;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.engine.SweepableReservoir;
import java.util.ArrayList;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/stream/BufferServerSubscriberTest.class */
public class BufferServerSubscriberTest {
    private static final Logger logger = LoggerFactory.getLogger(BufferServerSubscriberTest.class);

    @Test
    public void testEmergencySinks() throws InterruptedException {
        final ArrayList arrayList = new ArrayList();
        final StreamCodec<Object> streamCodec = new StreamCodec<Object>() { // from class: com.datatorrent.stram.stream.BufferServerSubscriberTest.1
            public Object fromByteArray(Slice slice) {
                return (slice.offset == 0 && slice.length == slice.buffer.length) ? slice.buffer : Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length);
            }

            public Slice toByteArray(Object obj) {
                return new Slice((byte[]) obj, 0, ((byte[]) obj).length);
            }

            public int getPartition(Object obj) {
                return 0;
            }
        };
        Sink<Object> sink = new Sink<Object>() { // from class: com.datatorrent.stram.stream.BufferServerSubscriberTest.2
            public void put(Object obj) {
                arrayList.add(obj);
            }

            public int getCount(boolean z) {
                return 0;
            }
        };
        BufferServerSubscriber bufferServerSubscriber = new BufferServerSubscriber("subscriber", 5) { // from class: com.datatorrent.stram.stream.BufferServerSubscriberTest.3
            {
                this.serde = streamCodec;
            }

            public void suspendRead() {
                BufferServerSubscriberTest.logger.debug("read suspended");
            }

            public void resumeRead() {
                BufferServerSubscriberTest.logger.debug("read resumed");
            }
        };
        SweepableReservoir acquireReservoir = bufferServerSubscriber.acquireReservoir("unbufferedSink", 3);
        acquireReservoir.setSink(sink);
        int i = 0;
        while (true) {
            int i2 = i;
            i++;
            if (i2 >= 10) {
                acquireReservoir.sweep();
                acquireReservoir.sweep();
                Assert.assertEquals("4 received", 4L, arrayList.size());
                acquireReservoir.sweep();
                acquireReservoir.sweep();
                Assert.assertEquals("10  received", 10L, arrayList.size());
                return;
            }
            byte[] serializedTuple = PayloadTuple.getSerializedTuple(streamCodec.getPartition(Integer.valueOf(i)), streamCodec.toByteArray(new byte[]{(byte) i}));
            bufferServerSubscriber.onMessage(serializedTuple, 0, serializedTuple.length);
        }
    }
}
