package org.apache.flink.api.java.functions;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.class */
public class SemanticPropertiesTranslationTest {

    @FunctionAnnotation.NonForwardedFieldsSecond({"0;1"})
    @FunctionAnnotation.NonForwardedFieldsFirst({"0;2"})
    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$AllForwardedExceptJoin.class */
    public static class AllForwardedExceptJoin<X> implements JoinFunction<Tuple3<X, X, X>, Tuple3<X, X, X>, Tuple3<X, X, X>> {
        public Tuple3<X, X, X> join(Tuple3<X, X, X> tuple3, Tuple3<X, X, X> tuple32) throws Exception {
            return null;
        }
    }

    @FunctionAnnotation.NonForwardedFields({"1"})
    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$AllForwardedExceptMapper.class */
    public static class AllForwardedExceptMapper<T> implements MapFunction<T, T> {
        public T map(T t) {
            return t;
        }
    }

    @FunctionAnnotation.ForwardedFieldsFirst({"1 -> 0"})
    @FunctionAnnotation.ForwardedFieldsSecond({"1 -> 1"})
    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$ForwardedBothAnnotationJoin.class */
    public static class ForwardedBothAnnotationJoin<A, B, C, D> implements JoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> {
        public Tuple2<B, D> join(Tuple2<A, B> tuple2, Tuple2<C, D> tuple22) {
            return new Tuple2<>(tuple2.f1, tuple22.f1);
        }
    }

    @FunctionAnnotation.ForwardedFieldsFirst({"0->2"})
    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$ForwardedFirstAnnotationJoin.class */
    public static class ForwardedFirstAnnotationJoin<X> implements JoinFunction<Tuple2<X, X>, Tuple2<X, X>, Tuple3<X, X, X>> {
        public Tuple3<X, X, X> join(Tuple2<X, X> tuple2, Tuple2<X, X> tuple22) throws Exception {
            return null;
        }
    }

    @FunctionAnnotation.ForwardedFieldsSecond({"1->2"})
    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$ForwardedSecondAnnotationJoin.class */
    public static class ForwardedSecondAnnotationJoin<X> implements JoinFunction<Tuple2<X, X>, Tuple2<X, X>, Tuple3<X, X, X>> {
        public Tuple3<X, X, X> join(Tuple2<X, X> tuple2, Tuple2<X, X> tuple22) throws Exception {
            return null;
        }
    }

