package org.apache.flink.table.runtime.operators.multipleinput;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper;
import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGeneratorTest.class */
public class TableOperatorWrapperGeneratorTest extends MultipleInputTestBase {

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapperGeneratorTest$TestingTransformation.class */
    private static class TestingTransformation<T> extends Transformation<T> {
        private final Transformation<T> input;

        public TestingTransformation(Transformation<T> transformation, String str, int i) {
            super(str, transformation.getOutputType(), i);
            this.input = transformation;
        }

        public List<Transformation<?>> getTransitivePredecessors() {
            return Collections.emptyList();
        }

        public List<Transformation<?>> getInputs() {
            return Collections.singletonList(this.input);
        }
    }

    @Test
    public void testSimple() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> createSource = createSource(executionEnvironment, "source1");
        Transformation<RowData> createSource2 = createSource(executionEnvironment, "source2");
        OneInputTransformation<RowData, RowData> createOneInputTransform = createOneInputTransform(createSource, "agg1", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        createOneInputTransform.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        OneInputTransformation<RowData, RowData> createOneInputTransform2 = createOneInputTransform(createSource2, "agg2", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        createOneInputTransform2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 2);
        TwoInputTransformation<RowData, RowData, RowData> createTwoInputTransform = createTwoInputTransform(createOneInputTransform, createOneInputTransform2, "join", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        createTwoInputTransform.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 3);
        TableOperatorWrapperGenerator tableOperatorWrapperGenerator = new TableOperatorWrapperGenerator(Arrays.asList(createSource, createSource2), createTwoInputTransform, new int[]{1, 0});
        tableOperatorWrapperGenerator.generate();
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper = createWrapper(createOneInputTransform, 1, 0.16666666666666666d);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper2 = createWrapper(createOneInputTransform2, 2, 0.3333333333333333d);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper3 = createWrapper(createTwoInputTransform, 0, 0.5d);
        createWrapper3.addInput(createWrapper, 1);
        createWrapper3.addInput(createWrapper2, 2);
        Assert.assertEquals(Arrays.asList(Pair.of(createSource, new InputSpec(1, 1, createWrapper, 1)), Pair.of(createSource2, new InputSpec(2, 0, createWrapper2, 1))), tableOperatorWrapperGenerator.getInputTransformAndInputSpecPairs());
        Assert.assertEquals(createWrapper3, tableOperatorWrapperGenerator.getTailWrapper());
        Assert.assertEquals(6L, tableOperatorWrapperGenerator.getManagedMemoryWeight());
        Assert.assertEquals(10L, tableOperatorWrapperGenerator.getParallelism());
        Assert.assertEquals(-1L, tableOperatorWrapperGenerator.getMaxParallelism());
        Assert.assertEquals(ResourceSpec.UNKNOWN, tableOperatorWrapperGenerator.getMinResources());
        Assert.assertEquals(ResourceSpec.UNKNOWN, tableOperatorWrapperGenerator.getPreferredResources());
    }

