package org.apache.paimon.flink.sorter;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.shuffle.RangeShuffle;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.SerializableSupplier;

/* loaded from: input_file:org/apache/paimon/flink/sorter/SortUtils.class */
public class SortUtils {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/paimon/flink/sorter/SortUtils$KeyAbstract.class */
    public interface KeyAbstract<KEY> extends Serializable {
        default void open() {
        }

        KEY apply(RowData rowData);
    }

    /* loaded from: input_file:org/apache/paimon/flink/sorter/SortUtils$ShuffleKeyConvertor.class */
    interface ShuffleKeyConvertor<KEY> extends Function<KEY, InternalRow>, Serializable {
    }

    public static <KEY> DataStream<RowData> sortStreamByKey(DataStream<RowData> dataStream, FileStoreTable fileStoreTable, RowType rowType, TypeInformation<KEY> typeInformation, SerializableSupplier<Comparator<KEY>> serializableSupplier, final KeyAbstract<KEY> keyAbstract, ShuffleKeyConvertor<KEY> shuffleKeyConvertor) {
        RowType rowType2 = fileStoreTable.rowType();
        int parallelism = dataStream.getParallelism();
        CoreOptions coreOptions = fileStoreTable.coreOptions();
        String str = fileStoreTable.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        int parallelism2 = str == null ? dataStream.getParallelism() : Integer.parseInt(str);
        if (parallelism2 == -1) {
            throw new UnsupportedOperationException("The adaptive batch scheduler is not supported. Please set the sink parallelism using the key: " + FlinkConnectorOptions.SINK_PARALLELISM.key());
        }
        int intValue = coreOptions.getLocalSampleMagnification().intValue();
        if (intValue < 20) {
            throw new IllegalArgumentException(String.format("the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.", CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(), Integer.valueOf(intValue)));
        }
        int i = parallelism2 * intValue;
        int i2 = parallelism2 * 1000;
        int i3 = parallelism2 * 10;
        int fieldCount = rowType.getFieldCount();
        int fieldCount2 = rowType2.getFieldCount();
        final int[] iArr = new int[fieldCount2];
        for (int i4 = 0; i4 < fieldCount2; i4++) {
            iArr[i4] = i4 + fieldCount;
        }
        List<DataField> fields = rowType.getFields();
        List<DataField> fields2 = rowType2.getFields();
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(fields);
        arrayList.addAll(fields2);
        RowType rowType3 = new RowType(arrayList);
        InternalTypeInfo<InternalRow> fromRowType = InternalTypeInfo.fromRowType(rowType3);
        return RangeShuffle.rangeShuffleByKey(dataStream.map(new RichMapFunction<RowData, Tuple2<KEY, RowData>>() { // from class: org.apache.paimon.flink.sorter.SortUtils.1
            public void open(Configuration configuration) throws Exception {
                super.open(configuration);
                KeyAbstract.this.open();
            }

            public Tuple2<KEY, RowData> map(RowData rowData) {
                return Tuple2.of(KeyAbstract.this.apply(rowData), rowData);
            }
        }, new TupleTypeInfo(new TypeInformation[]{typeInformation, dataStream.getType()})).setParallelism(parallelism), serializableSupplier, typeInformation, i, i2, i3, parallelism2, rowType2, coreOptions.sortBySize()).map(tuple2 -> {
            return new JoinedRow(shuffleKeyConvertor.apply(tuple2.f0), new FlinkRowWrapper((RowData) tuple2.f1));
        }, fromRowType).setParallelism(parallelism2).transform("LOCAL SORT", fromRowType, new SortOperator(rowType, rowType3, coreOptions.writeBufferSize(), coreOptions.pageSize(), coreOptions.localSortMaxNumFileHandles(), coreOptions.spillCompression(), parallelism2, coreOptions.writeBufferSpillDiskSize())).setParallelism(parallelism2).map(new RichMapFunction<InternalRow, InternalRow>() { // from class: org.apache.paimon.flink.sorter.SortUtils.2
            private transient KeyProjectedRow keyProjectedRow;

            public void open(Configuration configuration) {
                this.keyProjectedRow = new KeyProjectedRow(iArr);
            }

            public InternalRow map(InternalRow internalRow) {
                return this.keyProjectedRow.replaceRow(internalRow);
            }
        }, InternalTypeInfo.fromRowType(rowType2)).setParallelism(parallelism2).map(FlinkRowData::new, dataStream.getType()).setParallelism(parallelism2);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 589154869:
                if (implMethodName.equals("lambda$sortStreamByKey$8b29be41$1")) {
                    z = false;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sorter/SortUtils") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/flink/sorter/SortUtils$ShuffleKeyConvertor;Lorg/apache/flink/api/java/tuple/Tuple2;)Lorg/apache/paimon/data/InternalRow;")) {
                    ShuffleKeyConvertor shuffleKeyConvertor = (ShuffleKeyConvertor) serializedLambda.getCapturedArg(0);
                    return tuple2 -> {
                        return new JoinedRow(shuffleKeyConvertor.apply(tuple2.f0), new FlinkRowWrapper((RowData) tuple2.f1));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/FlinkRowData") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/data/InternalRow;)V")) {
                    return FlinkRowData::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
