/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.datastream;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ReinterpretDataStreamAsKeyedStreamITCase {
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testReinterpretAsKeyedStream() throws Exception {
        int maxParallelism = 8;
        int numEventsPerInstance = 100;
        int parallelism = 3;
        int numTotalEvents = 300;
        int numUniqueKeys = 100;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setMaxParallelism(8);
        env.setParallelism(3);
        env.enableCheckpointing(100L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        ArrayList<File> partitionFiles = new ArrayList<File>(3);
        for (int i = 0; i < 3; ++i) {
            File partitionFile = this.temporaryFolder.newFile();
            partitionFiles.add(i, partitionFile);
        }
        env.addSource((SourceFunction)new RandomTupleSource(100, 100)).keyBy(new int[]{0}).addSink((SinkFunction)new ToPartitionFileSink(partitionFiles));
        env.execute();
        DataStreamUtils.reinterpretAsKeyedStream((DataStream)env.addSource((SourceFunction)new FromPartitionFileSource(partitionFiles)), (KeySelector & Serializable)value -> (Integer)value.f0, (TypeInformation)TypeInformation.of(Integer.class)).timeWindow(Time.seconds((long)1L)).reduce((ReduceFunction & Serializable)(value1, value2) -> new Tuple2(value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1))).addSink((SinkFunction)new ValidatingSink(300)).setParallelism(1);
        env.execute();
    }

    private static class ValidatingSink
    extends RichSinkFunction<Tuple2<Integer, Integer>>
    implements CheckpointedFunction {
        private static final long serialVersionUID = 1L;
        private final int expectedSum;
        private int runningSum = 0;
        private transient ListState<Integer> sumState;

        private ValidatingSink(int expectedSum) {
            this.expectedSum = expectedSum;
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Preconditions.checkState((this.getRuntimeContext().getNumberOfParallelSubtasks() == 1 ? 1 : 0) != 0);
        }

        public void invoke(Tuple2<Integer, Integer> value, SinkFunction.Context context) throws Exception {
            this.runningSum += ((Integer)value.f1).intValue();
        }

        public void close() throws Exception {
            Assert.assertEquals((long)this.expectedSum, (long)this.runningSum);
            super.close();
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.sumState.clear();
            this.sumState.add((Object)this.runningSum);
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.sumState = context.getOperatorStateStore().getListState(new ListStateDescriptor("sumState", Integer.class));
            if (context.isRestored()) {
                Iterator iterator = ((Iterable)this.sumState.get()).iterator();
                while (iterator.hasNext()) {
                    int value = (Integer)iterator.next();
                    this.runningSum += value;
                }
            }
        }
    }

    private static class FromPartitionFileSource
    extends RichParallelSourceFunction<Tuple2<Integer, Integer>>
    implements CheckpointedFunction,
    CheckpointListener {
        private static final long serialVersionUID = 1L;
        private final List<File> allPartitions;
        private DataInputStream din;
        private volatile boolean running;
        private long fileLength;
        private long waitForFailurePos;
        private long position;
        private transient ListState<Long> positionState;
        private transient boolean isRestored;
        private volatile transient boolean canFail;

        FromPartitionFileSource(List<File> allPartitions) {
            this.allPartitions = allPartitions;
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            int subtaskIdx = this.getRuntimeContext().getIndexOfThisSubtask();
            File partitionFile = this.allPartitions.get(subtaskIdx);
            this.fileLength = partitionFile.length();
            this.waitForFailurePos = this.fileLength * 3L / 4L;
            this.din = new DataInputStream(new BufferedInputStream(new FileInputStream(partitionFile)));
            for (long toSkip = this.position; toSkip > 0L; toSkip -= this.din.skip(toSkip)) {
            }
        }

        public void close() throws Exception {
            super.close();
            this.din.close();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> out) throws Exception {
            this.running = true;
            while (this.running && this.hasMoreDataToRead()) {
                Object object = out.getCheckpointLock();
                synchronized (object) {
                    Integer key = this.din.readInt();
                    Integer val = this.din.readInt();
                    out.collect((Object)new Tuple2((Object)key, (Object)val));
                    this.position += 8L;
                }
                if (!this.shouldWaitForCompletedCheckpointAndFailNow()) continue;
                while (!this.canFail) {
                    Thread.sleep(10L);
                }
                throw new Exception("Artificial failure.");
            }
        }

        private boolean shouldWaitForCompletedCheckpointAndFailNow() {
            return !this.isRestored && this.position > this.waitForFailurePos;
        }

        private boolean hasMoreDataToRead() {
            return this.position < this.fileLength;
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            this.canFail = !this.isRestored;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.positionState.clear();
            this.positionState.add((Object)this.position);
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.canFail = false;
            this.position = 0L;
            this.isRestored = context.isRestored();
            this.positionState = context.getOperatorStateStore().getListState(new ListStateDescriptor("posState", Long.class));
            if (this.isRestored) {
                Iterator iterator = ((Iterable)this.positionState.get()).iterator();
                while (iterator.hasNext()) {
                    long value = (Long)iterator.next();
                    this.position += value;
                }
            }
        }
    }

    private static class ToPartitionFileSink
    extends RichSinkFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private final List<File> allPartitions;
        private DataOutputStream dos;

        ToPartitionFileSink(List<File> allPartitions) {
            this.allPartitions = allPartitions;
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            int subtaskIdx = this.getRuntimeContext().getIndexOfThisSubtask();
            this.dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(this.allPartitions.get(subtaskIdx))));
        }

        public void close() throws Exception {
            super.close();
            this.dos.close();
        }

        public void invoke(Tuple2<Integer, Integer> value, SinkFunction.Context context) throws Exception {
            this.dos.writeInt((Integer)value.f0);
            this.dos.writeInt((Integer)value.f1);
        }
    }

    private static class RandomTupleSource
    implements ParallelSourceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private final int numKeys;
        private int remainingEvents;

        RandomTupleSource(int numEvents, int numKeys) {
            this.numKeys = numKeys;
            this.remainingEvents = numEvents;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> out) throws Exception {
            Random random = new Random(42L);
            while (this.remainingEvents > 0) {
                Object object = out.getCheckpointLock();
                synchronized (object) {
                    out.collect((Object)new Tuple2((Object)random.nextInt(this.numKeys), (Object)1));
                    --this.remainingEvents;
                }
            }
        }

        public void cancel() {
            this.remainingEvents = 0;
        }
    }
}