    @Test
    public void testComplex() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> createSource = createSource(executionEnvironment, "source1");
        Transformation<RowData> createSource2 = createSource(executionEnvironment, "source2");
        Transformation<RowData> createSource3 = createSource(executionEnvironment, "source3");
        Transformation<RowData> createSource4 = createSource(executionEnvironment, "source4");
        Transformation<RowData> createSource5 = createSource(executionEnvironment, "source5");
        OneInputTransformation<RowData, RowData> createOneInputTransform = createOneInputTransform(createSource, "agg1", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        createOneInputTransform.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        OneInputTransformation<RowData, RowData> createOneInputTransform2 = createOneInputTransform(createSource2, "agg2", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        createOneInputTransform2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 2);
        TwoInputTransformation<RowData, RowData, RowData> createTwoInputTransform = createTwoInputTransform(createOneInputTransform, createOneInputTransform2, "join1", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        createTwoInputTransform.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 3);
        TwoInputTransformation<RowData, RowData, RowData> createTwoInputTransform2 = createTwoInputTransform(createTwoInputTransform, createSource3, "join2", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        createTwoInputTransform2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 4);
        TwoInputTransformation<RowData, RowData, RowData> createTwoInputTransform3 = createTwoInputTransform(createSource4, createSource5, "join3", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        createTwoInputTransform3.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 5);
        TwoInputTransformation<RowData, RowData, RowData> createTwoInputTransform4 = createTwoInputTransform(createTwoInputTransform2, createTwoInputTransform3, "join4", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        createTwoInputTransform4.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 6);
        TableOperatorWrapperGenerator tableOperatorWrapperGenerator = new TableOperatorWrapperGenerator(Arrays.asList(createSource, createSource2, createSource3, createSource4, createSource5), createTwoInputTransform4, new int[]{2, 3, 4, 0, 1});
        tableOperatorWrapperGenerator.generate();
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper = createWrapper(createOneInputTransform, 3, 0.047619047619047616d);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper2 = createWrapper(createOneInputTransform2, 4, 0.09523809523809523d);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper3 = createWrapper(createTwoInputTransform, 2, 0.14285714285714285d);
        createWrapper3.addInput(createWrapper, 1);
        createWrapper3.addInput(createWrapper2, 2);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper4 = createWrapper(createTwoInputTransform2, 1, 0.19047619047619047d);
        createWrapper4.addInput(createWrapper3, 1);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper5 = createWrapper(createTwoInputTransform3, 5, 0.23809523809523808d);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper6 = createWrapper(createTwoInputTransform4, 0, 0.2857142857142857d);
        createWrapper6.addInput(createWrapper4, 1);
        createWrapper6.addInput(createWrapper5, 2);
        Assert.assertEquals(Arrays.asList(Pair.of(createSource, new InputSpec(1, 2, createWrapper, 1)), Pair.of(createSource2, new InputSpec(2, 3, createWrapper2, 1)), Pair.of(createSource3, new InputSpec(3, 4, createWrapper4, 2)), Pair.of(createSource4, new InputSpec(4, 0, createWrapper5, 1)), Pair.of(createSource5, new InputSpec(5, 1, createWrapper5, 2))), tableOperatorWrapperGenerator.getInputTransformAndInputSpecPairs());
        Assert.assertEquals(createWrapper6, tableOperatorWrapperGenerator.getTailWrapper());
        Assert.assertEquals(21L, tableOperatorWrapperGenerator.getManagedMemoryWeight());
        Assert.assertEquals(10L, tableOperatorWrapperGenerator.getParallelism());
        Assert.assertEquals(-1L, tableOperatorWrapperGenerator.getMaxParallelism());
        Assert.assertEquals(ResourceSpec.UNKNOWN, tableOperatorWrapperGenerator.getMinResources());
        Assert.assertEquals(ResourceSpec.UNKNOWN, tableOperatorWrapperGenerator.getPreferredResources());
    }

