/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorNodePunctuator;
import org.apache.kafka.streams.processor.internals.PunctuationQueue;
import org.apache.kafka.streams.processor.internals.PunctuationSchedule;
import org.junit.Assert;
import org.junit.Test;

public class PunctuationQueueTest {
    @Test
    public void testPunctuationInterval() {
        TestProcessor processor = new TestProcessor();
        final ProcessorNode node = new ProcessorNode("test", (Processor)processor, null);
        PunctuationQueue queue = new PunctuationQueue();
        Punctuator punctuator = new Punctuator(){

            public void punctuate(long timestamp) {
                node.processor().punctuate(timestamp);
            }
        };
        PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
        long now = sched.timestamp - 100L;
        queue.schedule(sched);
        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator(){

            public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator punctuator) {
                punctuator.punctuate(time);
            }
        };
        queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)0L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 99L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)0L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)1L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 199L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)1L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)2L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)3L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)3L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)4L, (long)processor.punctuatedAt.size());
    }

    @Test
    public void testPunctuationIntervalCustomAlignment() {
        TestProcessor processor = new TestProcessor();
        final ProcessorNode node = new ProcessorNode("test", (Processor)processor, null);
        PunctuationQueue queue = new PunctuationQueue();
        Punctuator punctuator = new Punctuator(){

            public void punctuate(long timestamp) {
                node.processor().punctuate(timestamp);
            }
        };
        PunctuationSchedule sched = new PunctuationSchedule(node, 50L, 100L, punctuator);
        long now = sched.timestamp - 50L;
        queue.schedule(sched);
        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator(){

            public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator punctuator) {
                punctuator.punctuate(time);
            }
        };
        queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)0L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)0L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)1L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)1L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)2L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)3L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)3L, (long)processor.punctuatedAt.size());
        queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
        Assert.assertEquals((long)4L, (long)processor.punctuatedAt.size());
    }

    private static class TestProcessor
    extends AbstractProcessor<String, String> {
        public final ArrayList<Long> punctuatedAt = new ArrayList();

        private TestProcessor() {
        }

        public void init(ProcessorContext context) {
        }

        public void process(String key, String value) {
        }

        public void punctuate(long streamTime) {
            this.punctuatedAt.add(streamTime);
        }

        public void close() {
        }
    }
}

