package org.apache.flink.ml.common.datastream;

import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
/* loaded from: input_file:org/apache/flink/ml/common/datastream/AllReduceImpl.class */
public class AllReduceImpl {

    @VisibleForTesting
    static final int CHUNK_SIZE = 4096;

    /* loaded from: input_file:org/apache/flink/ml/common/datastream/AllReduceImpl$AllReduceRecv.class */
    private static class AllReduceRecv extends AbstractStreamOperator<double[]> implements OneInputStreamOperator<Tuple4<Integer, Integer, Integer, double[]>, double[]>, BoundedOneInput {
        double[] resultArray;

        private AllReduceRecv() {
        }

        public void endInput() {
            if (null != this.resultArray) {
                this.output.collect(new StreamRecord(this.resultArray));
            }
        }

        public void processElement(StreamRecord<Tuple4<Integer, Integer, Integer, double[]>> streamRecord) {
            Tuple4 tuple4 = (Tuple4) streamRecord.getValue();
            int intValue = ((Integer) tuple4.f1).intValue();
            int intValue2 = ((Integer) tuple4.f2).intValue();
            double[] dArr = (double[]) tuple4.f3;
            if (null == this.resultArray) {
                this.resultArray = new double[intValue2];
            }
            System.arraycopy(dArr, 0, this.resultArray, intValue * AllReduceImpl.CHUNK_SIZE, AllReduceImpl.getLengthOfChunk(intValue, this.resultArray.length));
        }
    }

    /* loaded from: input_file:org/apache/flink/ml/common/datastream/AllReduceImpl$AllReduceSend.class */
    private static class AllReduceSend extends RichFlatMapFunction<double[], Tuple3<Integer, Integer, double[]>> {
        private boolean hasReceivedOneRecord;
        private double[] transferBuffer;

        private AllReduceSend() {
            this.hasReceivedOneRecord = false;
            this.transferBuffer = new double[AllReduceImpl.CHUNK_SIZE];
        }