    @Test
    public void testWithUnion() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> createSource = createSource(executionEnvironment, "source1");
        Transformation<RowData> createSource2 = createSource(executionEnvironment, "source2");
        Transformation<RowData> createSource3 = createSource(executionEnvironment, "source3");
        Transformation<RowData> createSource4 = createSource(executionEnvironment, "source4");
        Transformation<RowData> createSource5 = createSource(executionEnvironment, "source5");
        UnionTransformation<RowData> createUnionInputTransform = createUnionInputTransform("union1", createSource, createSource2);
        UnionTransformation<RowData> createUnionInputTransform2 = createUnionInputTransform("union2", createUnionInputTransform, createSource3);
        OneInputTransformation<RowData, RowData> createOneInputTransform = createOneInputTransform(createSource4, "agg1", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        createOneInputTransform.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        TwoInputTransformation<RowData, RowData, RowData> createTwoInputTransform = createTwoInputTransform(createOneInputTransform, createUnionInputTransform2, "join1", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        createTwoInputTransform.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 2);
        UnionTransformation<RowData> createUnionInputTransform3 = createUnionInputTransform("union3", createSource5, createTwoInputTransform);
        TableOperatorWrapperGenerator tableOperatorWrapperGenerator = new TableOperatorWrapperGenerator(Arrays.asList(createSource, createSource2, createSource3, createSource4, createSource5), createUnionInputTransform3, new int[]{1, 1, 1, 0, 2});
        tableOperatorWrapperGenerator.generate();
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper = createWrapper(createUnionInputTransform, 4);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper2 = createWrapper(createUnionInputTransform2, 3);
        createWrapper2.addInput(createWrapper, 1);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper3 = createWrapper(createOneInputTransform, 2, 0.3333333333333333d);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper4 = createWrapper(createTwoInputTransform, 1, 0.6666666666666666d);
        createWrapper4.addInput(createWrapper3, 1);
        createWrapper4.addInput(createWrapper2, 2);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper5 = createWrapper(createUnionInputTransform3, 0);
        createWrapper5.addInput(createWrapper4, 1);
        Assert.assertEquals(Arrays.asList(Pair.of(createSource5, new InputSpec(1, 2, createWrapper5, 1)), Pair.of(createSource4, new InputSpec(2, 0, createWrapper3, 1)), Pair.of(createSource, new InputSpec(3, 1, createWrapper, 1)), Pair.of(createSource2, new InputSpec(4, 1, createWrapper, 1)), Pair.of(createSource3, new InputSpec(5, 1, createWrapper2, 1))), tableOperatorWrapperGenerator.getInputTransformAndInputSpecPairs());
        Assert.assertEquals(createWrapper5, tableOperatorWrapperGenerator.getTailWrapper());
        Assert.assertEquals(3L, tableOperatorWrapperGenerator.getManagedMemoryWeight());
        Assert.assertEquals(10L, tableOperatorWrapperGenerator.getParallelism());
        Assert.assertEquals(-1L, tableOperatorWrapperGenerator.getMaxParallelism());
        Assert.assertEquals(ResourceSpec.UNKNOWN, tableOperatorWrapperGenerator.getMinResources());
        Assert.assertEquals(ResourceSpec.UNKNOWN, tableOperatorWrapperGenerator.getPreferredResources());
    }

