package com.datatorrent.stram.stream;

import com.datatorrent.api.Sink;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.engine.SweepableReservoir;
import java.io.IOException;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore
/* loaded from: input_file:com/datatorrent/stram/stream/FastPublisherTest.class */
public class FastPublisherTest {
    private static final Logger logger;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/datatorrent/stram/stream/FastPublisherTest$FastPublisherImpl.class */
    static class FastPublisherImpl extends FastPublisher {
        FastPublisherImpl(int i) {
            super("testpublisher", i);
        }

        public void write() throws IOException {
            FastPublisherTest.logger.debug("disabled intentionally - please use consume instead");
        }

        /* JADX WARN: Removed duplicated region for block: B:22:0x0065 A[EXC_TOP_SPLITTER, SYNTHETIC] */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public byte[] consume() {
            /*
                Method dump skipped, instructions count: 213
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: com.datatorrent.stram.stream.FastPublisherTest.FastPublisherImpl.consume():byte[]");
        }
    }

    @Test
    public void testSerialization() throws Exception {
        FastPublisherImpl fastPublisherImpl = new FastPublisherImpl(24576);
        fastPublisherImpl.put("hello!");
        byte[] consume = fastPublisherImpl.consume();
        FastSubscriber fastSubscriber = new FastSubscriber("subscriber", 1024);
        DefaultStatefulStreamCodec defaultStatefulStreamCodec = new DefaultStatefulStreamCodec();
        fastSubscriber.statefulSerde = defaultStatefulStreamCodec;
        fastSubscriber.serde = defaultStatefulStreamCodec;
        SweepableReservoir acquireReservoir = fastSubscriber.acquireReservoir("res", 1024);
        acquireReservoir.setSink(new Sink<Object>() { // from class: com.datatorrent.stram.stream.FastPublisherTest.1
            static final /* synthetic */ boolean $assertionsDisabled;

            public void put(Object obj) {
                if (!$assertionsDisabled && !obj.equals("hello!")) {
                    throw new AssertionError();
                }
            }

            public int getCount(boolean z) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            static {
                $assertionsDisabled = !FastPublisherTest.class.desiredAssertionStatus();
            }
        });
        int i = consume[0] | (consume[1] << 8);
        Assert.assertEquals("size", consume.length - 2, i);
        fastSubscriber.onMessage(consume, 2, consume.length - 2);
        acquireReservoir.sweep();
        acquireReservoir.sweep();
        for (int i2 = 0; i2 < 1024; i2++) {
            fastPublisherImpl.put("hello!");
        }
        byte[] consume2 = fastPublisherImpl.consume();
        if (!$assertionsDisabled && consume2.length != (i + 2) * 1024) {
            throw new AssertionError();
        }
        int i3 = 0;
        for (int i4 = 0; i4 < 1024; i4++) {
            int i5 = i3;
            int i6 = i3 + 1;
            int i7 = i6 + 1;
            i = consume2[i5] | (consume2[i6] << 8);
            fastSubscriber.onMessage(consume2, i7, i);
            i3 = i7 + i;
        }
        acquireReservoir.sweep();
        acquireReservoir.sweep();
        for (int i8 = 0; i8 < 1024; i8++) {
            fastPublisherImpl.put("hello!");
        }
        byte[] consume3 = fastPublisherImpl.consume();
        if (!$assertionsDisabled && consume3.length != (i + 2) * 1024) {
            throw new AssertionError();
        }
        int i9 = 0;
        for (int i10 = 0; i10 < 1024; i10++) {
            int i11 = i9;
            int i12 = i9 + 1;
            int i13 = i12 + 1;
            int i14 = consume3[i11] | (consume3[i12] << 8);
            fastSubscriber.onMessage(consume3, i13, i14);
            i9 = i13 + i14;
        }
        acquireReservoir.sweep();
        acquireReservoir.sweep();
    }

    static {
        $assertionsDisabled = !FastPublisherTest.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(FastPublisherTest.class);
    }
}
