/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={AbstractStreamOperator.class})
public class StreamMapTest {
    @Test
    public void testMap() throws Exception {
        StreamMap operator = new StreamMap((MapFunction)new Map());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)1, initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)2, initialTime + 2L));
        testHarness.processWatermark(new Watermark(initialTime + 2L));
        testHarness.processElement(new StreamRecord((Object)3, initialTime + 3L));
        expectedOutput.add(new StreamRecord((Object)"+2", initialTime + 1L));
        expectedOutput.add(new StreamRecord((Object)"+3", initialTime + 2L));
        expectedOutput.add(new Watermark(initialTime + 2L));
        expectedOutput.add(new StreamRecord((Object)"+4", initialTime + 3L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testOpenClose() throws Exception {
        StreamMap operator = new StreamMap((MapFunction)new TestOpenCloseMapFunction());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        long initialTime = 0L;
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)"Hello", initialTime));
        testHarness.close();
        Assert.assertTrue((String)"RichFunction methods where not called.", (boolean)TestOpenCloseMapFunction.closeCalled);
        Assert.assertTrue((String)"Output contains no elements.", (testHarness.getOutput().size() > 0 ? 1 : 0) != 0);
    }

    @Test
    public void testFailingSnapshot() throws Exception {
        long checkpointId = 1L;
        long timestamp = 42L;
        StreamTaskState streamTaskState = (StreamTaskState)Mockito.mock(StreamTaskState.class);
        PowerMockito.whenNew(StreamTaskState.class).withAnyArguments().thenReturn((Object)streamTaskState);
        StreamMap operator = new StreamMap((MapFunction)new TestCheckpointedMapFunction());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.open();
        try {
            testHarness.snapshot(1L, 42L);
            Assert.fail((String)"Expected exception here.");
        }
        catch (Exception exception) {
            // empty catch block
        }
        ((StreamTaskState)Mockito.verify((Object)streamTaskState)).discardState();
    }

    private static class TestOpenCloseMapFunction
    extends RichMapFunction<String, String> {
        private static final long serialVersionUID = 1L;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseMapFunction() {
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map(String value) throws Exception {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            return value;
        }
    }

    private static class TestCheckpointedMapFunction
    implements MapFunction<String, String>,
    Checkpointed<String> {
        private static final long serialVersionUID = 2353250741656753525L;

        private TestCheckpointedMapFunction() {
        }

        public String map(String value) throws Exception {
            return value;
        }

        public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            throw new IOException("Test exception.");
        }

        public void restoreState(String state) throws Exception {
        }
    }

    private static class Map
    implements MapFunction<Integer, String> {
        private static final long serialVersionUID = 1L;

        private Map() {
        }

        public String map(Integer value) throws Exception {
            return "+" + (value + 1);
        }
    }
}

