package com.datatorrent.stram.engine;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Sink;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.ManualScheduledExecutorService;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/WindowGeneratorTest.class */
public class WindowGeneratorTest {
    /** Logger used by the tests and nested operators of this class; assigned in this class's static initializer. */
    public static final Logger logger;
    /**
     * Explicit copy of the flag a Java compiler normally synthesizes for {@code assert} statements;
     * assigned in this class's static initializer.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Holder for a lookup table that maps {@link MessageType} ordinals to dense switch case labels.
     *
     * <p>The table is sized to the number of {@code MessageType} constants. {@code BEGIN_WINDOW} maps to 1,
     * {@code END_WINDOW} to 2 and {@code RESET_WINDOW} to 3; every other slot keeps the array default of 0.
     * Originally compiled as {@code com.datatorrent.stram.engine.WindowGeneratorTest$4}.
     */
    static /* synthetic */ class AnonymousClass4 {
        /** Case-label table indexed by {@code MessageType.ordinal()}. */
        static final /* synthetic */ int[] $SwitchMap$com$datatorrent$bufferserver$packet$MessageType = new int[MessageType.values().length];

        static {
            // Each constant is read inside its own try block: if a constant is missing at run time
            // the NoSuchFieldError is ignored and that slot simply stays 0.
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.BEGIN_WINDOW.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.END_WINDOW.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.RESET_WINDOW.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/WindowGeneratorTest$MyLogger.class */
    static class MyLogger extends BaseOperator {
        public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() { // from class: com.datatorrent.stram.engine.WindowGeneratorTest.MyLogger.1
            public void process(Integer num) {
                WindowGeneratorTest.logger.debug("received {}", num);
            }
        };

        MyLogger() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/WindowGeneratorTest$RandomNumberGenerator.class */
    static class RandomNumberGenerator implements InputOperator {
        public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
        int count;

        RandomNumberGenerator() {
        }

        public void emitTuples() {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                WindowGeneratorTest.logger.debug("interrupted!", e);
            }
            DefaultOutputPort<Integer> defaultOutputPort = this.output;
            int i = this.count + 1;
            this.count = i;
            defaultOutputPort.emit(Integer.valueOf(i));
        }

        public void beginWindow(long j) {
        }

        public void endWindow() {
        }

        public void setup(Context.OperatorContext operatorContext) {
        }

        public void teardown() {
        }
    }

    /**
     * Drives a {@link WindowGenerator} with a 1 ms window width through 16000 manual ticks and
     * verifies the control tuples it produced: 16001 begin-window, 16000 end-window and
     * 2 reset-window tuples.
     *
     * <p>The sink only logs payloads, and logging is switched off for the bulk of the ticks to
     * keep the output small.
     *
     * @throws InterruptedException declared for signature compatibility; nothing in the body blocks
     */
    @Test
    public void test2ndResetWindow() throws InterruptedException {
        logger.info("Testing 2nd Reset Window");

        ManualScheduledExecutorService scheduler = new ManualScheduledExecutorService(1);
        WindowGenerator generator = new WindowGenerator(scheduler, 33022);
        generator.setFirstWindow(0L);
        generator.setResetWindow(0L);
        generator.setWindowWidth(1);

        SweepableReservoir reservoir = generator.acquireReservoir("output", 33022);
        final AtomicBoolean loggingEnabled = new AtomicBoolean(true);
        reservoir.setSink(new Sink<Object>() {
            @Override
            public void put(Object payload) {
                if (loggingEnabled.get()) {
                    WindowGeneratorTest.logger.debug(payload.toString());
                }
            }

            @Override
            public int getCount(boolean reset) {
                return 0;
            }
        });

        generator.activate((StreamContext) null);

        // 1 + 1 + 15997 + 1 = 16000 ticks in total.
        scheduler.tick(1L);
        scheduler.tick(1L);
        loggingEnabled.set(false);
        for (int tick = 0; tick < 15997; tick++) {
            scheduler.tick(1L);
        }
        loggingEnabled.set(true);
        scheduler.tick(1L);

        int beginWindows = 0;
        int endWindows = 0;
        int resetWindows = 0;

        // The result of the first sweep is intentionally discarded, as in the original test.
        reservoir.sweep();
        Tuple tuple;
        while ((tuple = reservoir.sweep()) != null) {
            reservoir.remove();
            // Switch on the enum itself rather than on an ordinal-indexed lookup table.
            switch (tuple.getType()) {
                case BEGIN_WINDOW:
                    beginWindows++;
                    break;
                case END_WINDOW:
                    endWindows++;
                    break;
                case RESET_WINDOW:
                    resetWindows++;
                    break;
                default:
                    // Other tuple types are not counted.
                    break;
            }
        }

        Assert.assertEquals("begin windows", 16001L, beginWindows);
        Assert.assertEquals("end windows", 16000L, endWindows);
        Assert.assertEquals("reset windows", 2L, resetWindows);
    }

    /**
     * Verifies the first tuples a {@link WindowGenerator} produces after a single tick: a
     * {@link ResetWindowTuple} followed by a begin-window tuple, and nothing else.
     *
     * <p>The clock starts at 184466110000 ms and the window width is 305441741 ms. Both tuples
     * are expected to carry window id 792275909670338560, which equals {@code 184466110L << 32}.
     *
     * <p>All checks use JUnit assertions, so they run regardless of whether the JVM was started
     * with {@code -ea}. The sink fails the test if it is ever handed a payload.
     */
    @Test
    public void testResetWindow() {
        ManualScheduledExecutorService scheduler = new ManualScheduledExecutorService(1);
        scheduler.setCurrentTimeMillis(184466110000L);
        WindowGenerator generator = new WindowGenerator(scheduler, 17023);

        final long startMillis = scheduler.getCurrentTimeMillis();
        final int windowWidthMillis = 305441741;
        final long expectedWindowId = 792275909670338560L;

        generator.setFirstWindow(startMillis);
        generator.setResetWindow(startMillis);
        generator.setWindowWidth(windowWidthMillis);

        SweepableReservoir reservoir = generator.acquireReservoir("output", 1024);
        reservoir.setSink(new Sink<Object>() {
            @Override
            public int getCount(boolean reset) {
                return 0;
            }

            @Override
            public void put(Object payload) {
                // The original unconditionally asserted false here: no payload is expected at all.
                Assert.fail("sink must not receive any payload, but got: " + payload);
            }
        });

        generator.activate((StreamContext) null);
        scheduler.tick(1L);

        // The first sweep is expected to yield nothing; the tuples become visible on the next one.
        Assert.assertNull(reservoir.sweep());

        Tuple firstTuple = reservoir.sweep();
        reservoir.remove();
        Assert.assertTrue("first tuple must be a ResetWindowTuple", firstTuple instanceof ResetWindowTuple);
        ResetWindowTuple resetTuple = (ResetWindowTuple) firstTuple;
        Assert.assertEquals("reset window id", expectedWindowId, resetTuple.getWindowId());
        // 1000L forces long arithmetic so the product cannot overflow an int.
        Assert.assertEquals("reset base millis", startMillis, resetTuple.getBaseSeconds() * 1000L);
        Assert.assertEquals("reset interval millis", windowWidthMillis, resetTuple.getIntervalMillis());

        Tuple beginTuple = reservoir.sweep();
        reservoir.remove();
        Assert.assertEquals("second tuple type", MessageType.BEGIN_WINDOW, beginTuple.getType());
        Assert.assertEquals("begin window id", expectedWindowId, beginTuple.getWindowId());

        Assert.assertNull("no further tuples expected", reservoir.sweep());
    }

    /**
     * Runs a {@link WindowGenerator} with a 200 ms window width on a real scheduler for about
     * 200 ms and checks the stream of control tuples it produced.
     *
     * <p>Three properties are asserted once the reservoir is drained:
     * <ul>
     *   <li>XOR-ing the window ids of all begin- and end-window tuples leaves exactly the id of
     *       the last begun window, i.e. every window but the last was closed;</li>
     *   <li>at least {@code (now - firstWindowMillis) / 200 + 1} windows were begun;</li>
     *   <li>there is exactly one end-window tuple fewer than begin-window tuples.</li>
     * </ul>
     *
     * <p>NOTE(review): {@code now} is sampled after the generator is deactivated and the reservoir
     * is drained, so the lower bound is timing-sensitive; this is unchanged from the original.
     *
     * @throws Exception if the sleep is interrupted or the generator fails
     */
    @Test
    public void testWindowGen() throws Exception {
        final int windowWidthMillis = 200;

        Sink<Object> unexpectedPayloadSink = new Sink<Object>() {
            @Override
            public int getCount(boolean reset) {
                return 0;
            }

            @Override
            public void put(Object payload) {
                WindowGeneratorTest.logger.debug("unexpected payload {}", payload);
            }
        };

        // Align the first window to a whole second.
        long nowMillis = new ScheduledThreadPoolExecutor(1, "WindowGenerator").getCurrentTimeMillis();
        long firstWindowMillis = nowMillis - (nowMillis % 1000);

        WindowGenerator generator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 17023);
        generator.setResetWindow(firstWindowMillis);
        generator.setFirstWindow(firstWindowMillis);
        generator.setWindowWidth(windowWidthMillis);

        SweepableReservoir reservoir = generator.acquireReservoir("GeneratorTester", 200);
        reservoir.setSink(unexpectedPayloadSink);

        generator.activate((StreamContext) null);
        Thread.sleep(200L);
        generator.deactivate();

        long lastBegunWindowId = 0L;
        long windowIdXor = 0L;
        int beginWindowCount = 0;
        int endWindowCount = 0;

        // The result of the first sweep is intentionally discarded, as in the original test.
        reservoir.sweep();
        Tuple tuple;
        while ((tuple = reservoir.sweep()) != null) {
            reservoir.remove();
            long windowId = tuple.getWindowId();
            // Switch on the enum itself rather than on an ordinal-indexed lookup table.
            switch (tuple.getType()) {
                case BEGIN_WINDOW:
                    lastBegunWindowId = windowId;
                    beginWindowCount++;
                    windowIdXor ^= windowId;
                    break;
                case END_WINDOW:
                    endWindowCount++;
                    windowIdXor ^= windowId;
                    break;
                case RESET_WINDOW:
                    break;
                default:
                    // Any other tuple type resets the tracked window id, as the original did.
                    lastBegunWindowId = 0L;
                    break;
            }
        }

        long drainedAtMillis = System.currentTimeMillis();
        long minimumBeginWindows = ((drainedAtMillis - firstWindowMillis) / (long) windowWidthMillis) + 1;

        Assert.assertEquals("only last window open", lastBegunWindowId, windowIdXor);
        Assert.assertTrue("Minimum begin window count", minimumBeginWindows <= (long) beginWindowCount);
        Assert.assertEquals("end window count", beginWindowCount - 1, endWindowCount);
    }

    /**
     * Wires a {@link RandomNumberGenerator} to a {@link MyLogger} in a {@link LogicalPlan} and runs
     * it in a {@link StramLocalCluster} with a run argument of 10000. The test contains no explicit
     * assertions; it passes when the run completes without throwing.
     *
     * @throws Exception if building or running the local cluster fails
     */
    @Test
    public void testOutofSequenceError() throws Exception {
        logger.info("Testing Out of Sequence Error");

        LogicalPlan dag = new LogicalPlan();
        String storagePath = new File("target/testOutofSequenceError").getAbsolutePath();
        AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(storagePath, (Configuration) null);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, storageAgent);

        RandomNumberGenerator producer = dag.addOperator("random", new RandomNumberGenerator());
        MyLogger consumer = dag.addOperator("logger", new MyLogger());
        dag.addStream("stream", producer.output, consumer.input);

        StramLocalCluster cluster = new StramLocalCluster(dag);
        cluster.run(10000L);
    }

