package org.apache.flink.test.streaming.api.datastream;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.class */
public class ReinterpretDataStreamAsKeyedStreamITCase {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase$FromPartitionFileSource.class */
    private static class FromPartitionFileSource extends RichParallelSourceFunction<Tuple2<Integer, Integer>> implements CheckpointedFunction, CheckpointListener {
        private static final long serialVersionUID = 1;
        private final List<File> allPartitions;
        private DataInputStream din;
        private volatile boolean running;
        private long fileLength;
        private long waitForFailurePos;
        private long position;
        private transient ListState<Long> positionState;
        private transient boolean isRestored;
        private volatile transient boolean canFail;

        FromPartitionFileSource(List<File> list) {
            this.allPartitions = list;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            File file = this.allPartitions.get(getRuntimeContext().getIndexOfThisSubtask());
            this.fileLength = file.length();
            this.waitForFailurePos = (this.fileLength * 3) / 4;
            this.din = new DataInputStream(new BufferedInputStream(new FileInputStream(file)));
            long j = this.position;
            while (true) {
                long j2 = j;
                if (j2 <= 0) {
                    return;
                } else {
                    j = j2 - this.din.skip(j2);
                }
            }
        }

        public void close() throws Exception {
            super.close();
            this.din.close();
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
            this.running = true;
            while (this.running && hasMoreDataToRead()) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(new Tuple2(Integer.valueOf(this.din.readInt()), Integer.valueOf(this.din.readInt())));
                    this.position += 8;
                }
                if (shouldWaitForCompletedCheckpointAndFailNow()) {
                    while (!this.canFail) {
                        Thread.sleep(10L);
                    }
                    throw new Exception("Artificial failure.");
                }
            }
        }

        private boolean shouldWaitForCompletedCheckpointAndFailNow() {
            return !this.isRestored && this.position > this.waitForFailurePos;
        }

        private boolean hasMoreDataToRead() {
            return this.position < this.fileLength;
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long j) {
            this.canFail = !this.isRestored;
        }

        public void notifyCheckpointAborted(long j) {
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.positionState.clear();
            this.positionState.add(Long.valueOf(this.position));
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.canFail = false;
            this.position = 0L;
            this.isRestored = functionInitializationContext.isRestored();
            this.positionState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("posState", Long.class));
            if (this.isRestored) {
                Iterator it = ((Iterable) this.positionState.get()).iterator();
                while (it.hasNext()) {
                    this.position += ((Long) it.next()).longValue();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase$RandomTupleSource.class */
    private static class RandomTupleSource implements ParallelSourceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;
        private final int numKeys;
        private int remainingEvents;

        RandomTupleSource(int i, int i2) {
            this.numKeys = i2;
            this.remainingEvents = i;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
            Random random = new Random(42L);
            while (this.remainingEvents > 0) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(new Tuple2(Integer.valueOf(random.nextInt(this.numKeys)), 1));
                    this.remainingEvents--;
                }
            }
        }

        public void cancel() {
            this.remainingEvents = 0;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase$ToPartitionFileSink.class */
    private static class ToPartitionFileSink extends RichSinkFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;
        private final List<File> allPartitions;
        private DataOutputStream dos;

        ToPartitionFileSink(List<File> list) {
            this.allPartitions = list;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(this.allPartitions.get(getRuntimeContext().getIndexOfThisSubtask()))));
        }

        public void close() throws Exception {
            super.close();
            this.dos.close();
        }

        public void invoke(Tuple2<Integer, Integer> tuple2, SinkFunction.Context context) throws Exception {
            this.dos.writeInt(((Integer) tuple2.f0).intValue());
            this.dos.writeInt(((Integer) tuple2.f1).intValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase$ValidatingSink.class */
    private static class ValidatingSink extends RichSinkFunction<Tuple2<Integer, Integer>> implements CheckpointedFunction {
        private static final long serialVersionUID = 1;
        private final int expectedSum;
        private int runningSum;
        private transient ListState<Integer> sumState;

        private ValidatingSink(int i) {
            this.runningSum = 0;
            this.expectedSum = i;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            Preconditions.checkState(getRuntimeContext().getNumberOfParallelSubtasks() == 1);
        }

        public void invoke(Tuple2<Integer, Integer> tuple2, SinkFunction.Context context) throws Exception {
            this.runningSum += ((Integer) tuple2.f1).intValue();
        }

        public void close() throws Exception {
            Assert.assertEquals(this.expectedSum, this.runningSum);
            super.close();
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.sumState.clear();
            this.sumState.add(Integer.valueOf(this.runningSum));
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.sumState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("sumState", Integer.class));
            if (functionInitializationContext.isRestored()) {
                Iterator it = ((Iterable) this.sumState.get()).iterator();
                while (it.hasNext()) {
                    this.runningSum += ((Integer) it.next()).intValue();
                }
            }
        }
    }

    @Test
    public void testReinterpretAsKeyedStream() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        executionEnvironment.setMaxParallelism(8);
        executionEnvironment.setParallelism(3);
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        ArrayList arrayList = new ArrayList(3);
        for (int i = 0; i < 3; i++) {
            arrayList.add(i, this.temporaryFolder.newFile());
        }
        executionEnvironment.addSource(new RandomTupleSource(100, 100)).keyBy(new int[]{0}).addSink(new ToPartitionFileSink(arrayList));
        executionEnvironment.execute();
        DataStreamUtils.reinterpretAsKeyedStream(executionEnvironment.addSource(new FromPartitionFileSource(arrayList)), tuple2 -> {
            return (Integer) tuple2.f0;
        }, TypeInformation.of(Integer.class)).timeWindow(Time.seconds(1L)).reduce((tuple22, tuple23) -> {
            return new Tuple2(tuple22.f0, Integer.valueOf(((Integer) tuple22.f1).intValue() + ((Integer) tuple23.f1).intValue()));
        }).addSink(new ValidatingSink(300)).setParallelism(1);
        executionEnvironment.execute();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -994115423:
                if (implMethodName.equals("lambda$testReinterpretAsKeyedStream$8248e4bf$1")) {
                    z = true;
                    break;
                }
                break;
            case 561979237:
                if (implMethodName.equals("lambda$testReinterpretAsKeyedStream$3558be8e$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple2 -> {
                        return (Integer) tuple2.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;Lorg/apache/flink/api/java/tuple/Tuple2;)Lorg/apache/flink/api/java/tuple/Tuple2;")) {
                    return (tuple22, tuple23) -> {
                        return new Tuple2(tuple22.f0, Integer.valueOf(((Integer) tuple22.f1).intValue() + ((Integer) tuple23.f1).intValue()));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