    @Test
    public void testDifferentParallelisms() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> createSource = createSource(executionEnvironment, "source1");
        Transformation<RowData> createSource2 = createSource(executionEnvironment, "source2");
        Transformation<RowData> createSource3 = createSource(executionEnvironment, "source3");
        OneInputTransformation<RowData, RowData> createOneInputTransform = createOneInputTransform(createSource, "calc1", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        createOneInputTransform.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        createOneInputTransform.setParallelism(100);
        OneInputTransformation<RowData, RowData> createOneInputTransform2 = createOneInputTransform(createSource2, "calc2", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        createOneInputTransform2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        createOneInputTransform2.setParallelism(50);
        UnionTransformation<RowData> createUnionInputTransform = createUnionInputTransform("union1", createOneInputTransform, createOneInputTransform2);
        TwoInputTransformation<RowData, RowData, RowData> createTwoInputTransform = createTwoInputTransform(createUnionInputTransform, createSource3, "join1", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        createTwoInputTransform.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        createTwoInputTransform.setParallelism(200);
        TableOperatorWrapperGenerator tableOperatorWrapperGenerator = new TableOperatorWrapperGenerator(Arrays.asList(createSource, createSource2, createSource3), createTwoInputTransform, new int[]{1, 1, 0});
        tableOperatorWrapperGenerator.generate();
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper = createWrapper(createOneInputTransform, 2, 0.3333333333333333d);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper2 = createWrapper(createOneInputTransform2, 3, 0.3333333333333333d);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper3 = createWrapper(createUnionInputTransform, 1);
        createWrapper3.addInput(createWrapper, 1);
        createWrapper3.addInput(createWrapper2, 2);
        TableOperatorWrapper<StreamOperator<RowData>> createWrapper4 = createWrapper(createTwoInputTransform, 0, 0.3333333333333333d);
        createWrapper4.addInput(createWrapper3, 2);
        Assert.assertEquals(Arrays.asList(Pair.of(createSource, new InputSpec(1, 1, createWrapper, 1)), Pair.of(createSource2, new InputSpec(2, 1, createWrapper2, 1)), Pair.of(createSource3, new InputSpec(3, 0, createWrapper4, 2))), tableOperatorWrapperGenerator.getInputTransformAndInputSpecPairs());
        Assert.assertEquals(Arrays.asList(new TableOperatorWrapper.Edge(createWrapper, createWrapper3, 1), new TableOperatorWrapper.Edge(createWrapper2, createWrapper3, 2)), createWrapper3.getInputEdges());
        Assert.assertEquals(Collections.singletonList(new TableOperatorWrapper.Edge(createWrapper3, createWrapper4, 2)), createWrapper4.getInputEdges());
        Assert.assertEquals(createWrapper4, tableOperatorWrapperGenerator.getTailWrapper());
        Assert.assertEquals(3L, tableOperatorWrapperGenerator.getManagedMemoryWeight());
        Assert.assertEquals(200L, tableOperatorWrapperGenerator.getParallelism());
        Assert.assertEquals(-1L, tableOperatorWrapperGenerator.getMaxParallelism());
        Assert.assertEquals(ResourceSpec.UNKNOWN, tableOperatorWrapperGenerator.getMinResources());
        Assert.assertEquals(ResourceSpec.UNKNOWN, tableOperatorWrapperGenerator.getPreferredResources());
    }

    @Test(expected = RuntimeException.class)
    public void testUnsupportedTransformation() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> createSource = createSource(executionEnvironment, "source1");
        Transformation<RowData> createSource2 = createSource(executionEnvironment, "source2");
        new TableOperatorWrapperGenerator(Arrays.asList(createSource, createSource2), createTwoInputTransform(new TestingTransformation(createSource, "test", 10), createSource2, "join1", InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()}))), new int[]{0, 0}).generate();
    }

    @SafeVarargs
    private final UnionTransformation<RowData> createUnionInputTransform(String str, Transformation<RowData>... transformationArr) {
        UnionTransformation<RowData> unionTransformation = new UnionTransformation<>(Arrays.asList(transformationArr));
        unionTransformation.setName(str);
        return unionTransformation;
    }

    private TableOperatorWrapper<StreamOperator<RowData>> createWrapper(OneInputTransformation<RowData, RowData> oneInputTransformation, int i, double d) {
        TableOperatorWrapper<StreamOperator<RowData>> tableOperatorWrapper = new TableOperatorWrapper<>(oneInputTransformation.getOperatorFactory(), "SubOp" + i + "_" + oneInputTransformation.getName(), Collections.singletonList(oneInputTransformation.getInputType()), oneInputTransformation.getOutputType());
        tableOperatorWrapper.setManagedMemoryFraction(d);
        return tableOperatorWrapper;
    }

    private TableOperatorWrapper<StreamOperator<RowData>> createWrapper(TwoInputTransformation<RowData, RowData, RowData> twoInputTransformation, int i, double d) {
        TableOperatorWrapper<StreamOperator<RowData>> tableOperatorWrapper = new TableOperatorWrapper<>(twoInputTransformation.getOperatorFactory(), "SubOp" + i + "_" + twoInputTransformation.getName(), Arrays.asList(twoInputTransformation.getInputType1(), twoInputTransformation.getInputType2()), twoInputTransformation.getOutputType());
        tableOperatorWrapper.setManagedMemoryFraction(d);
        return tableOperatorWrapper;
    }

    private TableOperatorWrapper<StreamOperator<RowData>> createWrapper(UnionTransformation<RowData> unionTransformation, int i) {
        TableOperatorWrapper<StreamOperator<RowData>> tableOperatorWrapper = new TableOperatorWrapper<>(SimpleOperatorFactory.of(new UnionStreamOperator()), "SubOp" + i + "_" + unionTransformation.getName(), (List) unionTransformation.getInputs().stream().map((v0) -> {
            return v0.getOutputType();
        }).collect(Collectors.toList()), unionTransformation.getOutputType());
        tableOperatorWrapper.setManagedMemoryFraction(0.0d);
        return tableOperatorWrapper;
    }
}
