/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * Base class for tests of {@link GenericWriteAheadSink} implementations.
 *
 * <p>Each test drives a sink through a {@link OneInputStreamTaskTestHarness}: it feeds
 * batches of {@value #ELEMENTS_PER_BATCH} generated elements, snapshots the operator state
 * between batches, optionally notifies the task about completed checkpoints, and finally
 * hands the harness, the task and the sink to a subclass-provided verification hook.
 *
 * @param <IN> type of the elements written to the sink
 * @param <S> concrete sink type under test
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class})
@PowerMockIgnore({"javax.management.*"})
public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> {

    /** Number of elements fed into the harness per batch. */
    private static final int ELEMENTS_PER_BATCH = 20;

    /**
     * Creates the task that runs the sink and exposes its head operator.
     *
     * @return a fresh {@link OperatorExposingTask}
     */
    protected OperatorExposingTask<IN> createTask() {
        return new OperatorExposingTask<IN>();
    }

    /**
     * Creates the sink under test; it is installed as the stream operator of the harness.
     *
     * @return the sink to test
     * @throws Exception if the sink cannot be created
     */
    protected abstract S createSink() throws Exception;

    /**
     * Supplies the type information used for both the input and the output side of the harness.
     *
     * @return type information for {@code IN}
     */
    protected abstract TypeInformation<IN> createTypeInfo();

    /**
     * Generates one element to feed into the sink.
     *
     * @param counter running element counter; starts at 1 and increases by one per element
     * @param batchIndex index (0, 1 or 2) of the batch the element belongs to
     * @return the element to process
     */
    protected abstract IN generateValue(int counter, int batchIndex);

    /**
     * Verifies the outcome of {@link #testIdealCircumstances()}.
     *
     * @param harness the harness the test ran in
     * @param task the task that executed the sink
     * @param sink the task's head operator, i.e. the sink instance that actually ran
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsIdealCircumstances(OneInputStreamTaskTestHarness<IN, IN> harness, OneInputStreamTask<IN, IN> task, S sink) throws Exception;

    /**
     * Verifies the outcome of {@link #testDataPersistenceUponMissedNotify()}.
     *
     * @param harness the harness the test ran in
     * @param task the task that executed the sink
     * @param sink the task's head operator, i.e. the sink instance that actually ran
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsDataPersistenceUponMissedNotify(OneInputStreamTaskTestHarness<IN, IN> harness, OneInputStreamTask<IN, IN> task, S sink) throws Exception;

    /**
     * Verifies the outcome of {@link #testDataDiscardingUponRestore()}.
     *
     * @param harness the harness the test ran in
     * @param task the task that executed the sink
     * @param sink the task's head operator, i.e. the sink instance that actually ran
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsDataDiscardingUponRestore(OneInputStreamTaskTestHarness<IN, IN> harness, OneInputStreamTask<IN, IN> task, S sink) throws Exception;

    /**
     * Feeds three batches; after every batch the state is snapshotted and the task is
     * notified that the checkpoint just taken completed.
     */
    @Test
    public void testIdealCircumstances() throws Exception {
        S sink = createSink();
        OperatorExposingTask<IN> task = createTask();
        OneInputStreamTaskTestHarness<IN, IN> testHarness = startHarness(task, sink);

        List<StreamTaskState> states = new ArrayList<StreamTaskState>();
        int elementCounter = 1;

        elementCounter = processBatch(testHarness, elementCounter, 0);
        snapshot(task, states);
        task.notifyCheckpointComplete(states.size() - 1);

        elementCounter = processBatch(testHarness, elementCounter, 1);
        snapshot(task, states);
        task.notifyCheckpointComplete(states.size() - 1);

        processBatch(testHarness, elementCounter, 2);
        snapshot(task, states);
        task.notifyCheckpointComplete(states.size() - 1);

        finishInput(testHarness, task, states);
        verifyResultsIdealCircumstances(testHarness, task, sinkOf(task));
    }

    /**
     * Feeds three batches and snapshots after each, but deliberately skips the
     * completion notification for the second checkpoint.
     */
    @Test
    public void testDataPersistenceUponMissedNotify() throws Exception {
        S sink = createSink();
        OperatorExposingTask<IN> task = createTask();
        OneInputStreamTaskTestHarness<IN, IN> testHarness = startHarness(task, sink);

        List<StreamTaskState> states = new ArrayList<StreamTaskState>();
        int elementCounter = 1;

        elementCounter = processBatch(testHarness, elementCounter, 0);
        snapshot(task, states);
        task.notifyCheckpointComplete(states.size() - 1);

        elementCounter = processBatch(testHarness, elementCounter, 1);
        // Second checkpoint: snapshot only, no notifyCheckpointComplete on purpose.
        snapshot(task, states);

        processBatch(testHarness, elementCounter, 2);
        snapshot(task, states);
        task.notifyCheckpointComplete(states.size() - 1);

        finishInput(testHarness, task, states);
        verifyResultsDataPersistenceUponMissedNotify(testHarness, task, sinkOf(task));
    }

    /**
     * Feeds a checkpointed batch and a second batch that is never snapshotted, then closes,
     * reopens and restores the operator from the last stored state before feeding a third,
     * checkpointed batch.
     */
    @Test
    public void testDataDiscardingUponRestore() throws Exception {
        S sink = createSink();
        OperatorExposingTask<IN> task = createTask();
        OneInputStreamTaskTestHarness<IN, IN> testHarness = startHarness(task, sink);

        List<StreamTaskState> states = new ArrayList<StreamTaskState>();
        int elementCounter = 1;

        elementCounter = processBatch(testHarness, elementCounter, 0);
        snapshot(task, states);
        task.notifyCheckpointComplete(states.size() - 1);

        // This batch is processed but never snapshotted before the restore below.
        elementCounter = processBatch(testHarness, elementCounter, 1);

        task.getOperator().close();
        task.getOperator().open();
        task.getOperator().restoreState(states.get(states.size() - 1));

        processBatch(testHarness, elementCounter, 2);
        snapshot(task, states);
        task.notifyCheckpointComplete(states.size() - 1);

        finishInput(testHarness, task, states);
        verifyResultsDataDiscardingUponRestore(testHarness, task, sinkOf(task));
    }

    /**
     * Deep-copies a task state via a Java serialization round trip so that a stored snapshot
     * is independent of the instance returned by the operator.
     *
     * @param toCopy the state to copy
     * @return a deserialized copy of {@code toCopy}
     * @throws IOException if serialization or deserialization fails
     * @throws ClassNotFoundException if a class of the serialized graph cannot be resolved
     */
    protected StreamTaskState copyTaskState(StreamTaskState toCopy) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the object stream flushes it, so the buffer is complete before it is read.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(toCopy);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (StreamTaskState) in.readObject();
        }
    }

    /** Builds a checkpointing-enabled harness around {@code task} and {@code sink} and starts it. */
    private OneInputStreamTaskTestHarness<IN, IN> startHarness(OperatorExposingTask<IN> task, S sink) throws Exception {
        TypeInformation<IN> info = createTypeInfo();
        OneInputStreamTaskTestHarness<IN, IN> testHarness = new OneInputStreamTaskTestHarness<IN, IN>(task, 1, 1, info, info);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setCheckpointingEnabled(true);
        streamConfig.setStreamOperator(sink);
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        return testHarness;
    }

    /**
     * Feeds one batch of generated elements, waits until it has been processed, and returns
     * the element counter to use for the next batch.
     */
    private int processBatch(OneInputStreamTaskTestHarness<IN, IN> testHarness, int elementCounter, int batchIndex) throws Exception {
        int counter = elementCounter;
        for (int i = 0; i < ELEMENTS_PER_BATCH; i++) {
            testHarness.processElement(new StreamRecord<IN>(generateValue(counter, batchIndex)));
            counter++;
        }
        testHarness.waitForInputProcessing();
        return counter;
    }

    /** Snapshots the operator, using the number of stored states as checkpoint id, and stores a copy. */
    private void snapshot(OperatorExposingTask<IN> task, List<StreamTaskState> states) throws Exception {
        states.add(copyTaskState(task.getOperator().snapshotOperatorState((long) states.size(), 0L)));
    }

    /** Ends the input, takes one last snapshot and waits for the task to complete. */
    private void finishInput(OneInputStreamTaskTestHarness<IN, IN> testHarness, OperatorExposingTask<IN> task, List<StreamTaskState> states) throws Exception {
        testHarness.endInput();
        snapshot(task, states);
        testHarness.waitForTaskCompletion();
    }

    /** Returns the task's head operator typed as the sink under test. */
    @SuppressWarnings("unchecked") // the head operator is the sink installed via setStreamOperator
    private S sinkOf(OperatorExposingTask<IN> task) {
        return (S) task.getOperator();
    }

    /**
     * A {@link OneInputStreamTask} that makes its head operator accessible to the tests.
     *
     * @param <INT> input and output element type of the task
     */
    protected class OperatorExposingTask<INT>
    extends OneInputStreamTask<INT, INT> {
        protected OperatorExposingTask() {
        }

        /**
         * Returns the head operator of this task.
         *
         * @return the operator the task is running
         */
        @SuppressWarnings("unchecked") // cast kept from the original; only the raw type was parameterized
        public OneInputStreamOperator<INT, INT> getOperator() {
            return (OneInputStreamOperator<INT, INT>) this.headOperator;
        }
    }
}

