package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.lang.reflect.Field;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({ResultPartitionWriter.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.class */
public class StreamTaskAsyncCheckpointTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest$AsyncCheckpointOperator.class */
    public static class AsyncCheckpointOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        public void processElement(StreamRecord<String> streamRecord) throws Exception {
        }

        public void processWatermark(Watermark watermark) throws Exception {
        }

        public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
            StreamTaskState snapshotOperatorState = super.snapshotOperatorState(j, j2);
            snapshotOperatorState.setOperatorState(new DataInputViewAsynchronousStateHandle(j, j2));
            return snapshotOperatorState;
        }

        public void restoreState(StreamTaskState streamTaskState) throws Exception {
            super.restoreState(streamTaskState);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest$DataInputViewAsynchronousStateHandle.class */
    private static class DataInputViewAsynchronousStateHandle extends AsynchronousStateHandle<String> {
        private final long checkpointId;
        private final long timestamp;

        public DataInputViewAsynchronousStateHandle(long j, long j2) {
            this.checkpointId = j;
            this.timestamp = j2;
        }

        public StateHandle<String> materialize() throws Exception {
            return new TestStateHandle(this.checkpointId, this.timestamp);
        }

        public void discardState() throws Exception {
        }

        public long getStateSize() {
            return 0L;
        }

        public void close() throws IOException {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest$DummyMapFunction.class */
    public static class DummyMapFunction<T> implements MapFunction<T, T> {
        public T map(T t) {
            return t;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest$TestStateHandle.class */
    private static class TestStateHandle implements StateHandle<String> {
        public final long checkpointId;
        public final long timestamp;

        public TestStateHandle(long j, long j2) {
            this.checkpointId = j;
            this.timestamp = j2;
        }

        /* renamed from: getState, reason: merged with bridge method [inline-methods] */
        public String m40getState(ClassLoader classLoader) throws Exception {
            return null;
        }

        public void discardState() throws Exception {
        }

        public long getStateSize() {
            return 0L;
        }

        public void close() throws IOException {
        }
    }

    @Test
    public void testAsyncCheckpoints() throws Exception {
        final OneShotLatch oneShotLatch = new OneShotLatch();
        final OneShotLatch oneShotLatch2 = new OneShotLatch();
        OneInputStreamTask oneInputStreamTask = new OneInputStreamTask();
        final OneInputStreamTaskTestHarness oneInputStreamTaskTestHarness = new OneInputStreamTaskTestHarness(oneInputStreamTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamTaskTestHarness.getStreamConfig().setStreamOperator(new AsyncCheckpointOperator());
        oneInputStreamTaskTestHarness.invoke(new StreamMockEnvironment(oneInputStreamTaskTestHarness.jobConfig, oneInputStreamTaskTestHarness.taskConfig, oneInputStreamTaskTestHarness.memorySize, new MockInputSplitProvider(), oneInputStreamTaskTestHarness.bufferSize) { // from class: org.apache.flink.streaming.runtime.tasks.StreamTaskAsyncCheckpointTest.1
            @Override // org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment
            public ExecutionConfig getExecutionConfig() {
                return oneInputStreamTaskTestHarness.executionConfig;
            }

            @Override // org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment
            public void acknowledgeCheckpoint(long j) {
                super.acknowledgeCheckpoint(j);
            }

            @Override // org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment
            public void acknowledgeCheckpoint(long j, StateHandle<?> stateHandle) {
                super.acknowledgeCheckpoint(j, stateHandle);
                try {
                    oneShotLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Assert.assertTrue(stateHandle instanceof StreamTaskStateList);
                StateHandle operatorState = ((StreamTaskStateList) stateHandle).getState(getUserClassLoader())[0].getOperatorState();
                Assert.assertTrue("It must be a TestStateHandle", operatorState instanceof TestStateHandle);
                TestStateHandle testStateHandle = (TestStateHandle) operatorState;
                Assert.assertEquals(42L, testStateHandle.checkpointId);
                Assert.assertEquals(17L, testStateHandle.timestamp);
                oneShotLatch2.trigger();
            }
        });
        for (Field field : StreamTask.class.getDeclaredFields()) {
            if (field.getName().equals("isRunning")) {
                field.setAccessible(true);
                while (!field.getBoolean(oneInputStreamTask)) {
                    Thread.sleep(10L);
                }
            }
        }
        oneInputStreamTask.triggerCheckpoint(42L, 17L);
        oneShotLatch.trigger();
        oneShotLatch2.await();
        oneInputStreamTaskTestHarness.endInput();
        oneInputStreamTaskTestHarness.waitForTaskCompletion();
    }
}