    /**
     * Round-trips two adjacent window ids through {@code WindowGenerator.getWindowMillis} and
     * {@code WindowGenerator.getWindowId} for window widths of 500 and 123 ms. Each id must map
     * back to itself, and the two computed times must differ by exactly one window width.
     */
    @Test
    public void testWindowToTime() {
        final long firstWindowMillis = 1431714014000L;
        final long earlierWindowId = 6149164867354886271L;
        final long laterWindowId = 6149164867354886272L;
        final int[] windowWidths = {500, 123};

        for (int widthMillis : windowWidths) {
            long earlierMillis = WindowGenerator.getWindowMillis(earlierWindowId, firstWindowMillis, widthMillis);
            long laterMillis = WindowGenerator.getWindowMillis(laterWindowId, firstWindowMillis, widthMillis);

            long earlierRoundTrip = WindowGenerator.getWindowId(earlierMillis, firstWindowMillis, widthMillis);
            long laterRoundTrip = WindowGenerator.getWindowId(laterMillis, firstWindowMillis, widthMillis);

            Assert.assertEquals("window 1", earlierWindowId, earlierRoundTrip);
            Assert.assertEquals("window 2", laterWindowId, laterRoundTrip);
            Assert.assertEquals("window millis difference", widthMillis, laterMillis - earlierMillis);
        }
    }

