/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={ResultPartitionWriter.class})
@PowerMockIgnore(value={"javax.management.*", "com.sun.jndi.*"})
public class StreamTaskAsyncCheckpointTest {
    @Test
    public void testAsyncCheckpoints() throws Exception {
        final OneShotLatch delayCheckpointLatch = new OneShotLatch();
        final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
        OneInputStreamTask task = new OneInputStreamTask();
        final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setStreamOperator((StreamOperator)new AsyncCheckpointOperator());
        StreamMockEnvironment mockEnv = new StreamMockEnvironment(testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize){

            @Override
            public ExecutionConfig getExecutionConfig() {
                return testHarness.executionConfig;
            }

            @Override
            public void acknowledgeCheckpoint(long checkpointId) {
                super.acknowledgeCheckpoint(checkpointId);
            }

            @Override
            public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
                super.acknowledgeCheckpoint(checkpointId, state);
                try {
                    delayCheckpointLatch.await();
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Assert.assertTrue((boolean)(state instanceof StreamTaskStateList));
                StreamTaskStateList stateList = (StreamTaskStateList)state;
                StreamTaskState taskState = stateList.getState(this.getUserClassLoader())[0];
                StateHandle operatorState = taskState.getOperatorState();
                Assert.assertTrue((String)"It must be a TestStateHandle", (boolean)(operatorState instanceof TestStateHandle));
                TestStateHandle testState = (TestStateHandle)operatorState;
                Assert.assertEquals((long)42L, (long)testState.checkpointId);
                Assert.assertEquals((long)17L, (long)testState.timestamp);
                ensureCheckpointLatch.trigger();
            }
        };
        testHarness.invoke(mockEnv);
        for (Field field : StreamTask.class.getDeclaredFields()) {
            if (!field.getName().equals("isRunning")) continue;
            field.setAccessible(true);
            while (!field.getBoolean(task)) {
                Thread.sleep(10L);
            }
        }
        task.triggerCheckpoint(42L, 17L);
        delayCheckpointLatch.trigger();
        ensureCheckpointLatch.await();
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    public static class DummyMapFunction<T>
    implements MapFunction<T, T> {
        public T map(T value) {
            return value;
        }
    }

    private static class TestStateHandle
    implements StateHandle<String> {
        public final long checkpointId;
        public final long timestamp;

        public TestStateHandle(long checkpointId, long timestamp) {
            this.checkpointId = checkpointId;
            this.timestamp = timestamp;
        }

        public String getState(ClassLoader userCodeClassLoader) throws Exception {
            return null;
        }

        public void discardState() throws Exception {
        }

        public long getStateSize() {
            return 0L;
        }

        public void close() throws IOException {
        }
    }

    private static class DataInputViewAsynchronousStateHandle
    extends AsynchronousStateHandle<String> {
        private final long checkpointId;
        private final long timestamp;

        public DataInputViewAsynchronousStateHandle(long checkpointId, long timestamp) {
            this.checkpointId = checkpointId;
            this.timestamp = timestamp;
        }

        public StateHandle<String> materialize() throws Exception {
            return new TestStateHandle(this.checkpointId, this.timestamp);
        }

        public long getStateSize() {
            return 0L;
        }

        public void close() throws IOException {
        }
    }

    public static class AsyncCheckpointOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        public void processElement(StreamRecord<String> element) throws Exception {
        }

        public void processWatermark(Watermark mark) throws Exception {
        }

        public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
            StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
            DataInputViewAsynchronousStateHandle asyncState = new DataInputViewAsynchronousStateHandle(checkpointId, timestamp);
            taskState.setOperatorState((StateHandle)asyncState);
            return taskState;
        }

        public void restoreState(StreamTaskState taskState) throws Exception {
            super.restoreState(taskState);
        }
    }
}

