/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag(value="org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
class NonKeyedPartitionWindowedStreamITCase {
    private static final int EVENT_NUMBER = 100;
    private static final String TEST_EVENT = "Test";

    NonKeyedPartitionWindowedStreamITCase() {
    }

    @Test
    void testMapPartition() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData(this.createSource());
        int parallelism = 2;
        CloseableIterator resultIterator = source.map((MapFunction & Serializable)v -> v).setParallelism(parallelism).fullWindowPartition().mapPartition((MapPartitionFunction)new MapPartitionFunction<String, String>(){

            public void mapPartition(Iterable<String> values, Collector<String> out) {
                StringBuilder sb = new StringBuilder();
                for (String value : values) {
                    sb.append(value);
                }
                out.collect((Object)sb.toString());
            }
        }).executeAndCollect();
        String expectedResult = this.createExpectedString(100 / parallelism);
        this.expectInAnyOrder((CloseableIterator<String>)resultIterator, expectedResult, expectedResult);
    }

    @Test
    void testReduce() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new Integer[]{1, 1, 1, 1, 998, 998});
        CloseableIterator resultIterator = source.map((MapFunction & Serializable)v -> v).setParallelism(2).fullWindowPartition().reduce((ReduceFunction)new ReduceFunction<Integer>(){

            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        }).map((MapFunction)new MapFunction<Integer, String>(){

            public String map(Integer value) throws Exception {
                return String.valueOf(value);
            }
        }).executeAndCollect();
        this.expectInAnyOrder((CloseableIterator<String>)resultIterator, "1000", "1000");
    }

    @Test
    void testAggregate() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new Integer[]{1, 1, 2, 2, 3, 3});
        CloseableIterator resultIterator = source.map((MapFunction & Serializable)v -> v).setParallelism(2).fullWindowPartition().aggregate((AggregateFunction)new AggregateFunction<Integer, TestAccumulator, String>(){

            public TestAccumulator createAccumulator() {
                return new TestAccumulator();
            }

            public TestAccumulator add(Integer value, TestAccumulator accumulator) {
                accumulator.addTestField(value);
                return accumulator;
            }

            public String getResult(TestAccumulator accumulator) {
                return accumulator.getTestField();
            }

            public TestAccumulator merge(TestAccumulator a, TestAccumulator b) {
                throw new RuntimeException();
            }
        }).executeAndCollect();
        this.expectInAnyOrder((CloseableIterator<String>)resultIterator, "94", "94");
    }

    @Test
    void testSortPartitionOfTupleElementsAscending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionOfTupleElementsInOrder(Order.ASCENDING), "013", "013");
    }

    @Test
    void testSortPartitionOfTupleElementsDescending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionOfTupleElementsInOrder(Order.DESCENDING), "310", "310");
    }

    @Test
    void testSortPartitionOfPojoElementsAscending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionOfPojoElementsInOrder(Order.ASCENDING), "013", "013");
    }

    @Test
    void testSortPartitionOfPojoElementsDescending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionOfPojoElementsInOrder(Order.DESCENDING), "310", "310");
    }

    @Test
    void testSortPartitionByKeySelectorAscending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionByKeySelectorInOrder(Order.ASCENDING), "013", "013");
    }

    @Test
    void testSortPartitionByKeySelectorDescending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionByKeySelectorInOrder(Order.DESCENDING), "310", "310");
    }

    private CloseableIterator<String> sortPartitionOfTupleElementsInOrder(Order order) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new Tuple2[]{Tuple2.of((Object)TEST_EVENT, (Object)0), Tuple2.of((Object)TEST_EVENT, (Object)0), Tuple2.of((Object)TEST_EVENT, (Object)3), Tuple2.of((Object)TEST_EVENT, (Object)3), Tuple2.of((Object)TEST_EVENT, (Object)1), Tuple2.of((Object)TEST_EVENT, (Object)1)});
        return source.rebalance().map((MapFunction)new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>(){

            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                return value;
            }
        }).setParallelism(2).fullWindowPartition().sortPartition(1, order).fullWindowPartition().mapPartition((MapPartitionFunction)new MapPartitionFunction<Tuple2<String, Integer>, String>(){

            public void mapPartition(Iterable<Tuple2<String, Integer>> values, Collector<String> out) {
                StringBuilder sb = new StringBuilder();
                for (Tuple2<String, Integer> value : values) {
                    sb.append(value.f1);
                }
                out.collect((Object)sb.toString());
            }
        }).executeAndCollect();
    }

    private CloseableIterator<String> sortPartitionOfPojoElementsInOrder(Order order) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new TestPojo[]{new TestPojo(0), new TestPojo(0), new TestPojo(3), new TestPojo(3), new TestPojo(1), new TestPojo(1)});
        return source.rebalance().map((MapFunction)new MapFunction<TestPojo, TestPojo>(){

            public TestPojo map(TestPojo value) throws Exception {
                return value;
            }
        }).setParallelism(2).fullWindowPartition().sortPartition("value", order).fullWindowPartition().mapPartition((MapPartitionFunction)new MapPartitionFunction<TestPojo, String>(){

            public void mapPartition(Iterable<TestPojo> values, Collector<String> out) {
                StringBuilder sb = new StringBuilder();
                for (TestPojo value : values) {
                    sb.append(value.getValue());
                }
                out.collect((Object)sb.toString());
            }
        }).executeAndCollect();
    }

    private CloseableIterator<String> sortPartitionByKeySelectorInOrder(Order order) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new TestPojo[]{new TestPojo("KEY", 0), new TestPojo("KEY", 0), new TestPojo("KEY", 3), new TestPojo("KEY", 3), new TestPojo("KEY", 1), new TestPojo("KEY", 1)});
        return source.rebalance().map((MapFunction)new MapFunction<TestPojo, TestPojo>(){

            public TestPojo map(TestPojo value) throws Exception {
                return value;
            }
        }).setParallelism(2).fullWindowPartition().sortPartition((KeySelector)new KeySelector<TestPojo, Integer>(){

            public Integer getKey(TestPojo value) throws Exception {
                return value.getValue();
            }
        }, order).fullWindowPartition().mapPartition((MapPartitionFunction)new MapPartitionFunction<TestPojo, String>(){

            public void mapPartition(Iterable<TestPojo> values, Collector<String> out) {
                StringBuilder sb = new StringBuilder();
                for (TestPojo value : values) {
                    sb.append(value.getValue());
                }
                out.collect((Object)sb.toString());
            }
        }).executeAndCollect();
    }

    private void expectInAnyOrder(CloseableIterator<String> resultIterator, String ... expected) {
        ArrayList listExpected = Lists.newArrayList((Object[])expected);
        ArrayList testResults = Lists.newArrayList(resultIterator);
        Collections.sort(listExpected);
        Collections.sort(testResults);
        Assertions.assertThat((List)testResults).isEqualTo((Object)listExpected);
    }

    private Collection<String> createSource() {
        ArrayList<String> source = new ArrayList<String>();
        for (int index = 0; index < 100; ++index) {
            source.add(TEST_EVENT);
        }
        return source;
    }

    private String createExpectedString(int number) {
        StringBuilder stringBuilder = new StringBuilder();
        for (int index = 0; index < number; ++index) {
            stringBuilder.append(TEST_EVENT);
        }
        return stringBuilder.toString();
    }

    public static class TestPojo {
        public String key;
        public Integer value;

        public TestPojo() {
        }

        public TestPojo(Integer value) {
            this.value = value;
        }

        public TestPojo(String key, Integer value) {
            this.key = key;
            this.value = value;
        }

        public Integer getValue() {
            return this.value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }

        public String getKey() {
            return this.key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }

    private static class TestAccumulator {
        private Integer testField = 100;

        private TestAccumulator() {
        }

        public void addTestField(Integer number) {
            this.testField = this.testField - number;
        }

        public String getTestField() {
            return String.valueOf(this.testField);
        }
    }
}