    @FunctionAnnotation.ForwardedFields({"0;2"})
    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$IndividualForwardedMapper.class */
    public static class IndividualForwardedMapper<X, Y, Z> implements MapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> {
        public Tuple3<X, Y, Z> map(Tuple3<X, Y, Z> tuple3) {
            return tuple3;
        }
    }

    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$NoAnnotationJoin.class */
    public static class NoAnnotationJoin<X> implements JoinFunction<Tuple2<X, X>, Tuple2<X, X>, Tuple3<X, X, X>> {
        public Tuple3<X, X, X> join(Tuple2<X, X> tuple2, Tuple2<X, X> tuple22) throws Exception {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$NoAnnotationMapper.class */
    public static class NoAnnotationMapper<T> implements MapFunction<T, T> {
        public T map(T t) {
            return t;
        }
    }

    @FunctionAnnotation.ReadFieldsFirst({"1"})
    @FunctionAnnotation.ReadFieldsSecond({"0"})
    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$ReadSetJoin.class */
    public static class ReadSetJoin<X> implements JoinFunction<Tuple2<X, X>, Tuple2<X, X>, Tuple3<X, X, X>> {
        public Tuple3<X, X, X> join(Tuple2<X, X> tuple2, Tuple2<X, X> tuple22) throws Exception {
            return null;
        }
    }

    @FunctionAnnotation.ReadFields({"0;2"})
    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$ReadSetMapper.class */
    public static class ReadSetMapper<T> implements MapFunction<T, T> {
        public T map(T t) {
            return t;
        }
    }

    @FunctionAnnotation.ForwardedFields({"0->2;1->0;2->1"})
    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$ShufflingMapper.class */
    public static class ShufflingMapper<X> implements MapFunction<Tuple3<X, X, X>, Tuple3<X, X, X>> {
        public Tuple3<X, X, X> map(Tuple3<X, X, X> tuple3) {
            return tuple3;
        }
    }

    @FunctionAnnotation.ForwardedFields({"*"})
    /* loaded from: input_file:org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest$WildcardForwardedMapper.class */
    public static class WildcardForwardedMapper<T> implements MapFunction<T, T> {
        public T map(T t) {
            return t;
        }
    }

    @Test
    public void testUnaryFunctionWildcardForwardedAnnotation() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, "test", 42)}).map(new WildcardForwardedMapper()).output(new DiscardingOutputFormat());
        SingleInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        FieldSet forwardingTargetFields = semanticProperties.getForwardingTargetFields(0, 0);
        FieldSet forwardingTargetFields2 = semanticProperties.getForwardingTargetFields(0, 1);
        FieldSet forwardingTargetFields3 = semanticProperties.getForwardingTargetFields(0, 2);
        Assert.assertNotNull(forwardingTargetFields);
        Assert.assertNotNull(forwardingTargetFields2);
        Assert.assertNotNull(forwardingTargetFields3);
        Assert.assertTrue(forwardingTargetFields.contains(0));
        Assert.assertTrue(forwardingTargetFields2.contains(1));
        Assert.assertTrue(forwardingTargetFields3.contains(2));
    }

    @Test
    public void testUnaryFunctionInPlaceForwardedAnnotation() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, "test", 42)}).map(new IndividualForwardedMapper()).output(new DiscardingOutputFormat());
        SingleInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        FieldSet forwardingTargetFields = semanticProperties.getForwardingTargetFields(0, 0);
        FieldSet forwardingTargetFields2 = semanticProperties.getForwardingTargetFields(0, 2);
        Assert.assertNotNull(forwardingTargetFields);
        Assert.assertNotNull(forwardingTargetFields2);
        Assert.assertTrue(forwardingTargetFields.contains(0));
        Assert.assertTrue(forwardingTargetFields2.contains(2));
    }

    @Test
    public void testUnaryFunctionMovingForwardedAnnotation() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)}).map(new ShufflingMapper()).output(new DiscardingOutputFormat());
        SingleInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        FieldSet forwardingTargetFields = semanticProperties.getForwardingTargetFields(0, 0);
        FieldSet forwardingTargetFields2 = semanticProperties.getForwardingTargetFields(0, 1);
        FieldSet forwardingTargetFields3 = semanticProperties.getForwardingTargetFields(0, 2);
        Assert.assertNotNull(forwardingTargetFields);
        Assert.assertNotNull(forwardingTargetFields2);
        Assert.assertNotNull(forwardingTargetFields3);
        Assert.assertTrue(forwardingTargetFields.contains(2));
        Assert.assertTrue(forwardingTargetFields2.contains(0));
        Assert.assertTrue(forwardingTargetFields3.contains(1));
    }

    @Test
    public void testUnaryFunctionForwardedInLine1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)}).map(new NoAnnotationMapper()).withForwardedFields(new String[]{"0->1; 2"}).output(new DiscardingOutputFormat());
        SingleInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        FieldSet forwardingTargetFields = semanticProperties.getForwardingTargetFields(0, 0);
        FieldSet forwardingTargetFields2 = semanticProperties.getForwardingTargetFields(0, 2);
        Assert.assertNotNull(forwardingTargetFields);
        Assert.assertNotNull(forwardingTargetFields2);
        Assert.assertTrue(forwardingTargetFields.contains(1));
        Assert.assertTrue(forwardingTargetFields2.contains(2));
    }

    @Test
    public void testUnaryFunctionForwardedInLine2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)}).map(new ReadSetMapper()).withForwardedFields(new String[]{"0->1; 2"}).output(new DiscardingOutputFormat());
        SingleInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        FieldSet forwardingTargetFields = semanticProperties.getForwardingTargetFields(0, 0);
        FieldSet forwardingTargetFields2 = semanticProperties.getForwardingTargetFields(0, 2);
        Assert.assertNotNull(forwardingTargetFields);
        Assert.assertNotNull(forwardingTargetFields2);
        Assert.assertTrue(forwardingTargetFields.contains(1));
        Assert.assertTrue(forwardingTargetFields2.contains(2));
    }

    @Test
    public void testUnaryFunctionForwardedInLine3() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)}).map(new ReadSetMapper()).withForwardedFields(new String[]{"0->1; 2"}).output(new DiscardingOutputFormat());
        SingleInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        FieldSet forwardingTargetFields = semanticProperties.getForwardingTargetFields(0, 0);
        FieldSet forwardingTargetFields2 = semanticProperties.getForwardingTargetFields(0, 2);
        Assert.assertNotNull(forwardingTargetFields);
        Assert.assertNotNull(forwardingTargetFields2);
        Assert.assertTrue(forwardingTargetFields.contains(1));
        Assert.assertTrue(forwardingTargetFields2.contains(2));
    }

    @Test
    public void testUnaryFunctionAllForwardedExceptAnnotation() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)}).map(new AllForwardedExceptMapper()).output(new DiscardingOutputFormat());
        SingleInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        FieldSet forwardingTargetFields = semanticProperties.getForwardingTargetFields(0, 0);
        FieldSet forwardingTargetFields2 = semanticProperties.getForwardingTargetFields(0, 2);
        Assert.assertNotNull(forwardingTargetFields);
        Assert.assertNotNull(forwardingTargetFields2);
        Assert.assertTrue(forwardingTargetFields.contains(0));
        Assert.assertTrue(forwardingTargetFields2.contains(2));
    }

    @Test
    public void testUnaryFunctionReadFieldsAnnotation() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)}).map(new ReadSetMapper()).output(new DiscardingOutputFormat());
        FieldSet readFields = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties().getReadFields(0);
        Assert.assertNotNull(readFields);
        Assert.assertEquals(2L, readFields.size());
        Assert.assertTrue(readFields.contains(0));
        Assert.assertTrue(readFields.contains(2));
    }

    @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
    public void testUnaryForwardedOverwritingInLine1() {
        ExecutionEnvironment.getExecutionEnvironment().fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)}).map(new WildcardForwardedMapper()).withForwardedFields(new String[]{"0->1; 2"});
    }

    @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
    public void testUnaryForwardedOverwritingInLine2() {
        ExecutionEnvironment.getExecutionEnvironment().fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)}).map(new AllForwardedExceptMapper()).withForwardedFields(new String[]{"0->1; 2"});
    }

    @Test
    public void testBinaryForwardedAnnotation() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, "test")}).join(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, Double.valueOf(3.1415d))})).where(new int[]{0}).equalTo(new int[]{0}).with(new ForwardedBothAnnotationJoin()).output(new DiscardingOutputFormat());
        DualInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(0, 0));
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(1, 0));
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(0, 1).size());
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(1, 1).size());
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(0, 1).contains(0));
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(1, 1).contains(1));
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(0, 0).size());
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(1, 0).size());
    }

    @Test
    public void testBinaryForwardedInLine1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 4L)}).join(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 2L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new NoAnnotationJoin()).withForwardedFieldsFirst(new String[]{"0->1; 1->2"}).withForwardedFieldsSecond(new String[]{"1->0"}).output(new DiscardingOutputFormat());
        DualInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(1, 0));
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(0, 0).size());
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(0, 1).size());
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(1, 1).size());
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(0, 0).contains(1));
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(0, 1).contains(2));
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(1, 1).contains(0));
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(1, 0).size());
    }

    @Test
    public void testBinaryForwardedInLine2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 4L)}).join(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 2L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new ReadSetJoin()).withForwardedFieldsFirst(new String[]{"0->1; 1->2"}).withForwardedFieldsSecond(new String[]{"1->0"}).output(new DiscardingOutputFormat());
        DualInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(1, 0));
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(0, 0).size());
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(0, 1).size());
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(1, 1).size());
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(0, 0).contains(1));
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(0, 1).contains(2));
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(1, 1).contains(0));
        Assert.assertNotNull(semanticProperties.getReadFields(0));
        Assert.assertNotNull(semanticProperties.getReadFields(1));
        Assert.assertEquals(1L, semanticProperties.getReadFields(0).size());
        Assert.assertEquals(1L, semanticProperties.getReadFields(1).size());
        Assert.assertTrue(semanticProperties.getReadFields(0).contains(1));
        Assert.assertTrue(semanticProperties.getReadFields(1).contains(0));
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(1, 0).size());
    }

    @Test
    public void testBinaryForwardedAnnotationInLineMixed1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 4L)}).join(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 2L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new ForwardedFirstAnnotationJoin()).withForwardedFieldsSecond(new String[]{"1"}).output(new DiscardingOutputFormat());
        DualInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(0, 1));
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(1, 0));
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(0, 0));
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(1, 1));
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(0, 0).size());
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(1, 1).size());
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(0, 0).contains(2));
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(1, 1).contains(1));
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(0, 1).size());
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(1, 0).size());
    }

    @Test
    public void testBinaryForwardedAnnotationInLineMixed2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 4L)}).join(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 2L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new ForwardedSecondAnnotationJoin()).withForwardedFieldsFirst(new String[]{"0->1"}).output(new DiscardingOutputFormat());
        DualInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(0, 1));
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(1, 0));
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(0, 0));
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(1, 1));
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(0, 0).size());
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(1, 1).size());
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(0, 0).contains(1));
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(1, 1).contains(2));
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(0, 1).size());
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(1, 0).size());
    }

    @Test
    public void testBinaryAllForwardedExceptAnnotation() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 4L, 5L)}).join(executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new AllForwardedExceptJoin()).output(new DiscardingOutputFormat());
        DualInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(0, 0));
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(0, 2));
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(1, 0));
        Assert.assertNotNull(semanticProperties.getForwardingTargetFields(1, 1));
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(0, 1).size());
        Assert.assertEquals(1L, semanticProperties.getForwardingTargetFields(1, 2).size());
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(0, 1).contains(1));
        Assert.assertTrue(semanticProperties.getForwardingTargetFields(1, 2).contains(2));
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(0, 0).size());
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(0, 2).size());
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(1, 0).size());
        Assert.assertEquals(0L, semanticProperties.getForwardingTargetFields(1, 1).size());
    }

    @Test
    public void testBinaryReadFieldsAnnotation() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 4L)}).join(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 2L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new ReadSetJoin()).output(new DiscardingOutputFormat());
        DualInputSemanticProperties semanticProperties = ((GenericDataSinkBase) executionEnvironment.createProgramPlan().getDataSinks().iterator().next()).getInput().getSemanticProperties();
        Assert.assertNotNull(semanticProperties.getReadFields(0));
        Assert.assertNotNull(semanticProperties.getReadFields(1));
        Assert.assertEquals(1L, semanticProperties.getReadFields(0).size());
        Assert.assertEquals(1L, semanticProperties.getReadFields(1).size());
        Assert.assertTrue(semanticProperties.getReadFields(0).contains(1));
        Assert.assertTrue(semanticProperties.getReadFields(1).contains(0));
    }

    @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
    public void testBinaryForwardedOverwritingInLine1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 4L)}).join(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 2L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new ForwardedFirstAnnotationJoin()).withForwardedFieldsFirst(new String[]{"0->1"});
    }

    @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
    public void testBinaryForwardedOverwritingInLine2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 4L)}).join(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 2L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new ForwardedSecondAnnotationJoin()).withForwardedFieldsSecond(new String[]{"0->1"});
    }

    @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
    public void testBinaryForwardedOverwritingInLine3() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 4L)}).join(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 2L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new ForwardedBothAnnotationJoin()).withForwardedFieldsFirst(new String[]{"0->1;"});
    }

    @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
    public void testBinaryForwardedOverwritingInLine4() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 4L)}).join(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(3L, 2L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new ForwardedBothAnnotationJoin()).withForwardedFieldsSecond(new String[]{"0->1;"});
    }

    @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
    public void testBinaryForwardedOverwritingInLine5() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 4L, 5L)}).join(executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new AllForwardedExceptJoin()).withForwardedFieldsFirst(new String[]{"0->1;"});
    }

    @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
    public void testBinaryForwardedOverwritingInLine6() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 4L, 5L)}).join(executionEnvironment.fromElements(new Tuple3[]{new Tuple3(3L, 2L, 1L)})).where(new int[]{0}).equalTo(new int[]{0}).with(new AllForwardedExceptJoin()).withForwardedFieldsSecond(new String[]{"0->1;"});
    }
}
