/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests checkpoint/restore of {@link StatefulSequenceSource}.
 *
 * <p>Two source instances are run until each has emitted a fixed number of elements, their
 * state is snapshotted and merged, and a third instance is restored from that state. The
 * union of everything emitted by the three instances must then contain each value of the
 * sequence exactly once.
 */
public class StatefulSequenceSourceTest {

    /**
     * Runs two sources up to a threshold, snapshots them, restores a third source from the
     * combined snapshot and verifies that no element is lost or emitted twice.
     *
     * @throws Exception if a harness operation fails or the test thread is interrupted
     */
    @Test
    public void testCheckpointRestore() throws Exception {
        final int initElement = 0;
        final int maxElement = 100;
        final int maxParallelism = 2;
        final int expectedCount = maxElement - initElement + 1;

        final HashSet<Long> expectedOutput = new HashSet<Long>(expectedCount);
        for (long value = initElement; value <= maxElement; ++value) {
            expectedOutput.add(value);
        }

        // One output list per source context, keyed by the context name.
        final ConcurrentHashMap<String, List<Long>> outputCollector =
                new ConcurrentHashMap<String, List<Long>>();

        final OneShotLatch latchToTrigger1 = new OneShotLatch();
        final OneShotLatch latchToWait1 = new OneShotLatch();
        final OneShotLatch latchToTrigger2 = new OneShotLatch();
        final OneShotLatch latchToWait2 = new OneShotLatch();

        final StatefulSequenceSource source1 = new StatefulSequenceSource(initElement, maxElement);
        final StreamSource<Long, StatefulSequenceSource> src1 =
                new StreamSource<Long, StatefulSequenceSource>(source1);
        final AbstractStreamOperatorTestHarness<Long> testHarness1 =
                new AbstractStreamOperatorTestHarness<Long>(src1, maxParallelism, 2, 0);
        testHarness1.open();

        final StatefulSequenceSource source2 = new StatefulSequenceSource(initElement, maxElement);
        final StreamSource<Long, StatefulSequenceSource> src2 =
                new StreamSource<Long, StatefulSequenceSource>(source2);
        final AbstractStreamOperatorTestHarness<Long> testHarness2 =
                new AbstractStreamOperatorTestHarness<Long>(src2, maxParallelism, 2, 1);
        testHarness2.open();

        // Slot i holds the failure (if any) of runner i + 1.
        final Throwable[] error = new Throwable[3];

        // Run the first two sources asynchronously; each blocks once it has emitted its threshold.
        final Thread runner1 =
                startRunner(source1, "1", latchToTrigger1, latchToWait1, outputCollector, 21, error, 0);
        final Thread runner2 =
                startRunner(source2, "2", latchToTrigger2, latchToWait2, outputCollector, 32, error, 1);

        try {
            if (!latchToTrigger1.isTriggered()) {
                latchToTrigger1.await();
            }
            if (!latchToTrigger2.isTriggered()) {
                latchToTrigger2.await();
            }
            // A runner that died before reaching its threshold also releases the latch;
            // surface that failure instead of snapshotting a broken source.
            assertNoError("1", error[0]);
            assertNoError("2", error[1]);

            final OperatorSubtaskState snapshot = AbstractStreamOperatorTestHarness.repackageState(
                    testHarness1.snapshot(0L, 0L),
                    testHarness2.snapshot(0L, 0L));

            final StatefulSequenceSource source3 = new StatefulSequenceSource(initElement, maxElement);
            final StreamSource<Long, StatefulSequenceSource> src3 =
                    new StreamSource<Long, StatefulSequenceSource>(source3);

            final OperatorSubtaskState initState =
                    AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, maxParallelism, 2, 1, 0);

            final AbstractStreamOperatorTestHarness<Long> testHarness3 =
                    new AbstractStreamOperatorTestHarness<Long>(src3, maxParallelism, 1, 0);
            testHarness3.setup();
            testHarness3.initializeState(initState);
            testHarness3.open();

            final OneShotLatch latchToTrigger3 = new OneShotLatch();
            final OneShotLatch latchToWait3 = new OneShotLatch();
            // Pre-triggered so that the restored source never blocks.
            latchToWait3.trigger();

            final Thread runner3 =
                    startRunner(source3, "3", latchToTrigger3, latchToWait3, outputCollector, 3, error, 2);
            runner3.join();
            assertNoError("3", error[2]);

            Assert.assertEquals("Expected one output list per source task.", 3, outputCollector.size());

            // The checks below run while runners 1 and 2 are still parked at their threshold,
            // so their lists contain only what was emitted before the snapshot.
            final HashSet<Long> dedupRes = new HashSet<Long>(expectedCount);
            for (Map.Entry<String, List<Long>> elementsPerTask : outputCollector.entrySet()) {
                final List<Long> elements = elementsPerTask.getValue();

                // Guards the latch choreography of the test itself: every task must have emitted something.
                Assert.assertFalse(
                        "Task " + elementsPerTask.getKey() + " did not emit any element.",
                        elements.isEmpty());

                for (Long elem : elements) {
                    if (!dedupRes.add(elem)) {
                        Assert.fail("Duplicate entry: " + elem);
                    }
                    if (!expectedOutput.contains(elem)) {
                        Assert.fail("Unexpected element: " + elem);
                    }
                }
            }

            // No duplicates and no unexpected values were seen, so a full count means exactly-once.
            Assert.assertEquals(expectedCount, dedupRes.size());
        } finally {
            // Always release the parked runners, even when an assertion above failed,
            // so that no blocked thread outlives the test.
            latchToWait1.trigger();
            latchToWait2.trigger();
            runner1.join();
            runner2.join();
        }