        public void flatMap(double[] dArr, Collector<Tuple3<Integer, Integer, double[]>> collector) {
            if (this.hasReceivedOneRecord) {
                throw new RuntimeException("The input cannot contain more than one double array.");
            }
            this.hasReceivedOneRecord = true;
            int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            for (int i = 0; i < numberOfParallelSubtasks; i++) {
                int startChunkId = AllReduceImpl.getStartChunkId(i, numberOfParallelSubtasks, dArr.length);
                int numChunksByTaskId = AllReduceImpl.getNumChunksByTaskId(i, numberOfParallelSubtasks, dArr.length);
                for (int i2 = startChunkId; i2 < numChunksByTaskId + startChunkId; i2++) {
                    System.arraycopy(dArr, i2 * AllReduceImpl.CHUNK_SIZE, this.transferBuffer, 0, AllReduceImpl.getLengthOfChunk(i2, dArr.length));
                    collector.collect(Tuple3.of(Integer.valueOf(i2), Integer.valueOf(dArr.length), this.transferBuffer));
                }
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((double[]) obj, (Collector<Tuple3<Integer, Integer, double[]>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/ml/common/datastream/AllReduceImpl$AllReduceSum.class */
    private static class AllReduceSum extends AbstractStreamOperator<Tuple4<Integer, Integer, Integer, double[]>> implements OneInputStreamOperator<Tuple3<Integer, Integer, double[]>, Tuple4<Integer, Integer, Integer, double[]>>, BoundedOneInput {
        private Map<Integer, Tuple2<Integer, double[]>> aggregatedArrayChunkByChunkId;

        private AllReduceSum() {
            this.aggregatedArrayChunkByChunkId = new HashMap();
        }

        public void endInput() {
            int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            for (Map.Entry<Integer, Tuple2<Integer, double[]>> entry : this.aggregatedArrayChunkByChunkId.entrySet()) {
                for (int i = 0; i < numberOfParallelSubtasks; i++) {
                    this.output.collect(new StreamRecord(Tuple4.of(Integer.valueOf(i), Integer.valueOf(entry.getKey().intValue()), Integer.valueOf(((Integer) entry.getValue().f0).intValue()), (double[]) entry.getValue().f1)));
                }
            }
        }

        public void processElement(StreamRecord<Tuple3<Integer, Integer, double[]>> streamRecord) {
            Tuple3 tuple3 = (Tuple3) streamRecord.getValue();
            int intValue = ((Integer) tuple3.f0).intValue();
            int intValue2 = ((Integer) tuple3.f1).intValue();
            double[] dArr = (double[]) tuple3.f2;
            if (!this.aggregatedArrayChunkByChunkId.containsKey(Integer.valueOf(intValue))) {
                this.aggregatedArrayChunkByChunkId.put(Integer.valueOf(intValue), Tuple2.of(Integer.valueOf(intValue2), dArr));
                return;
            }
            if (((Integer) this.aggregatedArrayChunkByChunkId.get(Integer.valueOf(intValue)).f0).intValue() != intValue2) {
                throw new RuntimeException("The input double array must have same length.");
            }
            double[] dArr2 = (double[]) this.aggregatedArrayChunkByChunkId.get(Integer.valueOf(intValue)).f1;
            for (int i = 0; i < dArr2.length; i++) {
                int i2 = i;
                dArr2[i2] = dArr2[i2] + dArr[i];
            }
        }
    }

    AllReduceImpl() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static DataStream<double[]> allReduceSum(DataStream<double[]> dataStream) {
        return dataStream.flatMap(new AllReduceSend()).setParallelism(dataStream.getParallelism()).name("all-reduce-send").partitionCustom((num, i) -> {
            return num.intValue() % i;
        }, tuple3 -> {
            return (Integer) tuple3.f0;
        }).transform("all-reduce-sum", new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO}), new AllReduceSum()).setParallelism(dataStream.getParallelism()).name("all-reduce-sum").partitionCustom((num2, i2) -> {
            return num2.intValue() % i2;
        }, tuple4 -> {
            return (Integer) tuple4.f0;
        }).transform("all-reduce-recv", PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, new AllReduceRecv()).setParallelism(dataStream.getParallelism()).name("all-reduce-recv");
    }

    private static int getNumChunks(int i) {
        int i2 = i / CHUNK_SIZE;
        return i % CHUNK_SIZE == 0 ? i2 : i2 + 1;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static int getLengthOfChunk(int i, int i2) {
        int i3;
        return (i != getNumChunks(i2) - 1 || (i3 = i2 % CHUNK_SIZE) == 0) ? CHUNK_SIZE : i3;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static int getStartChunkId(int i, int i2, int i3) {
        int numChunks = getNumChunks(i3);
        int i4 = numChunks / i2;
        int i5 = numChunks % i2;
        return i >= i5 ? (i4 * i) + i5 : (i4 * i) + i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static int getNumChunksByTaskId(int i, int i2, int i3) {
        int numChunks = getNumChunks(i3);
        int i4 = numChunks / i2;
        return i >= numChunks % i2 ? i4 : i4 + 1;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2097418587:
                if (implMethodName.equals("lambda$allReduceSum$e858a3c0$1")) {
                    z = true;
                    break;
                }
                break;
            case 424451363:
                if (implMethodName.equals("lambda$allReduceSum$a1cd4699$1")) {
                    z = 2;
                    break;
                }
                break;
            case 641432703:
                if (implMethodName.equals("lambda$allReduceSum$9a094882$1")) {
                    z = false;
                    break;
                }
                break;
            case 1670953799:
                if (implMethodName.equals("lambda$allReduceSum$6d59ed17$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/ml/common/datastream/AllReduceImpl") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple4;)Ljava/lang/Integer;")) {
                    return tuple4 -> {
                        return (Integer) tuple4.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/Partitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("partition") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;I)I") && serializedLambda.getImplClass().equals("org/apache/flink/ml/common/datastream/AllReduceImpl") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;I)I")) {
                    return (num2, i2) -> {
                        return num2.intValue() % i2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/Partitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("partition") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;I)I") && serializedLambda.getImplClass().equals("org/apache/flink/ml/common/datastream/AllReduceImpl") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;I)I")) {
                    return (num, i) -> {
                        return num.intValue() % i;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/ml/common/datastream/AllReduceImpl") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple3;)Ljava/lang/Integer;")) {
                    return tuple3 -> {
                        return (Integer) tuple3.f0;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
