/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={ResultPartitionWriter.class})
@PowerMockIgnore(value={"javax.management.*", "com.sun.jndi.*"})
public class GenericWriteAheadSinkTest
extends WriteAheadSinkTestBase<Tuple1<Integer>, ListSink> {
    @Override
    protected ListSink createSink() throws Exception {
        return new ListSink();
    }

    @Override
    protected TupleTypeInfo<Tuple1<Integer>> createTypeInfo() {
        return TupleTypeInfo.getBasicTupleTypeInfo((Class[])new Class[]{Integer.class});
    }

    @Override
    protected Tuple1<Integer> generateValue(int counter, int checkpointID) {
        return new Tuple1((Object)counter);
    }

    @Override
    protected void verifyResultsIdealCircumstances(OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness, OneInputStreamTask<Tuple1<Integer>, Tuple1<Integer>> task, ListSink sink) {
        ArrayList<Integer> list = new ArrayList<Integer>();
        for (int x = 1; x <= 60; ++x) {
            list.add(x);
        }
        for (Integer i : sink.values) {
            list.remove(i);
        }
        Assert.assertTrue((String)("The following ID's where not found in the result list: " + list.toString()), (boolean)list.isEmpty());
        Assert.assertTrue((String)("The sink emitted to many values: " + (sink.values.size() - 60)), (sink.values.size() == 60 ? 1 : 0) != 0);
    }

    @Override
    protected void verifyResultsDataPersistenceUponMissedNotify(OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness, OneInputStreamTask<Tuple1<Integer>, Tuple1<Integer>> task, ListSink sink) {
        ArrayList<Integer> list = new ArrayList<Integer>();
        for (int x = 1; x <= 60; ++x) {
            list.add(x);
        }
        for (Integer i : sink.values) {
            list.remove(i);
        }
        Assert.assertTrue((String)("The following ID's where not found in the result list: " + list.toString()), (boolean)list.isEmpty());
        Assert.assertTrue((String)("The sink emitted to many values: " + (sink.values.size() - 60)), (sink.values.size() == 60 ? 1 : 0) != 0);
    }

    @Override
    protected void verifyResultsDataDiscardingUponRestore(OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>> harness, OneInputStreamTask<Tuple1<Integer>, Tuple1<Integer>> task, ListSink sink) {
        int x;
        ArrayList<Integer> list = new ArrayList<Integer>();
        for (x = 1; x <= 20; ++x) {
            list.add(x);
        }
        for (x = 41; x <= 60; ++x) {
            list.add(x);
        }
        for (Integer i : sink.values) {
            list.remove(i);
        }
        Assert.assertTrue((String)("The following ID's where not found in the result list: " + list.toString()), (boolean)list.isEmpty());
        Assert.assertTrue((String)("The sink emitted to many values: " + (sink.values.size() - 40)), (sink.values.size() == 40 ? 1 : 0) != 0);
    }

    @Test
    public void testCommitterException() throws Exception {
        int x;
        WriteAheadSinkTestBase.OperatorExposingTask task = this.createTask();
        TupleTypeInfo<Tuple1<Integer>> info = this.createTypeInfo();
        OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>> testHarness = new OneInputStreamTaskTestHarness<Tuple1<Integer>, Tuple1<Integer>>(task, 1, 1, (TypeInformation<Tuple1<Integer>>)info, (TypeInformation<Tuple1<Integer>>)info);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setCheckpointingEnabled(true);
        streamConfig.setStreamOperator((StreamOperator)new ListSink2());
        int elementCounter = 1;
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        for (x = 0; x < 10; ++x) {
            testHarness.processElement(new StreamRecord(this.generateValue(elementCounter, 0)));
            ++elementCounter;
        }
        testHarness.waitForInputProcessing();
        task.getOperator().snapshotOperatorState(0L, 0L);
        task.notifyCheckpointComplete(0L);
        Assert.assertTrue((((ListSink2)task.getOperator()).values.size() == 0 ? 1 : 0) != 0);
        for (x = 0; x < 10; ++x) {
            testHarness.processElement(new StreamRecord(this.generateValue(elementCounter, 1)));
            ++elementCounter;
        }
        testHarness.waitForInputProcessing();
        task.getOperator().snapshotOperatorState(1L, 0L);
        task.notifyCheckpointComplete(1L);
        Assert.assertTrue((((ListSink2)task.getOperator()).values.size() == 10 ? 1 : 0) != 0);
        for (x = 0; x < 10; ++x) {
            testHarness.processElement(new StreamRecord(this.generateValue(elementCounter, 2)));
            ++elementCounter;
        }
        testHarness.waitForInputProcessing();
        task.getOperator().snapshotOperatorState(2L, 0L);
        task.notifyCheckpointComplete(2L);
        Assert.assertTrue((((ListSink2)task.getOperator()).values.size() == 40 ? 1 : 0) != 0);
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    public static class FailingCommitter
    extends CheckpointCommitter {
        private List<Long> checkpoints;
        private boolean failIsCommitted = true;
        private boolean failCommit = true;

        public void open() throws Exception {
        }

        public void close() throws Exception {
        }

        public void createResource() throws Exception {
            this.checkpoints = new ArrayList<Long>();
        }

        public void commitCheckpoint(long checkpointID) {
            if (this.failCommit) {
                this.failCommit = false;
                throw new RuntimeException("Expected exception");
            }
            this.checkpoints.add(checkpointID);
        }

        public boolean isCheckpointCommitted(long checkpointID) {
            if (this.failIsCommitted) {
                this.failIsCommitted = false;
                throw new RuntimeException("Expected exception");
            }
            return false;
        }
    }

    public static class ListSink2
    extends GenericWriteAheadSink<Tuple1<Integer>> {
        public List<Integer> values = new ArrayList<Integer>();

        public ListSink2() throws Exception {
            super((CheckpointCommitter)new FailingCommitter(), TypeExtractor.getForObject((Object)new Tuple1((Object)1)).createSerializer(new ExecutionConfig()), "job");
        }

        protected boolean sendValues(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception {
            for (Tuple1<Integer> value : values) {
                this.values.add((Integer)value.f0);
            }
            return true;
        }
    }

    public static class SimpleCommitter
    extends CheckpointCommitter {
        private List<Long> checkpoints;

        public void open() throws Exception {
        }

        public void close() throws Exception {
        }

        public void createResource() throws Exception {
            this.checkpoints = new ArrayList<Long>();
        }

        public void commitCheckpoint(long checkpointID) {
            this.checkpoints.add(checkpointID);
        }

        public boolean isCheckpointCommitted(long checkpointID) {
            return this.checkpoints.contains(checkpointID);
        }
    }

    public static class ListSink
    extends GenericWriteAheadSink<Tuple1<Integer>> {
        public List<Integer> values = new ArrayList<Integer>();

        public ListSink() throws Exception {
            super((CheckpointCommitter)new SimpleCommitter(), TypeExtractor.getForObject((Object)new Tuple1((Object)1)).createSerializer(new ExecutionConfig()), "job");
        }

        protected boolean sendValues(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception {
            for (Tuple1<Integer> value : values) {
                this.values.add((Integer)value.f0);
            }
            return true;
        }
    }
}