    /**
     * Checks the step from a window id whose low bits are 15999 to its successor, for window
     * widths of 500 and 123 ms.
     *
     * <p>The upper 32 bits of the successor must be greater, its low 14 bits (mask 16383) must be
     * zero, and the two window times must differ by exactly one window width.
     */
    @Test
    public void testWindowToTimeBaseSecondRollover() {
        final long firstWindowMillis = 1431714014123L;
        final int[] windowWidths = {500, 123};

        for (int widthMillis : windowWidths) {
            long initialId = WindowGenerator.getWindowId(firstWindowMillis, firstWindowMillis, widthMillis);
            long maxedId = initialId | 15999;
            long rolledId = WindowGenerator.getNextWindowId(maxedId, firstWindowMillis, widthMillis);

            long baseSecondsBefore = maxedId >> 32;
            long baseSecondsAfter = rolledId >> 32;
            Assert.assertTrue("base seconds should be greater during an rollover", baseSecondsAfter > baseSecondsBefore);

            long millisBefore = WindowGenerator.getWindowMillis(maxedId, firstWindowMillis, widthMillis);
            long millisAfter = WindowGenerator.getWindowMillis(rolledId, firstWindowMillis, widthMillis);

            Assert.assertEquals("max window id", 15999L, maxedId & 16383);
            Assert.assertEquals("rollover after max", 0L, rolledId & 16383);
            Assert.assertEquals("window millis difference", widthMillis, millisAfter - millisBefore);
        }
    }

