package org.apache.flink.table.runtime.operators.multipleinput.output;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.operators.multipleinput.MultipleInputTestBase;
import org.apache.flink.table.runtime.operators.multipleinput.TestingOneInputStreamOperator;
import org.apache.flink.table.runtime.operators.multipleinput.TestingTwoInputStreamOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/multipleinput/output/OutputTest.class */
public class OutputTest extends MultipleInputTestBase {
    private StreamRecord<RowData> element;
    private Watermark watermark;
    private LatencyMarker latencyMarker;
    private TypeSerializer<RowData> serializer;

    @Before
    public void setup() {
        this.element = new StreamRecord<>(GenericRowData.of(new Object[]{StringData.fromString("123")}), 456L);
        this.watermark = new Watermark(1223456789L);
        this.latencyMarker = new LatencyMarker(122345678L, new OperatorID(123L, 456L), 1);
        this.serializer = InternalTypeInfo.of(RowType.of(new LogicalType[]{DataTypes.STRING().getLogicalType()})).createSerializer(new ExecutionConfig());
    }

    @Test
    public void testOneInput() throws Exception {
        TestingOneInputStreamOperator createOneInputStreamOperator = createOneInputStreamOperator();
        OneInputStreamOperatorOutput oneInputStreamOperatorOutput = new OneInputStreamOperatorOutput(createOneInputStreamOperator);
        oneInputStreamOperatorOutput.collect(this.element);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentElement()).isEqualTo(this.element);
        oneInputStreamOperatorOutput.emitWatermark(this.watermark);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentWatermark()).isEqualTo(this.watermark);
        oneInputStreamOperatorOutput.emitLatencyMarker(this.latencyMarker);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentLatencyMarker()).isEqualTo(this.latencyMarker);
    }

    @Test
    public void testCopyingOneInput() throws Exception {
        TestingOneInputStreamOperator createOneInputStreamOperator = createOneInputStreamOperator();
        CopyingOneInputStreamOperatorOutput copyingOneInputStreamOperatorOutput = new CopyingOneInputStreamOperatorOutput(createOneInputStreamOperator, this.serializer);
        copyingOneInputStreamOperatorOutput.collect(this.element);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentElement()).isNotSameAs(this.element);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentElement()).isEqualTo(this.element);
        copyingOneInputStreamOperatorOutput.emitWatermark(this.watermark);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentWatermark()).isSameAs(this.watermark);
        copyingOneInputStreamOperatorOutput.emitLatencyMarker(this.latencyMarker);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentLatencyMarker()).isSameAs(this.latencyMarker);
    }

    @Test
    public void testFirstInputOfTwoInput() throws Exception {
        TestingTwoInputStreamOperator createTwoInputStreamOperator = createTwoInputStreamOperator();
        FirstInputOfTwoInputStreamOperatorOutput firstInputOfTwoInputStreamOperatorOutput = new FirstInputOfTwoInputStreamOperatorOutput(createTwoInputStreamOperator);
        firstInputOfTwoInputStreamOperatorOutput.collect(this.element);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentElement1()).isEqualTo(this.element);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentElement2()).isNull();
        firstInputOfTwoInputStreamOperatorOutput.emitWatermark(this.watermark);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentWatermark1()).isEqualTo(this.watermark);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentWatermark2()).isNull();
        firstInputOfTwoInputStreamOperatorOutput.emitLatencyMarker(this.latencyMarker);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentLatencyMarker1()).isEqualTo(this.latencyMarker);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentLatencyMarker2()).isNull();
    }

    @Test
    public void testCopyingFirstInputOfTwoInput() throws Exception {
        TestingTwoInputStreamOperator createTwoInputStreamOperator = createTwoInputStreamOperator();
        CopyingFirstInputOfTwoInputStreamOperatorOutput copyingFirstInputOfTwoInputStreamOperatorOutput = new CopyingFirstInputOfTwoInputStreamOperatorOutput(createTwoInputStreamOperator, this.serializer);
        copyingFirstInputOfTwoInputStreamOperatorOutput.collect(this.element);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentElement1()).isNotSameAs(this.element);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentElement1()).isEqualTo(this.element);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentElement2()).isNull();
        copyingFirstInputOfTwoInputStreamOperatorOutput.emitWatermark(this.watermark);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentWatermark1()).isSameAs(this.watermark);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentWatermark2()).isNull();
        copyingFirstInputOfTwoInputStreamOperatorOutput.emitLatencyMarker(this.latencyMarker);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentLatencyMarker1()).isSameAs(this.latencyMarker);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentLatencyMarker2()).isNull();
    }

    @Test
    public void testSecondInputOfTwoInput() throws Exception {
        TestingTwoInputStreamOperator createTwoInputStreamOperator = createTwoInputStreamOperator();
        SecondInputOfTwoInputStreamOperatorOutput secondInputOfTwoInputStreamOperatorOutput = new SecondInputOfTwoInputStreamOperatorOutput(createTwoInputStreamOperator);
        secondInputOfTwoInputStreamOperatorOutput.collect(this.element);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentElement2()).isEqualTo(this.element);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentElement1()).isNull();
        secondInputOfTwoInputStreamOperatorOutput.emitWatermark(this.watermark);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentWatermark2()).isEqualTo(this.watermark);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentWatermark1()).isNull();
        secondInputOfTwoInputStreamOperatorOutput.emitLatencyMarker(this.latencyMarker);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentLatencyMarker2()).isEqualTo(this.latencyMarker);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentLatencyMarker1()).isNull();
    }

    @Test
    public void testCopyingSecondInputOfTwoInput() throws Exception {
        TestingTwoInputStreamOperator createTwoInputStreamOperator = createTwoInputStreamOperator();
        CopyingSecondInputOfTwoInputStreamOperatorOutput copyingSecondInputOfTwoInputStreamOperatorOutput = new CopyingSecondInputOfTwoInputStreamOperatorOutput(createTwoInputStreamOperator, this.serializer);
        copyingSecondInputOfTwoInputStreamOperatorOutput.collect(this.element);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentElement2()).isNotSameAs(this.element);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentElement2()).isEqualTo(this.element);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentElement1()).isNull();
        copyingSecondInputOfTwoInputStreamOperatorOutput.emitWatermark(this.watermark);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentWatermark2()).isSameAs(this.watermark);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentWatermark1()).isNull();
        copyingSecondInputOfTwoInputStreamOperatorOutput.emitLatencyMarker(this.latencyMarker);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentLatencyMarker2()).isSameAs(this.latencyMarker);
        Assertions.assertThat(createTwoInputStreamOperator.getCurrentLatencyMarker1()).isNull();
    }

    @Test
    public void testBroadcasting() throws Exception {
        TestingOneInputStreamOperator createOneInputStreamOperator = createOneInputStreamOperator();
        TestingOneInputStreamOperator createOneInputStreamOperator2 = createOneInputStreamOperator();
        BroadcastingOutput broadcastingOutput = new BroadcastingOutput(new Output[]{new OneInputStreamOperatorOutput(createOneInputStreamOperator), new OneInputStreamOperatorOutput(createOneInputStreamOperator2)});
        broadcastingOutput.collect(this.element);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentElement()).isEqualTo(this.element);
        Assertions.assertThat(createOneInputStreamOperator2.getCurrentElement()).isEqualTo(this.element);
        broadcastingOutput.emitWatermark(this.watermark);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentWatermark()).isEqualTo(this.watermark);
        Assertions.assertThat(createOneInputStreamOperator2.getCurrentWatermark()).isEqualTo(this.watermark);
        broadcastingOutput.emitLatencyMarker(this.latencyMarker);
        if (createOneInputStreamOperator.getCurrentLatencyMarker() == null) {
            Assertions.assertThat(createOneInputStreamOperator2.getCurrentLatencyMarker()).isEqualTo(this.latencyMarker);
        } else {
            Assertions.assertThat(createOneInputStreamOperator.getCurrentLatencyMarker()).isEqualTo(this.latencyMarker);
            Assertions.assertThat(createOneInputStreamOperator2.getCurrentLatencyMarker()).isNull();
        }
    }

    @Test
    public void testCopyingBroadcasting() throws Exception {
        TestingOneInputStreamOperator createOneInputStreamOperator = createOneInputStreamOperator();
        TestingOneInputStreamOperator createOneInputStreamOperator2 = createOneInputStreamOperator();
        CopyingBroadcastingOutput copyingBroadcastingOutput = new CopyingBroadcastingOutput(new Output[]{new OneInputStreamOperatorOutput(createOneInputStreamOperator), new OneInputStreamOperatorOutput(createOneInputStreamOperator2)});
        copyingBroadcastingOutput.collect(this.element);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentElement()).isNotSameAs(this.element);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentElement()).isEqualTo(this.element);
        Assertions.assertThat(createOneInputStreamOperator2.getCurrentElement()).isSameAs(this.element);
        copyingBroadcastingOutput.emitWatermark(this.watermark);
        Assertions.assertThat(createOneInputStreamOperator.getCurrentWatermark()).isSameAs(this.watermark);
        Assertions.assertThat(createOneInputStreamOperator2.getCurrentWatermark()).isSameAs(this.watermark);
        copyingBroadcastingOutput.emitLatencyMarker(this.latencyMarker);
        if (createOneInputStreamOperator.getCurrentLatencyMarker() == null) {
            Assertions.assertThat(createOneInputStreamOperator2.getCurrentLatencyMarker()).isSameAs(this.latencyMarker);
        } else {
            Assertions.assertThat(createOneInputStreamOperator.getCurrentLatencyMarker()).isSameAs(this.latencyMarker);
            Assertions.assertThat(createOneInputStreamOperator2.getCurrentLatencyMarker()).isNull();
        }
    }
}
