package org.apache.flink.streaming.runtime.operators;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({ResultPartitionWriter.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/**
 * Base class for tests of {@link GenericWriteAheadSink} implementations.
 *
 * <p>Each test runs a sink inside a {@link OneInputStreamTaskTestHarness}, feeds it three batches of
 * generated records, snapshots the operator state between batches, and then hands the harness, the
 * task and the operator to a subclass-provided verification hook.
 *
 * @param <IN> type of the records written to the sink
 * @param <S> concrete sink type under test
 */
public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> {

    /** Number of records emitted in each batch (one batch per simulated checkpoint interval). */
    private static final int ELEMENTS_PER_CHECKPOINT = 20;

    /**
     * A {@link OneInputStreamTask} that makes its head operator accessible, so tests can snapshot,
     * restore, close and re-open the operator directly.
     *
     * @param <INT> input (and output) record type of the task
     */
    public class OperatorExposingTask<INT> extends OneInputStreamTask<INT, INT> {
        protected OperatorExposingTask() {
        }

        /**
         * @return the head operator of this task
         */
        public OneInputStreamOperator<INT, INT> getOperator() {
            return this.headOperator;
        }
    }

    /**
     * Creates the task that will host the sink under test.
     *
     * @return a new, not yet invoked {@link OperatorExposingTask}
     */
    public WriteAheadSinkTestBase<IN, S>.OperatorExposingTask<IN> createTask() {
        return new OperatorExposingTask<>();
    }

    /**
     * Creates the sink under test.
     *
     * @return a new sink instance
     * @throws Exception if the sink cannot be created
     */
    protected abstract S createSink() throws Exception;

    /**
     * @return the type information used for both the input and the output side of the harness
     */
    protected abstract TypeInformation<IN> createTypeInfo();

    /**
     * Produces one record to feed into the sink.
     *
     * @param i running element counter; the tests start at 1 and increment it after every record
     * @param i2 index of the batch the record belongs to (0, 1 or 2 in the tests of this class)
     * @return the record to process
     */
    protected abstract IN generateValue(int i, int i2);

    /**
     * Verification hook for {@link #testIdealCircumstances()}; called after the task has completed.
     *
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsIdealCircumstances(OneInputStreamTaskTestHarness<IN, IN> oneInputStreamTaskTestHarness, OneInputStreamTask<IN, IN> oneInputStreamTask, S s) throws Exception;

    /**
     * Verification hook for {@link #testDataPersistenceUponMissedNotify()}; called after the task
     * has completed.
     *
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsDataPersistenceUponMissedNotify(OneInputStreamTaskTestHarness<IN, IN> oneInputStreamTaskTestHarness, OneInputStreamTask<IN, IN> oneInputStreamTask, S s) throws Exception;

    /**
     * Verification hook for {@link #testDataDiscardingUponRestore()}; called after the task has
     * completed.
     *
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsDataDiscardingUponRestore(OneInputStreamTaskTestHarness<IN, IN> oneInputStreamTaskTestHarness, OneInputStreamTask<IN, IN> oneInputStreamTask, S s) throws Exception;

    /**
     * Three batches, each followed by a snapshot and a checkpoint-complete notification for that
     * snapshot.
     */
    @Test
    public void testIdealCircumstances() throws Exception {
        S sink = createSink();
        OperatorExposingTask<IN> task = createTask();
        OneInputStreamTaskTestHarness<IN, IN> harness = startHarness(task, sink);
        List<StreamTaskState> snapshots = new ArrayList<>();

        int nextId = processBatch(harness, 1, 0);
        takeSnapshot(task, snapshots);
        task.notifyCheckpointComplete(snapshots.size() - 1);

        nextId = processBatch(harness, nextId, 1);
        takeSnapshot(task, snapshots);
        task.notifyCheckpointComplete(snapshots.size() - 1);

        processBatch(harness, nextId, 2);
        takeSnapshot(task, snapshots);
        task.notifyCheckpointComplete(snapshots.size() - 1);

        harness.endInput();
        takeSnapshot(task, snapshots);
        harness.waitForTaskCompletion();

        verifyResultsIdealCircumstances(harness, task, sinkOf(task));
    }

    /**
     * Same as {@link #testIdealCircumstances()}, except that no checkpoint-complete notification is
     * sent for the second snapshot; the notification after the third batch is the next one the
     * operator sees.
     */
    @Test
    public void testDataPersistenceUponMissedNotify() throws Exception {
        S sink = createSink();
        OperatorExposingTask<IN> task = createTask();
        OneInputStreamTaskTestHarness<IN, IN> harness = startHarness(task, sink);
        List<StreamTaskState> snapshots = new ArrayList<>();

        int nextId = processBatch(harness, 1, 0);
        takeSnapshot(task, snapshots);
        task.notifyCheckpointComplete(snapshots.size() - 1);

        nextId = processBatch(harness, nextId, 1);
        // Deliberately no notifyCheckpointComplete for this snapshot.
        takeSnapshot(task, snapshots);

        processBatch(harness, nextId, 2);
        takeSnapshot(task, snapshots);
        task.notifyCheckpointComplete(snapshots.size() - 1);

        harness.endInput();
        takeSnapshot(task, snapshots);
        harness.waitForTaskCompletion();

        verifyResultsDataPersistenceUponMissedNotify(harness, task, sinkOf(task));
    }

    /**
     * Processes a second batch without snapshotting it, then closes and re-opens the operator and
     * restores it from the most recent snapshot (the one taken after the first batch) before
     * processing the third batch.
     */
    @Test
    public void testDataDiscardingUponRestore() throws Exception {
        S sink = createSink();
        OperatorExposingTask<IN> task = createTask();
        OneInputStreamTaskTestHarness<IN, IN> harness = startHarness(task, sink);
        List<StreamTaskState> snapshots = new ArrayList<>();

        int nextId = processBatch(harness, 1, 0);
        takeSnapshot(task, snapshots);
        task.notifyCheckpointComplete(snapshots.size() - 1);

        // Second batch is never snapshotted before the simulated restart below.
        nextId = processBatch(harness, nextId, 1);

        OneInputStreamOperator<IN, IN> operator = task.getOperator();
        operator.close();
        operator.open();
        operator.restoreState(snapshots.get(snapshots.size() - 1));

        processBatch(harness, nextId, 2);
        takeSnapshot(task, snapshots);
        task.notifyCheckpointComplete(snapshots.size() - 1);

        harness.endInput();
        takeSnapshot(task, snapshots);
        harness.waitForTaskCompletion();

        verifyResultsDataDiscardingUponRestore(harness, task, sinkOf(task));
    }

    /**
     * Creates a deep copy of the given state by serializing it with Java serialization and reading
     * it back.
     *
     * @param streamTaskState the state to copy
     * @return a copy obtained from the serialized form
     * @throws IOException if writing or reading the serialized form fails
     * @throws ClassNotFoundException if a class of the serialized object graph cannot be resolved
     */
    protected StreamTaskState copyTaskState(StreamTaskState streamTaskState) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the object stream flushes it, so the byte array below is complete.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(streamTaskState);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (StreamTaskState) in.readObject();
        }
    }

    /** Builds a harness around the task, enables checkpointing, installs the sink and starts the task. */
    private OneInputStreamTaskTestHarness<IN, IN> startHarness(OperatorExposingTask<IN> task, S sink) throws Exception {
        TypeInformation<IN> typeInfo = createTypeInfo();
        OneInputStreamTaskTestHarness<IN, IN> harness = new OneInputStreamTaskTestHarness<>(task, 1, 1, typeInfo, typeInfo);
        StreamConfig config = harness.getStreamConfig();
        config.setCheckpointingEnabled(true);
        config.setStreamOperator(sink);
        harness.invoke();
        harness.waitForTaskRunning();
        return harness;
    }

    /** Feeds one batch of generated records into the harness, waits for it to be processed, and returns the next free element id. */
    private int processBatch(OneInputStreamTaskTestHarness<IN, IN> harness, int firstId, int batchIndex) throws Exception {
        int id = firstId;
        for (int n = 0; n < ELEMENTS_PER_CHECKPOINT; n++) {
            harness.processElement(new StreamRecord<IN>(generateValue(id, batchIndex)));
            id++;
        }
        harness.waitForInputProcessing();
        return id;
    }

    /** Snapshots the operator, using the current list size as checkpoint id, and appends a copy of the state to the list. */
    private void takeSnapshot(OperatorExposingTask<IN> task, List<StreamTaskState> snapshots) throws Exception {
        snapshots.add(copyTaskState(task.getOperator().snapshotOperatorState(snapshots.size(), 0L)));
    }

    /** Returns the task's head operator typed as the sink under test, as required by the verification hooks. */
    @SuppressWarnings("unchecked")
    private S sinkOf(OperatorExposingTask<IN> task) {
        // Unchecked: the harness is configured with an S from createSink(), so the head operator
        // is expected to be of that type. A mismatch surfaces as a ClassCastException in the hook.
        return (S) task.getOperator();
    }
}