        // Failures that happened after the runners were released.
        assertNoError("1", error[0]);
        assertNoError("2", error[1]);
    }

    /**
     * Starts a thread that runs {@code source} against a new {@link BlockingSourceContext}.
     *
     * <p>Any {@link Throwable} escaping the source (or the context constructor) is stored in
     * {@code errors[errorSlot]} and {@code latchToTrigger} is triggered, so that a thread
     * waiting for the threshold is woken up instead of waiting forever.
     *
     * @return the already started runner thread
     */
    private static Thread startRunner(
            final StatefulSequenceSource source,
            final String name,
            final OneShotLatch latchToTrigger,
            final OneShotLatch latchToWait,
            final ConcurrentHashMap<String, List<Long>> output,
            final int elemToFire,
            final Throwable[] errors,
            final int errorSlot) {

        final Thread runner = new Thread("source-runner-" + name) {

            @Override
            public void run() {
                try {
                    source.run(new BlockingSourceContext(name, latchToTrigger, latchToWait, output, elemToFire));
                } catch (Throwable t) {
                    // Record first, then wake the waiter, so the waiter finds the failure.
                    errors[errorSlot] = t;
                    latchToTrigger.trigger();
                }
            }
        };
        runner.start();
        return runner;
    }

    /**
     * Fails the test with {@code failure} as the cause if the given runner recorded one.
     */
    private static void assertNoError(String runnerName, Throwable failure) {
        if (failure != null) {
            throw new AssertionError("Source runner " + runnerName + " failed: " + failure, failure);
        }
    }

    /**
     * Source context that records every collected element and, once {@code elemToFire}
     * elements have been collected, triggers one latch and then waits on another.
     */
    private static class BlockingSourceContext
            implements SourceFunction.SourceContext<Long> {
        private final String name;
        private final Object lock;
        private final OneShotLatch latchToTrigger;
        private final OneShotLatch latchToWait;
        private final ConcurrentHashMap<String, List<Long>> collector;
        private final int threshold;
        private int counter = 0;
        private final List<Long> localOutput;

        /**
         * Creates the context and registers its output list in {@code output} under {@code name}.
         *
         * @param name key under which the collected elements are published in {@code output}
         * @param latchToTrigger latch triggered when the threshold is reached
         * @param latchToWait latch awaited after the threshold is reached
         * @param output shared map receiving this context's output list
         * @param elemToFire number of collected elements at which the latches are used
         */
        public BlockingSourceContext(String name, OneShotLatch latchToTrigger, OneShotLatch latchToWait, ConcurrentHashMap<String, List<Long>> output, int elemToFire) {
            this.name = name;
            this.lock = new Object();
            this.latchToTrigger = latchToTrigger;
            this.latchToWait = latchToWait;
            this.collector = output;
            this.threshold = elemToFire;
            this.localOutput = new ArrayList<Long>();
            List<Long> prev = this.collector.put(name, this.localOutput);
            if (prev != null) {
                Assert.fail("A source context named '" + name + "' is already registered.");
            }
        }

        /** Delegates to {@link #collect(Long)}; the timestamp is ignored. */
        @Override
        public void collectWithTimestamp(Long element, long timestamp) {
            this.collect(element);
        }

        /**
         * Records the element. On exactly the {@code threshold}-th element the context
         * triggers {@code latchToTrigger} and waits until {@code latchToWait} is triggered.
         */
        @Override
        public void collect(Long element) {
            this.localOutput.add(element);
            if (++this.counter == this.threshold) {
                this.latchToTrigger.trigger();
                try {
                    if (!this.latchToWait.isTriggered()) {
                        this.latchToWait.await();
                    }
                } catch (InterruptedException e) {
                    // Best effort: keep emitting, but preserve the interrupt for the caller.
                    Thread.currentThread().interrupt();
                }
            }
        }

        @Override
        public void emitWatermark(Watermark mark) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void markAsTemporarilyIdle() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Object getCheckpointLock() {
            return this.lock;
        }

        @Override
        public void close() {
        }
    }
}

