package org.apache.flink.table.runtime.operators.multipleinput;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
import org.apache.flink.table.runtime.operators.multipleinput.input.OneInput;
import org.apache.flink.table.runtime.operators.multipleinput.input.SecondInputOfTwoInput;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.class */
public class BatchMultipleInputStreamOperatorTest extends MultipleInputTestBase {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest$TestingBatchMultipleInputStreamOperator.class */
    public static class TestingBatchMultipleInputStreamOperator extends BatchMultipleInputStreamOperator {
        private final TableOperatorWrapper<?> tailWrapper;
        private final List<StreamElement> outputData;

        public TestingBatchMultipleInputStreamOperator(StreamOperatorParameters<RowData> streamOperatorParameters, List<InputSpec> list, List<TableOperatorWrapper<?>> list2, TableOperatorWrapper<?> tableOperatorWrapper, List<StreamElement> list3) {
            super(streamOperatorParameters, list, list2, tableOperatorWrapper);
            this.tailWrapper = tableOperatorWrapper;
            this.outputData = list3;
        }

        public List<StreamElement> getOutputData() {
            return this.outputData;
        }

        public TableOperatorWrapper<?> getTailWrapper() {
            return this.tailWrapper;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest$TestingOutput.class */
    public static class TestingOutput extends CollectorOutput<RowData> {
        private final List<StreamElement> list;

        public TestingOutput(List<StreamElement> list) {
            super(list);
            this.list = list;
        }

        public void collect(StreamRecord<RowData> streamRecord) {
            this.list.add(streamRecord);
        }
    }

    @Test
    public void testOpen() throws Exception {
        TestingBatchMultipleInputStreamOperator createMultipleInputStreamOperator = createMultipleInputStreamOperator();
        TestingTwoInputStreamOperator streamOperator = createMultipleInputStreamOperator.getTailWrapper().getStreamOperator();
        TableOperatorWrapper tableOperatorWrapper = (TableOperatorWrapper) createMultipleInputStreamOperator.getTailWrapper().getInputWrappers().get(0);
        TestingTwoInputStreamOperator streamOperator2 = tableOperatorWrapper.getStreamOperator();
        TestingOneInputStreamOperator streamOperator3 = ((TableOperatorWrapper) tableOperatorWrapper.getInputWrappers().get(0)).getStreamOperator();
        TestingOneInputStreamOperator streamOperator4 = ((TableOperatorWrapper) tableOperatorWrapper.getInputWrappers().get(1)).getStreamOperator();
        Assertions.assertThat(streamOperator3.isOpened()).isFalse();
        Assertions.assertThat(streamOperator4.isOpened()).isFalse();
        Assertions.assertThat(streamOperator3.isOpened()).isFalse();
        Assertions.assertThat(streamOperator.isOpened()).isFalse();
        createMultipleInputStreamOperator.open();
        Assertions.assertThat(streamOperator3.isOpened()).isTrue();
        Assertions.assertThat(streamOperator4.isOpened()).isTrue();
        Assertions.assertThat(streamOperator2.isOpened()).isTrue();
        Assertions.assertThat(streamOperator.isOpened()).isTrue();
    }

    @Test
    public void testNextSelectionAndEndInput() throws Exception {
        TestingBatchMultipleInputStreamOperator createMultipleInputStreamOperator = createMultipleInputStreamOperator();
        TestingTwoInputStreamOperator streamOperator = createMultipleInputStreamOperator.getTailWrapper().getStreamOperator();
        TableOperatorWrapper tableOperatorWrapper = (TableOperatorWrapper) createMultipleInputStreamOperator.getTailWrapper().getInputWrappers().get(0);
        TestingTwoInputStreamOperator streamOperator2 = tableOperatorWrapper.getStreamOperator();
        TestingOneInputStreamOperator streamOperator3 = ((TableOperatorWrapper) tableOperatorWrapper.getInputWrappers().get(0)).getStreamOperator();
        TestingOneInputStreamOperator streamOperator4 = ((TableOperatorWrapper) tableOperatorWrapper.getInputWrappers().get(1)).getStreamOperator();
        Assertions.assertThat(streamOperator3.isEnd()).isFalse();
        Assertions.assertThat(streamOperator4.isEnd()).isFalse();
        Assertions.assertThat(streamOperator2.getEndInputs()).isEmpty();
        Assertions.assertThat(streamOperator.getEndInputs()).isEmpty();
        Assertions.assertThat(createMultipleInputStreamOperator.nextSelection()).isEqualTo(new InputSelection.Builder().select(3).build(3));
        createMultipleInputStreamOperator.endInput(3);
        Assertions.assertThat(streamOperator3.isEnd()).isFalse();
        Assertions.assertThat(streamOperator4.isEnd()).isFalse();
        Assertions.assertThat(streamOperator2.getEndInputs()).isEmpty();
        Assertions.assertThat(streamOperator.getEndInputs()).containsExactly(new Integer[]{2});
        Assertions.assertThat(createMultipleInputStreamOperator.nextSelection()).isEqualTo(new InputSelection.Builder().select(1).build(3));
        createMultipleInputStreamOperator.endInput(1);
        Assertions.assertThat(streamOperator3.isEnd()).isTrue();
        Assertions.assertThat(streamOperator4.isEnd()).isFalse();
        Assertions.assertThat(streamOperator2.getEndInputs()).containsExactly(new Integer[]{1});
        Assertions.assertThat(streamOperator.getEndInputs()).containsExactly(new Integer[]{2});
        Assertions.assertThat(createMultipleInputStreamOperator.nextSelection()).isEqualTo(new InputSelection.Builder().select(2).build(3));
        createMultipleInputStreamOperator.endInput(2);
        Assertions.assertThat(streamOperator3.isEnd()).isTrue();
        Assertions.assertThat(streamOperator4.isEnd()).isTrue();
        Assertions.assertThat(streamOperator2.getEndInputs()).isEqualTo(Arrays.asList(1, 2));
        Assertions.assertThat(streamOperator.getEndInputs()).isEqualTo(Arrays.asList(2, 1));
        Assertions.assertThat(createMultipleInputStreamOperator.nextSelection()).isEqualTo(InputSelection.ALL);
    }

    @Test
    public void testClose() throws Exception {
        TestingBatchMultipleInputStreamOperator createMultipleInputStreamOperator = createMultipleInputStreamOperator();
        TestingTwoInputStreamOperator streamOperator = createMultipleInputStreamOperator.getTailWrapper().getStreamOperator();
        TableOperatorWrapper tableOperatorWrapper = (TableOperatorWrapper) createMultipleInputStreamOperator.getTailWrapper().getInputWrappers().get(0);
        TestingTwoInputStreamOperator streamOperator2 = tableOperatorWrapper.getStreamOperator();
        TestingOneInputStreamOperator streamOperator3 = ((TableOperatorWrapper) tableOperatorWrapper.getInputWrappers().get(0)).getStreamOperator();
        TestingOneInputStreamOperator streamOperator4 = ((TableOperatorWrapper) tableOperatorWrapper.getInputWrappers().get(1)).getStreamOperator();
        Assertions.assertThat(streamOperator3.isClosed()).isFalse();
        Assertions.assertThat(streamOperator4.isClosed()).isFalse();
        Assertions.assertThat(streamOperator3.isClosed()).isFalse();
        Assertions.assertThat(streamOperator.isClosed()).isFalse();
        createMultipleInputStreamOperator.close();
        Assertions.assertThat(streamOperator3.isClosed()).isTrue();
        Assertions.assertThat(streamOperator4.isClosed()).isTrue();
        Assertions.assertThat(streamOperator2.isClosed()).isTrue();
        Assertions.assertThat(streamOperator.isClosed()).isTrue();
    }

    @Test
    public void testProcess() throws Exception {
        TestingBatchMultipleInputStreamOperator createMultipleInputStreamOperator = createMultipleInputStreamOperator();
        List<StreamElement> outputData = createMultipleInputStreamOperator.getOutputData();
        TestingTwoInputStreamOperator streamOperator = createMultipleInputStreamOperator.getTailWrapper().getStreamOperator();
        TableOperatorWrapper tableOperatorWrapper = (TableOperatorWrapper) createMultipleInputStreamOperator.getTailWrapper().getInputWrappers().get(0);
        TestingTwoInputStreamOperator streamOperator2 = tableOperatorWrapper.getStreamOperator();
        TestingOneInputStreamOperator streamOperator3 = ((TableOperatorWrapper) tableOperatorWrapper.getInputWrappers().get(0)).getStreamOperator();
        TestingOneInputStreamOperator streamOperator4 = ((TableOperatorWrapper) tableOperatorWrapper.getInputWrappers().get(1)).getStreamOperator();
        List inputs = createMultipleInputStreamOperator.getInputs();
        Assertions.assertThat(inputs).hasSize(3);
        Input input = (Input) inputs.get(0);
        Input input2 = (Input) inputs.get(1);
        Input input3 = (Input) inputs.get(2);
        Assertions.assertThat(input).isInstanceOf(OneInput.class);
        Assertions.assertThat(input2).isInstanceOf(OneInput.class);
        Assertions.assertThat(input3).isInstanceOf(SecondInputOfTwoInput.class);
        Assertions.assertThat(streamOperator.getCurrentElement1()).isNull();
        Assertions.assertThat(streamOperator.getCurrentElement2()).isNull();
        Assertions.assertThat(streamOperator2.getCurrentElement1()).isNull();
        Assertions.assertThat(streamOperator2.getCurrentElement2()).isNull();
        Assertions.assertThat(streamOperator3.getCurrentElement()).isNull();
        Assertions.assertThat(streamOperator4.getCurrentElement()).isNull();
        Assertions.assertThat(outputData).isEmpty();
        StreamRecord streamRecord = new StreamRecord(GenericRowData.of(new Object[]{StringData.fromString("123")}), 456L);
        input3.processElement(streamRecord);
        Assertions.assertThat(streamOperator.getCurrentElement2()).isEqualTo(streamRecord);
        Assertions.assertThat(streamOperator.getCurrentElement1()).isNull();
        Assertions.assertThat(outputData).isEmpty();
        Assertions.assertThat(streamOperator.getEndInputs()).isEmpty();
        createMultipleInputStreamOperator.endInput(3);
        Assertions.assertThat(outputData).isEmpty();
        Assertions.assertThat(streamOperator.getEndInputs()).containsExactly(new Integer[]{2});
        StreamRecord streamRecord2 = new StreamRecord(GenericRowData.of(new Object[]{StringData.fromString("124")}), 457L);
        input.processElement(streamRecord2);
        Assertions.assertThat(streamOperator3.getCurrentElement()).isEqualTo(streamRecord2);
        Assertions.assertThat(streamOperator2.getCurrentElement1()).isNull();
        Assertions.assertThat(streamOperator.getCurrentElement1()).isNull();
        Assertions.assertThat(outputData).isEmpty();
        Assertions.assertThat(streamOperator2.getEndInputs()).isEmpty();
        createMultipleInputStreamOperator.endInput(1);
        Assertions.assertThat(streamOperator2.getEndInputs()).containsExactly(new Integer[]{1});
        Assertions.assertThat(streamOperator.getEndInputs()).containsExactly(new Integer[]{2});
        Assertions.assertThat(streamOperator2.getCurrentElement1()).isEqualTo(streamRecord2);
        Assertions.assertThat(outputData).isEmpty();
        StreamRecord streamRecord3 = new StreamRecord(GenericRowData.of(new Object[]{StringData.fromString("125")}), 458L);
        input2.processElement(streamRecord3);
        Assertions.assertThat(streamOperator4.getCurrentElement()).isEqualTo(streamRecord3);
        Assertions.assertThat(streamOperator2.getCurrentElement2()).isNull();
        Assertions.assertThat(streamOperator.getCurrentElement1()).isNull();
        Assertions.assertThat(outputData).isEmpty();
        Assertions.assertThat(streamOperator2.getEndInputs()).containsExactly(new Integer[]{1});
        createMultipleInputStreamOperator.endInput(2);
        Assertions.assertThat(streamOperator2.getEndInputs()).isEqualTo(Arrays.asList(1, 2));
        Assertions.assertThat(streamOperator.getEndInputs()).isEqualTo(Arrays.asList(2, 1));
        Assertions.assertThat(streamOperator2.getCurrentElement2()).isEqualTo(streamRecord3);
        Assertions.assertThat(outputData).hasSize(3);
    }

    private TestingBatchMultipleInputStreamOperator createMultipleInputStreamOperator() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> createSource = createSource(executionEnvironment, "source1");
        Transformation<RowData> createSource2 = createSource(executionEnvironment, "source2");
        Transformation<RowData> createSource3 = createSource(executionEnvironment, "source3");
        TableOperatorWrapperGenerator tableOperatorWrapperGenerator = new TableOperatorWrapperGenerator(Arrays.asList(createSource, createSource2, createSource3), createTwoInputTransform(createTwoInputTransform(createOneInputTransform(createSource, "agg1", new TestingOneInputStreamOperator(true), InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()}))), createOneInputTransform(createSource2, "agg2", new TestingOneInputStreamOperator(true), InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()}))), "join1", new TestingTwoInputStreamOperator(true), InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()}))), createSource3, "join2", new TestingTwoInputStreamOperator(true), InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()}))), new int[]{1, 2, 0});
        tableOperatorWrapperGenerator.generate();
        List inputTransformAndInputSpecPairs = tableOperatorWrapperGenerator.getInputTransformAndInputSpecPairs();
        ArrayList arrayList = new ArrayList();
        return new TestingBatchMultipleInputStreamOperator(createStreamOperatorParameters(new TestingOutput(arrayList)), (List) inputTransformAndInputSpecPairs.stream().map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList()), tableOperatorWrapperGenerator.getHeadWrappers(), tableOperatorWrapperGenerator.getTailWrapper(), arrayList);
    }
}
