/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.multipleinput;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator;
import org.apache.flink.table.runtime.operators.multipleinput.MultipleInputTestBase;
import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper;
import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapperGenerator;
import org.apache.flink.table.runtime.operators.multipleinput.TestingOneInputStreamOperator;
import org.apache.flink.table.runtime.operators.multipleinput.TestingTwoInputStreamOperator;
import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
import org.apache.flink.table.runtime.operators.multipleinput.input.OneInput;
import org.apache.flink.table.runtime.operators.multipleinput.input.SecondInputOfTwoInput;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.Assert;
import org.junit.Test;

public class BatchMultipleInputStreamOperatorTest
extends MultipleInputTestBase {
    /**
     * Verifies that {@code open()} on the multiple-input operator is propagated to every
     * wrapped operator (both aggregates and both joins).
     *
     * @throws Exception if building or opening the operator fails
     */
    @Test
    public void testOpen() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();

        // Walk the wrapper graph from the tail (join2) back to the two aggregates.
        TestingTwoInputStreamOperator joinOp2 =
                (TestingTwoInputStreamOperator) op.getTailWrapper().getStreamOperator();
        TableOperatorWrapper<?> joinWrapper1 =
                (TableOperatorWrapper<?>) op.getTailWrapper().getInputWrappers().get(0);
        TestingTwoInputStreamOperator joinOp1 =
                (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator();
        TableOperatorWrapper<?> aggWrapper1 =
                (TableOperatorWrapper<?>) joinWrapper1.getInputWrappers().get(0);
        TestingOneInputStreamOperator aggOp1 =
                (TestingOneInputStreamOperator) aggWrapper1.getStreamOperator();
        TableOperatorWrapper<?> aggWrapper2 =
                (TableOperatorWrapper<?>) joinWrapper1.getInputWrappers().get(1);
        TestingOneInputStreamOperator aggOp2 =
                (TestingOneInputStreamOperator) aggWrapper2.getStreamOperator();

        // Nothing may be opened before open() is called.
        Assert.assertFalse(aggOp1.isOpened());
        Assert.assertFalse(aggOp2.isOpened());
        // Fixed: this check used to re-test aggOp1, leaving joinOp1 unverified.
        Assert.assertFalse(joinOp1.isOpened());
        Assert.assertFalse(joinOp2.isOpened());

        op.open();

        // After open() all four wrapped operators must report opened.
        Assert.assertTrue(aggOp1.isOpened());
        Assert.assertTrue(aggOp2.isOpened());
        Assert.assertTrue(joinOp1.isOpened());
        Assert.assertTrue(joinOp2.isOpened());
    }

    /**
     * Ends the three inputs one at a time, in the order the operator itself selects them
     * (3, then 1, then 2), and checks after each step which wrapped operators have been
     * told that an input ended.
     *
     * @throws Exception if building the operator or ending an input fails
     */
    @Test
    public void testNextSelectionAndEndInput() throws Exception {
        TestingBatchMultipleInputStreamOperator operator = createMultipleInputStreamOperator();

        // Resolve the wrappers first, then the operators they hold.
        TableOperatorWrapper<?> tailWrapper = operator.getTailWrapper();
        TableOperatorWrapper<?> firstJoinWrapper =
                (TableOperatorWrapper<?>) tailWrapper.getInputWrappers().get(0);
        TableOperatorWrapper<?> firstAggWrapper =
                (TableOperatorWrapper<?>) firstJoinWrapper.getInputWrappers().get(0);
        TableOperatorWrapper<?> secondAggWrapper =
                (TableOperatorWrapper<?>) firstJoinWrapper.getInputWrappers().get(1);

        TestingTwoInputStreamOperator secondJoin =
                (TestingTwoInputStreamOperator) tailWrapper.getStreamOperator();
        TestingTwoInputStreamOperator firstJoin =
                (TestingTwoInputStreamOperator) firstJoinWrapper.getStreamOperator();
        TestingOneInputStreamOperator firstAgg =
                (TestingOneInputStreamOperator) firstAggWrapper.getStreamOperator();
        TestingOneInputStreamOperator secondAgg =
                (TestingOneInputStreamOperator) secondAggWrapper.getStreamOperator();

        // Initial state: no input has ended anywhere, and input 3 is selected first.
        Assert.assertFalse(firstAgg.isEnd());
        Assert.assertFalse(secondAgg.isEnd());
        Assert.assertTrue(firstJoin.getEndInputs().isEmpty());
        Assert.assertTrue(secondJoin.getEndInputs().isEmpty());
        InputSelection selectThird = new InputSelection.Builder().select(3).build(3);
        Assert.assertEquals(selectThird, operator.nextSelection());

        // Ending input 3 is expected to reach only the tail join, as its input 2.
        operator.endInput(3);
        Assert.assertFalse(firstAgg.isEnd());
        Assert.assertFalse(secondAgg.isEnd());
        Assert.assertTrue(firstJoin.getEndInputs().isEmpty());
        Assert.assertEquals(Collections.singletonList(2), secondJoin.getEndInputs());
        InputSelection selectFirst = new InputSelection.Builder().select(1).build(3);
        Assert.assertEquals(selectFirst, operator.nextSelection());

        // Ending input 1 is expected to end the first aggregate and input 1 of the first join.
        operator.endInput(1);
        Assert.assertTrue(firstAgg.isEnd());
        Assert.assertFalse(secondAgg.isEnd());
        Assert.assertEquals(Collections.singletonList(1), firstJoin.getEndInputs());
        Assert.assertEquals(Collections.singletonList(2), secondJoin.getEndInputs());
        InputSelection selectSecond = new InputSelection.Builder().select(2).build(3);
        Assert.assertEquals(selectSecond, operator.nextSelection());

        // Ending input 2 is expected to finish everything; afterwards all inputs are selectable.
        operator.endInput(2);
        Assert.assertTrue(firstAgg.isEnd());
        Assert.assertTrue(secondAgg.isEnd());
        Assert.assertEquals(Arrays.asList(1, 2), firstJoin.getEndInputs());
        Assert.assertEquals(Arrays.asList(2, 1), secondJoin.getEndInputs());
        Assert.assertEquals(InputSelection.ALL, operator.nextSelection());
    }

    /**
     * Verifies that {@code dispose()} on the multiple-input operator is propagated to every
     * wrapped operator (both aggregates and both joins).
     *
     * @throws Exception if building or disposing the operator fails
     */
    @Test
    public void testDispose() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();

        // Walk the wrapper graph from the tail (join2) back to the two aggregates.
        TestingTwoInputStreamOperator joinOp2 =
                (TestingTwoInputStreamOperator) op.getTailWrapper().getStreamOperator();
        TableOperatorWrapper<?> joinWrapper1 =
                (TableOperatorWrapper<?>) op.getTailWrapper().getInputWrappers().get(0);
        TestingTwoInputStreamOperator joinOp1 =
                (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator();
        TableOperatorWrapper<?> aggWrapper1 =
                (TableOperatorWrapper<?>) joinWrapper1.getInputWrappers().get(0);
        TestingOneInputStreamOperator aggOp1 =
                (TestingOneInputStreamOperator) aggWrapper1.getStreamOperator();
        TableOperatorWrapper<?> aggWrapper2 =
                (TableOperatorWrapper<?>) joinWrapper1.getInputWrappers().get(1);
        TestingOneInputStreamOperator aggOp2 =
                (TestingOneInputStreamOperator) aggWrapper2.getStreamOperator();

        // Nothing may be disposed before dispose() is called.
        Assert.assertFalse(aggOp1.isDisposed());
        Assert.assertFalse(aggOp2.isDisposed());
        // Fixed: this check used to re-test aggOp1, leaving joinOp1 unverified.
        Assert.assertFalse(joinOp1.isDisposed());
        Assert.assertFalse(joinOp2.isDisposed());

        op.dispose();

        // After dispose() all four wrapped operators must report disposed.
        Assert.assertTrue(aggOp1.isDisposed());
        Assert.assertTrue(aggOp2.isDisposed());
        Assert.assertTrue(joinOp1.isDisposed());
        Assert.assertTrue(joinOp2.isDisposed());
    }

    /**
     * Verifies that {@code close()} on the multiple-input operator is propagated to every
     * wrapped operator (both aggregates and both joins).
     *
     * @throws Exception if building or closing the operator fails
     */
    @Test
    public void testClose() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();

        // Walk the wrapper graph from the tail (join2) back to the two aggregates.
        TestingTwoInputStreamOperator joinOp2 =
                (TestingTwoInputStreamOperator) op.getTailWrapper().getStreamOperator();
        TableOperatorWrapper<?> joinWrapper1 =
                (TableOperatorWrapper<?>) op.getTailWrapper().getInputWrappers().get(0);
        TestingTwoInputStreamOperator joinOp1 =
                (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator();
        TableOperatorWrapper<?> aggWrapper1 =
                (TableOperatorWrapper<?>) joinWrapper1.getInputWrappers().get(0);
        TestingOneInputStreamOperator aggOp1 =
                (TestingOneInputStreamOperator) aggWrapper1.getStreamOperator();
        TableOperatorWrapper<?> aggWrapper2 =
                (TableOperatorWrapper<?>) joinWrapper1.getInputWrappers().get(1);
        TestingOneInputStreamOperator aggOp2 =
                (TestingOneInputStreamOperator) aggWrapper2.getStreamOperator();

        // Nothing may be closed before close() is called.
        Assert.assertFalse(aggOp1.isClosed());
        Assert.assertFalse(aggOp2.isClosed());
        // Fixed: this check used to re-test aggOp1, leaving joinOp1 unverified.
        Assert.assertFalse(joinOp1.isClosed());
        Assert.assertFalse(joinOp2.isClosed());

        op.close();

        // After close() all four wrapped operators must report closed.
        Assert.assertTrue(aggOp1.isClosed());
        Assert.assertTrue(aggOp2.isClosed());
        Assert.assertTrue(joinOp1.isClosed());
        Assert.assertTrue(joinOp2.isClosed());
    }

    /**
     * Pushes one record into each of the three inputs (in the order 3, 1, 2), ending each
     * input right after its record, and checks which wrapped operator has seen which record
     * at every step as well as the number of elements finally emitted.
     *
     * @throws Exception if building the operator or processing an element fails
     */
    @Test
    public void testProcess() throws Exception {
        TestingBatchMultipleInputStreamOperator operator = createMultipleInputStreamOperator();
        List<StreamElement> emitted = operator.getOutputData();

        // Resolve the wrappers first, then the operators they hold.
        TableOperatorWrapper<?> tailWrapper = operator.getTailWrapper();
        TableOperatorWrapper<?> firstJoinWrapper =
                (TableOperatorWrapper<?>) tailWrapper.getInputWrappers().get(0);
        TableOperatorWrapper<?> firstAggWrapper =
                (TableOperatorWrapper<?>) firstJoinWrapper.getInputWrappers().get(0);
        TableOperatorWrapper<?> secondAggWrapper =
                (TableOperatorWrapper<?>) firstJoinWrapper.getInputWrappers().get(1);

        TestingTwoInputStreamOperator secondJoin =
                (TestingTwoInputStreamOperator) tailWrapper.getStreamOperator();
        TestingTwoInputStreamOperator firstJoin =
                (TestingTwoInputStreamOperator) firstJoinWrapper.getStreamOperator();
        TestingOneInputStreamOperator firstAgg =
                (TestingOneInputStreamOperator) firstAggWrapper.getStreamOperator();
        TestingOneInputStreamOperator secondAgg =
                (TestingOneInputStreamOperator) secondAggWrapper.getStreamOperator();

        // The operator must expose exactly three inputs of the expected kinds.
        List<?> operatorInputs = operator.getInputs();
        Assert.assertEquals(3, operatorInputs.size());
        Input firstInput = (Input) operatorInputs.get(0);
        Input secondInput = (Input) operatorInputs.get(1);
        Input thirdInput = (Input) operatorInputs.get(2);
        Assert.assertTrue(firstInput instanceof OneInput);
        Assert.assertTrue(secondInput instanceof OneInput);
        Assert.assertTrue(thirdInput instanceof SecondInputOfTwoInput);

        // Initially no operator has seen a record and nothing has been emitted.
        Assert.assertNull(secondJoin.getCurrentElement1());
        Assert.assertNull(secondJoin.getCurrentElement2());
        Assert.assertNull(firstJoin.getCurrentElement1());
        Assert.assertNull(firstJoin.getCurrentElement2());
        Assert.assertNull(firstAgg.getCurrentElement());
        Assert.assertNull(secondAgg.getCurrentElement());
        Assert.assertTrue(emitted.isEmpty());

        // Step 1: a record on input 3 should arrive as the second element of the tail join.
        StreamRecord<RowData> thirdRecord =
                new StreamRecord<>(GenericRowData.of(StringData.fromString("123")), 456L);
        thirdInput.processElement(thirdRecord);
        Assert.assertEquals(thirdRecord, secondJoin.getCurrentElement2());
        Assert.assertNull(secondJoin.getCurrentElement1());
        Assert.assertTrue(emitted.isEmpty());
        Assert.assertTrue(secondJoin.getEndInputs().isEmpty());
        operator.endInput(3);
        Assert.assertTrue(emitted.isEmpty());
        Assert.assertEquals(Collections.singletonList(2), secondJoin.getEndInputs());

        // Step 2: a record on input 1 should stop at the first aggregate until input 1 ends.
        StreamRecord<RowData> firstRecord =
                new StreamRecord<>(GenericRowData.of(StringData.fromString("124")), 457L);
        firstInput.processElement(firstRecord);
        Assert.assertEquals(firstRecord, firstAgg.getCurrentElement());
        Assert.assertNull(firstJoin.getCurrentElement1());
        Assert.assertNull(secondJoin.getCurrentElement1());
        Assert.assertTrue(emitted.isEmpty());
        Assert.assertTrue(firstJoin.getEndInputs().isEmpty());
        operator.endInput(1);
        Assert.assertEquals(Collections.singletonList(1), firstJoin.getEndInputs());
        Assert.assertEquals(Collections.singletonList(2), secondJoin.getEndInputs());
        Assert.assertEquals(firstRecord, firstJoin.getCurrentElement1());
        Assert.assertTrue(emitted.isEmpty());

        // Step 3: a record on input 2 should stop at the second aggregate until input 2 ends.
        StreamRecord<RowData> secondRecord =
                new StreamRecord<>(GenericRowData.of(StringData.fromString("125")), 458L);
        secondInput.processElement(secondRecord);
        Assert.assertEquals(secondRecord, secondAgg.getCurrentElement());
        Assert.assertNull(firstJoin.getCurrentElement2());
        Assert.assertNull(secondJoin.getCurrentElement1());
        Assert.assertTrue(emitted.isEmpty());
        Assert.assertEquals(Collections.singletonList(1), firstJoin.getEndInputs());
        operator.endInput(2);
        Assert.assertEquals(Arrays.asList(1, 2), firstJoin.getEndInputs());
        Assert.assertEquals(Arrays.asList(2, 1), secondJoin.getEndInputs());
        Assert.assertEquals(secondRecord, firstJoin.getCurrentElement2());

        // With all inputs ended, three elements are expected in the output.
        Assert.assertEquals(3, emitted.size());
    }

    /**
     * Builds the operator under test from this transformation graph:
     *
     * <pre>
     * source1 -> agg1 --\
     *                    join1 --\
     * source2 -> agg2 --/         join2
     * source3 -------------------/
     * </pre>
     *
     * <p>Elements emitted through the operator's output are appended to a list that can be
     * read back via {@code getOutputData()} on the returned operator.
     *
     * @return a new testing operator wrapping the graph above
     * @throws Exception if creating the transformations or the operator fails
     */
    private TestingBatchMultipleInputStreamOperator createMultipleInputStreamOperator() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> source1 = createSource(env, "source1");
        Transformation<RowData> source2 = createSource(env, "source2");
        Transformation<RowData> source3 = createSource(env, "source3");

        OneInputTransformation<RowData, RowData> agg1 =
                createOneInputTransform(
                        source1,
                        "agg1",
                        new TestingOneInputStreamOperator(true),
                        singleStringColumnRowTypeInfo());
        OneInputTransformation<RowData, RowData> agg2 =
                createOneInputTransform(
                        source2,
                        "agg2",
                        new TestingOneInputStreamOperator(true),
                        singleStringColumnRowTypeInfo());
        TwoInputTransformation<RowData, RowData, RowData> join1 =
                createTwoInputTransform(
                        agg1,
                        agg2,
                        "join1",
                        new TestingTwoInputStreamOperator(true),
                        singleStringColumnRowTypeInfo());
        TwoInputTransformation<RowData, RowData, RowData> join2 =
                createTwoInputTransform(
                        join1,
                        source3,
                        "join2",
                        new TestingTwoInputStreamOperator(true),
                        singleStringColumnRowTypeInfo());

        // The int array is passed through unchanged; presumably it holds the read order of
        // the three sources. NOTE(review): confirm against TableOperatorWrapperGenerator.
        TableOperatorWrapperGenerator generator =
                new TableOperatorWrapperGenerator(
                        Arrays.asList(source1, source2, source3), join2, new int[] {1, 2, 0});
        generator.generate();

        // Stream the pairs directly so the element type is kept; going through a raw List
        // erased it, leaving Pair::getValue unresolvable and the result a raw List.
        List<InputSpec> inputSpecs =
                generator.getInputTransformAndInputSpecPairs().stream()
                        .map(Pair::getValue)
                        .collect(Collectors.toList());

        List<StreamElement> outputData = new ArrayList<>();
        return new TestingBatchMultipleInputStreamOperator(
                createStreamOperatorParameters(new TestingOutput(outputData)),
                inputSpecs,
                generator.getHeadWrappers(),
                generator.getTailWrapper(),
                outputData);
    }

    /** Creates the type information for a row with a single STRING column. */
    private static TypeInformation<RowData> singleStringColumnRowTypeInfo() {
        return InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()));
    }

    private static class TestingOutput
    extends CollectorOutput<RowData> {
        private final List<StreamElement> list;

        public TestingOutput(List<StreamElement> list) {
            super(list);
            this.list = list;
        }

        public void collect(StreamRecord<RowData> record) {
            this.list.add((StreamElement)record);
        }
    }

    /**
     * {@link BatchMultipleInputStreamOperator} subclass that keeps references to the tail
     * wrapper and to the output list, so tests can reach the wrapped operators and inspect
     * the emitted elements.
     */
    private static class TestingBatchMultipleInputStreamOperator
            extends BatchMultipleInputStreamOperator {

        /** The tail wrapper that was also passed to the super constructor. */
        private final TableOperatorWrapper<?> tail;

        /** List supplied by the test; returned unchanged by {@link #getOutputData()}. */
        private final List<StreamElement> emitted;

        /**
         * @param parameters operator parameters forwarded to the super constructor
         * @param inputSpecs input specs forwarded to the super constructor
         * @param headWrapper head wrappers forwarded to the super constructor
         * @param tailWrapper tail wrapper forwarded to the super constructor and retained
         * @param outputData list retained for {@link #getOutputData()}
         */
        public TestingBatchMultipleInputStreamOperator(
                StreamOperatorParameters<RowData> parameters,
                List<InputSpec> inputSpecs,
                List<TableOperatorWrapper<?>> headWrapper,
                TableOperatorWrapper<?> tailWrapper,
                List<StreamElement> outputData) {
            super(parameters, inputSpecs, headWrapper, tailWrapper);
            this.tail = tailWrapper;
            this.emitted = outputData;
        }

        /** Returns the output list given at construction time. */
        public List<StreamElement> getOutputData() {
            return emitted;
        }

        /** Returns the tail wrapper given at construction time. */
        public TableOperatorWrapper<?> getTailWrapper() {
            return tail;
        }
    }
}

