/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.javaApiOperators;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class FilterITCase
extends MultipleProgramsTestBase {
    private String resultPath;
    private String expected;
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    public FilterITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    @Before
    public void before() throws Exception {
        this.resultPath = this.tempFolder.newFile().toURI().toString();
    }

    @After
    public void after() throws Exception {
        FilterITCase.compareResultsByLinesInMemory((String)this.expected, (String)this.resultPath);
    }

    @Test
    public void testAllRejectingFilter() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        FilterOperator filterDs = ds.filter((FilterFunction)new Filter1());
        filterDs.writeAsCsv(this.resultPath);
        env.execute();
        this.expected = "\n";
    }

    @Test
    public void testAllPassingFilter() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        FilterOperator filterDs = ds.filter((FilterFunction)new Filter2());
        filterDs.writeAsCsv(this.resultPath);
        env.execute();
        this.expected = "1,1,Hi\n2,2,Hello\n3,2,Hello world\n4,3,Hello world, how are you?\n5,3,I am fine.\n6,3,Luke Skywalker\n7,4,Comment#1\n8,4,Comment#2\n9,4,Comment#3\n10,4,Comment#4\n11,5,Comment#5\n12,5,Comment#6\n13,5,Comment#7\n14,5,Comment#8\n15,5,Comment#9\n16,6,Comment#10\n17,6,Comment#11\n18,6,Comment#12\n19,6,Comment#13\n20,6,Comment#14\n21,6,Comment#15\n";
    }

    @Test
    public void testFilterOnStringTupleField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        FilterOperator filterDs = ds.filter((FilterFunction)new Filter3());
        filterDs.writeAsCsv(this.resultPath);
        env.execute();
        this.expected = "3,2,Hello world\n4,3,Hello world, how are you?\n";
    }

    @Test
    public void testFilterOnIntegerTupleField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        FilterOperator filterDs = ds.filter((FilterFunction)new Filter4());
        filterDs.writeAsCsv(this.resultPath);
        env.execute();
        this.expected = "2,2,Hello\n4,3,Hello world, how are you?\n6,3,Luke Skywalker\n8,4,Comment#2\n10,4,Comment#4\n12,5,Comment#6\n14,5,Comment#8\n16,6,Comment#10\n18,6,Comment#12\n20,6,Comment#14\n";
    }

    @Test
    public void testFilterBasicType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
        FilterOperator filterDs = ds.filter((FilterFunction)new Filter5());
        filterDs.writeAsText(this.resultPath);
        env.execute();
        this.expected = "Hi\nHello\nHello world\nHello world, how are you?\n";
    }

    @Test
    public void testFilterOnCustomType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        FilterOperator filterDs = ds.filter((FilterFunction)new Filter6());
        filterDs.writeAsText(this.resultPath);
        env.execute();
        this.expected = "3,3,Hello world, how are you?\n3,4,I am fine.\n3,5,Luke Skywalker\n";
    }

    @Test
    public void testRichFilterOnStringTupleField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        SingleInputUdfOperator filterDs = ds.filter((FilterFunction)new RichFilter1()).withBroadcastSet(ints, "ints");
        filterDs.writeAsCsv(this.resultPath);
        env.execute();
        this.expected = "1,1,Hi\n2,2,Hello\n3,2,Hello world\n4,3,Hello world, how are you?\n";
    }

    @Test
    public void testFilterWithBroadcastVariables() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        SingleInputUdfOperator filterDs = ds.filter((FilterFunction)new RichFilter2()).withBroadcastSet(intDs, "ints");
        filterDs.writeAsCsv(this.resultPath);
        env.execute();
        this.expected = "11,5,Comment#5\n12,5,Comment#6\n13,5,Comment#7\n14,5,Comment#8\n15,5,Comment#9\n";
    }

    public static class RichFilter2
    extends RichFilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        private int broadcastSum = 0;

        public void open(Configuration config) {
            List ints = this.getRuntimeContext().getBroadcastVariable("ints");
            for (Integer i : ints) {
                this.broadcastSum += i.intValue();
            }
        }

        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return (Long)value.f1 == (long)(this.broadcastSum / 11);
        }
    }

    public static class RichFilter1
    extends RichFilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        int literal = -1;

        public void open(Configuration config) {
            List ints = this.getRuntimeContext().getBroadcastVariable("ints");
            Iterator iterator = ints.iterator();
            while (iterator.hasNext()) {
                int i = (Integer)iterator.next();
                this.literal = this.literal < i ? i : this.literal;
            }
        }

        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return (Integer)value.f0 < this.literal;
        }
    }

    public static class Filter6
    implements FilterFunction<CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        public boolean filter(CollectionDataSets.CustomType value) throws Exception {
            return value.myString.contains("a");
        }
    }

    public static class Filter5
    implements FilterFunction<String> {
        private static final long serialVersionUID = 1L;

        public boolean filter(String value) throws Exception {
            return value.startsWith("H");
        }
    }

    public static class Filter4
    implements FilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return (Integer)value.f0 % 2 == 0;
        }
    }

    public static class Filter3
    implements FilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return ((String)value.f2).contains("world");
        }
    }

    public static class Filter2
    implements FilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return true;
        }
    }

    public static class Filter1
    implements FilterFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
            return false;
        }
    }
}