    /**
     * Verifies that {@code WindowGenerator.getAheadWindowId} with an offset of 678 gives the same
     * id as applying {@code WindowGenerator.getNextWindowId} 678 times, for window widths of 500
     * and 123 ms.
     */
    @Test
    public void testWindowIdAhead() {
        final long firstWindowMillis = 1431714014123L;
        final int stepsAhead = 678;
        final int[] windowWidths = {500, 123};

        for (int widthMillis : windowWidths) {
            long startId = WindowGenerator.getWindowId(firstWindowMillis, firstWindowMillis, widthMillis);
            long jumpedId = WindowGenerator.getAheadWindowId(startId, firstWindowMillis, widthMillis, stepsAhead);

            // Walk forward one window at a time to reach the same position.
            long walkedId = startId;
            for (int step = 0; step < stepsAhead; step++) {
                walkedId = WindowGenerator.getNextWindowId(walkedId, firstWindowMillis, widthMillis);
            }

            Assert.assertEquals(jumpedId, walkedId);
        }
    }

    /**
     * Verifies that {@code WindowGenerator.compareWindowId} reports a distance of 341 between a
     * window id and the id obtained by moving 341 windows ahead of it, for window widths of 500
     * and 123 ms.
     */
    @Test
    public void testWindowIdCompare() {
        final long firstWindowMillis = 1431714014123L;
        final int distance = 341;
        final int[] windowWidths = {500, 123};

        for (int widthMillis : windowWidths) {
            long startId = WindowGenerator.getWindowId(firstWindowMillis, firstWindowMillis, widthMillis);
            long aheadId = WindowGenerator.getAheadWindowId(startId, firstWindowMillis, widthMillis, distance);

            Assert.assertEquals(distance, WindowGenerator.compareWindowId(aheadId, startId, firstWindowMillis, widthMillis));
        }
    }

    static {
        $assertionsDisabled = !WindowGeneratorTest.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(WindowGeneratorTest.class);
    }
}
