package org.apache.flink.table.planner.plan.nodes.exec.processor.utils;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.TestingBatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecDynamicFilteringDataCollector;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.class */
class InputPriorityConflictResolverTest {

    @Parameter
    public Tuple2<BatchShuffleMode, StreamExchangeMode> batchShuffleModeAndStreamExchangeMode;

    InputPriorityConflictResolverTest() {
    }

    @Parameters(name = "batchShuffleModeAndStreamExchangeMode={0}")
    public static Collection<Tuple2<BatchShuffleMode, StreamExchangeMode>> parameters() {
        return Arrays.asList(Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, StreamExchangeMode.BATCH), Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, StreamExchangeMode.BATCH), Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, StreamExchangeMode.BATCH), Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, StreamExchangeMode.HYBRID_FULL), Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, StreamExchangeMode.HYBRID_SELECTIVE));
    }

    @TestTemplate
    void testDetectAndResolve() {
        TestingBatchExecNode[] testingBatchExecNodeArr = new TestingBatchExecNode[9];
        for (int i = 0; i < testingBatchExecNodeArr.length; i++) {
            testingBatchExecNodeArr[i] = new TestingBatchExecNode("TestingBatchExecNode" + i);
        }
        testingBatchExecNodeArr[1].addInput(testingBatchExecNodeArr[0], InputProperty.builder().priority(0).build());
        testingBatchExecNodeArr[2].addInput(testingBatchExecNodeArr[1], InputProperty.builder().priority(0).build());
        testingBatchExecNodeArr[3].addInput(testingBatchExecNodeArr[0], InputProperty.builder().priority(0).build());
        testingBatchExecNodeArr[4].addInput(testingBatchExecNodeArr[8], InputProperty.builder().priority(0).build());
        testingBatchExecNodeArr[4].addInput(testingBatchExecNodeArr[0], InputProperty.builder().priority(0).build());
        testingBatchExecNodeArr[5].addInput(testingBatchExecNodeArr[8], InputProperty.builder().priority(0).build());
        testingBatchExecNodeArr[5].addInput(testingBatchExecNodeArr[0], InputProperty.builder().damBehavior(InputProperty.DamBehavior.END_INPUT).priority(0).build());
        testingBatchExecNodeArr[7].addInput(testingBatchExecNodeArr[1], InputProperty.builder().priority(0).build());
        testingBatchExecNodeArr[7].addInput(testingBatchExecNodeArr[2], InputProperty.builder().priority(0).build());
        testingBatchExecNodeArr[7].addInput(testingBatchExecNodeArr[3], InputProperty.builder().priority(1).build());
        testingBatchExecNodeArr[7].addInput(testingBatchExecNodeArr[4], InputProperty.builder().priority(10).build());
        testingBatchExecNodeArr[7].addInput(testingBatchExecNodeArr[5], InputProperty.builder().priority(10).build());
        testingBatchExecNodeArr[7].addInput(testingBatchExecNodeArr[6], InputProperty.builder().priority(100).build());
        Configuration configuration = new Configuration();
        BatchShuffleMode batchShuffleMode = (BatchShuffleMode) this.batchShuffleModeAndStreamExchangeMode.f0;
        StreamExchangeMode streamExchangeMode = (StreamExchangeMode) this.batchShuffleModeAndStreamExchangeMode.f1;
        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleMode);
        new InputPriorityConflictResolver(Collections.singletonList(testingBatchExecNodeArr[7]), InputProperty.DamBehavior.END_INPUT, streamExchangeMode, configuration).detectAndResolve();
        Assertions.assertThat(testingBatchExecNodeArr[7].getInputNodes().get(0)).isEqualTo(testingBatchExecNodeArr[1]);
        Assertions.assertThat(testingBatchExecNodeArr[7].getInputNodes().get(1)).isEqualTo(testingBatchExecNodeArr[2]);
        Assertions.assertThat(testingBatchExecNodeArr[7].getInputNodes().get(2)).isInstanceOf(BatchExecExchange.class);
        Assertions.assertThat(testingBatchExecNodeArr[7].getInputNodes().get(2).getRequiredExchangeMode()).isEqualTo(Optional.of(streamExchangeMode));
        Assertions.assertThat(((ExecEdge) testingBatchExecNodeArr[7].getInputNodes().get(2).getInputEdges().get(0)).getSource()).isEqualTo(testingBatchExecNodeArr[3]);
        Assertions.assertThat(testingBatchExecNodeArr[7].getInputNodes().get(3)).isInstanceOf(BatchExecExchange.class);
        Assertions.assertThat(testingBatchExecNodeArr[7].getInputNodes().get(3).getRequiredExchangeMode()).isEqualTo(Optional.of(streamExchangeMode));
        Assertions.assertThat(((ExecEdge) testingBatchExecNodeArr[7].getInputNodes().get(3).getInputEdges().get(0)).getSource()).isEqualTo(testingBatchExecNodeArr[4]);
        Assertions.assertThat(testingBatchExecNodeArr[7].getInputNodes().get(4)).isEqualTo(testingBatchExecNodeArr[5]);
        Assertions.assertThat(testingBatchExecNodeArr[7].getInputNodes().get(5)).isEqualTo(testingBatchExecNodeArr[6]);
    }

    @TestTemplate
    void testDeadlockCausedByExchange() {
        TestingBatchExecNode[] testingBatchExecNodeArr = new TestingBatchExecNode[2];
        for (int i = 0; i < testingBatchExecNodeArr.length; i++) {
            testingBatchExecNodeArr[i] = new TestingBatchExecNode("TestingBatchExecNode" + i);
        }
        Configuration configuration = new Configuration();
        BatchShuffleMode batchShuffleMode = (BatchShuffleMode) this.batchShuffleModeAndStreamExchangeMode.f0;
        StreamExchangeMode streamExchangeMode = (StreamExchangeMode) this.batchShuffleModeAndStreamExchangeMode.f1;
        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleMode);
        ExecNode<?> batchExecExchange = new BatchExecExchange<>(configuration, InputProperty.builder().requiredDistribution(InputProperty.ANY_DISTRIBUTION).build(), (RowType) testingBatchExecNodeArr[0].getOutputType(), "Exchange");
        batchExecExchange.setRequiredExchangeMode(streamExchangeMode);
        batchExecExchange.setInputEdges(Collections.singletonList(ExecEdge.builder().source(testingBatchExecNodeArr[0]).target(batchExecExchange).build()));
        testingBatchExecNodeArr[1].addInput(batchExecExchange, InputProperty.builder().priority(0).build());
        testingBatchExecNodeArr[1].addInput(batchExecExchange, InputProperty.builder().priority(1).build());
        new InputPriorityConflictResolver(Collections.singletonList(testingBatchExecNodeArr[1]), InputProperty.DamBehavior.END_INPUT, streamExchangeMode, configuration).detectAndResolve();
        ExecNode<?> execNode = testingBatchExecNodeArr[1].getInputNodes().get(0);
        ExecNode<?> execNode2 = testingBatchExecNodeArr[1].getInputNodes().get(1);
        Assertions.assertThat(execNode2).isNotSameAs(execNode);
        Consumer consumer = execNode3 -> {
            Assertions.assertThat(execNode3).isInstanceOf(BatchExecExchange.class);
            BatchExecExchange batchExecExchange2 = (BatchExecExchange) execNode3;
            Assertions.assertThat(batchExecExchange2.getRequiredExchangeMode()).isEqualTo(Optional.of(streamExchangeMode));
            Assertions.assertThat(((ExecEdge) batchExecExchange2.getInputEdges().get(0)).getSource()).isEqualTo(testingBatchExecNodeArr[0]);
        };
        consumer.accept(execNode);
        consumer.accept(execNode2);
    }

    @TestTemplate
    void testWithDynamicFilteringPlan() {
        TestingBatchExecNode[] testingBatchExecNodeArr = new TestingBatchExecNode[3];
        for (int i = 0; i < testingBatchExecNodeArr.length; i++) {
            testingBatchExecNodeArr[i] = new TestingBatchExecNode("TestingBatchExecNode" + i);
        }
        ExecNode<?> batchExecTableSourceScan = new BatchExecTableSourceScan<>(new Configuration(), new DynamicTableSourceSpec((ContextResolvedTable) null, (List) null), InputProperty.DEFAULT, RowType.of(new LogicalType[]{new IntType(), new IntType(), new IntType()}), "DynamicFilteringTableSourceScan");
        BatchExecDynamicFilteringDataCollector batchExecDynamicFilteringDataCollector = new BatchExecDynamicFilteringDataCollector(Collections.singletonList(1), new Configuration(), InputProperty.DEFAULT, RowType.of(new LogicalType[]{new IntType()}), "DynamicFilteringDataCollector");
        testingBatchExecNodeArr[0].addInput(testingBatchExecNodeArr[1], InputProperty.builder().priority(0).build());
        testingBatchExecNodeArr[1].addInput(testingBatchExecNodeArr[2], InputProperty.builder().priority(1).build());
        testingBatchExecNodeArr[1].addInput(batchExecTableSourceScan, InputProperty.builder().priority(0).build());
        batchExecTableSourceScan.setInputEdges(Collections.singletonList(ExecEdge.builder().source(batchExecDynamicFilteringDataCollector).target(batchExecTableSourceScan).build()));
        batchExecDynamicFilteringDataCollector.setInputEdges(Collections.singletonList(ExecEdge.builder().source(testingBatchExecNodeArr[2]).target(batchExecDynamicFilteringDataCollector).build()));
        Configuration configuration = new Configuration();
        BatchShuffleMode batchShuffleMode = (BatchShuffleMode) this.batchShuffleModeAndStreamExchangeMode.f0;
        StreamExchangeMode streamExchangeMode = (StreamExchangeMode) this.batchShuffleModeAndStreamExchangeMode.f1;
        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleMode);
        new InputPriorityConflictResolver(Collections.singletonList(testingBatchExecNodeArr[1]), InputProperty.DamBehavior.END_INPUT, streamExchangeMode, configuration).detectAndResolve();
        ExecNode<?> execNode = testingBatchExecNodeArr[1].getInputNodes().get(0);
        ExecNode<?> execNode2 = testingBatchExecNodeArr[1].getInputNodes().get(1);
        Assertions.assertThat(execNode).isSameAs(testingBatchExecNodeArr[2]);
        Assertions.assertThat(execNode2).isSameAs(batchExecTableSourceScan);
    }
}
