/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag(value="org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
class KeyedPartitionWindowedStreamITCase {
    private static final int EVENT_NUMBER = 100;
    private static final String TEST_EVENT = "Test";

    KeyedPartitionWindowedStreamITCase() {
    }

    @Test
    void testMapPartition() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData(this.createSource());
        CloseableIterator resultIterator = source.map((MapFunction)new MapFunction<Tuple2<String, String>, Tuple2<String, String>>(){

            public Tuple2<String, String> map(Tuple2<String, String> value) throws Exception {
                return value;
            }
        }).setParallelism(2).keyBy((KeySelector)new KeySelector<Tuple2<String, String>, String>(){

            public String getKey(Tuple2<String, String> value) throws Exception {
                return (String)value.f0;
            }
        }).fullWindowPartition().mapPartition((MapPartitionFunction)new MapPartitionFunction<Tuple2<String, String>, String>(){

            public void mapPartition(Iterable<Tuple2<String, String>> values, Collector<String> out) throws Exception {
                StringBuilder sb = new StringBuilder();
                for (Tuple2<String, String> value : values) {
                    sb.append((String)value.f0);
                    sb.append((String)value.f1);
                }
                out.collect((Object)sb.toString());
            }
        }).executeAndCollect();
        this.expectInAnyOrder((CloseableIterator<String>)resultIterator, this.createExpectedString(1), this.createExpectedString(2), this.createExpectedString(3));
    }

    @Test
    void testReduce() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new Tuple2[]{Tuple2.of((Object)"key1", (Object)1), Tuple2.of((Object)"key1", (Object)999), Tuple2.of((Object)"key2", (Object)2), Tuple2.of((Object)"key2", (Object)998), Tuple2.of((Object)"key3", (Object)3), Tuple2.of((Object)"key3", (Object)997)});
        CloseableIterator resultIterator = source.map((MapFunction)new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>(){

            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                return value;
            }
        }).setParallelism(2).keyBy((KeySelector)new KeySelector<Tuple2<String, Integer>, String>(){

            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return (String)value.f0;
            }
        }).fullWindowPartition().reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of((Object)((String)value1.f0), (Object)((Integer)value1.f1 + (Integer)value2.f1));
            }
        }).map((MapFunction)new MapFunction<Tuple2<String, Integer>, String>(){

            public String map(Tuple2<String, Integer> value) throws Exception {
                return (String)value.f0 + value.f1;
            }
        }).executeAndCollect();
        this.expectInAnyOrder((CloseableIterator<String>)resultIterator, "key11000", "key21000", "key31000");
    }

    @Test
    void testAggregate() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new Tuple2[]{Tuple2.of((Object)"Key1", (Object)1), Tuple2.of((Object)"Key1", (Object)2), Tuple2.of((Object)"Key2", (Object)2), Tuple2.of((Object)"Key2", (Object)1), Tuple2.of((Object)"Key3", (Object)1), Tuple2.of((Object)"Key3", (Object)1), Tuple2.of((Object)"Key3", (Object)1)});
        CloseableIterator resultIterator = source.map((MapFunction)new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>(){

            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                return value;
            }
        }).setParallelism(2).keyBy((KeySelector)new KeySelector<Tuple2<String, Integer>, String>(){

            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return (String)value.f0;
            }
        }).fullWindowPartition().aggregate((AggregateFunction)new AggregateFunction<Tuple2<String, Integer>, TestAccumulator, String>(){

            public TestAccumulator createAccumulator() {
                return new TestAccumulator();
            }

            public TestAccumulator add(Tuple2<String, Integer> value, TestAccumulator accumulator) {
                accumulator.addTestField((Integer)value.f1);
                return accumulator;
            }

            public String getResult(TestAccumulator accumulator) {
                return accumulator.getTestField();
            }

            public TestAccumulator merge(TestAccumulator a, TestAccumulator b) {
                throw new RuntimeException();
            }
        }).executeAndCollect();
        this.expectInAnyOrder((CloseableIterator<String>)resultIterator, "97", "97", "97");
    }

    @Test
    void testSortPartitionOfTupleElementsAscending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionOfTupleElementsInOrder(Order.ASCENDING), "0 1 3 7 ", "0 1 79 100 ", "8 55 66 77 ");
    }

    @Test
    void testSortPartitionOfTupleElementsDescending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionOfTupleElementsInOrder(Order.DESCENDING), "7 3 1 0 ", "100 79 1 0 ", "77 66 55 8 ");
    }

    @Test
    void testSortPartitionOfPojoElementsAscending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionOfPojoElementsInOrder(Order.ASCENDING), "0 1 3 7 ", "0 1 79 100 ", "8 55 66 77 ");
    }

    @Test
    void testSortPartitionOfPojoElementsDescending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionOfPojoElementsInOrder(Order.DESCENDING), "7 3 1 0 ", "100 79 1 0 ", "77 66 55 8 ");
    }

    @Test
    public void testSortPartitionByKeySelectorAscending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionByKeySelectorInOrder(Order.ASCENDING), "0 1 3 7 ", "0 1 79 100 ", "8 55 66 77 ");
    }

    @Test
    void testSortPartitionByKeySelectorDescending() throws Exception {
        this.expectInAnyOrder(this.sortPartitionByKeySelectorInOrder(Order.DESCENDING), "7 3 1 0 ", "100 79 1 0 ", "77 66 55 8 ");
    }

    private CloseableIterator<String> sortPartitionOfTupleElementsInOrder(Order order) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new Tuple2[]{Tuple2.of((Object)"key1", (Object)0), Tuple2.of((Object)"key1", (Object)7), Tuple2.of((Object)"key1", (Object)3), Tuple2.of((Object)"key1", (Object)1), Tuple2.of((Object)"key2", (Object)1), Tuple2.of((Object)"key2", (Object)100), Tuple2.of((Object)"key2", (Object)0), Tuple2.of((Object)"key2", (Object)79), Tuple2.of((Object)"key3", (Object)77), Tuple2.of((Object)"key3", (Object)66), Tuple2.of((Object)"key3", (Object)55), Tuple2.of((Object)"key3", (Object)8)});
        return source.map((MapFunction)new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>(){

            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                return value;
            }
        }).setParallelism(2).keyBy((KeySelector)new KeySelector<Tuple2<String, Integer>, String>(){

            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return (String)value.f0;
            }
        }).fullWindowPartition().sortPartition(1, order).fullWindowPartition().mapPartition((MapPartitionFunction)new MapPartitionFunction<Tuple2<String, Integer>, String>(){

            public void mapPartition(Iterable<Tuple2<String, Integer>> values, Collector<String> out) {
                StringBuilder sb = new StringBuilder();
                String preKey = null;
                for (Tuple2<String, Integer> value : values) {
                    if (preKey != null && !preKey.equals(value.f0)) {
                        out.collect((Object)sb.toString());
                        sb = new StringBuilder();
                    }
                    sb.append(value.f1);
                    sb.append(" ");
                    preKey = (String)value.f0;
                }
                out.collect((Object)sb.toString());
            }
        }).executeAndCollect();
    }

    private CloseableIterator<String> sortPartitionOfPojoElementsInOrder(Order order) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new TestPojo[]{new TestPojo("key1", 0), new TestPojo("key1", 7), new TestPojo("key1", 3), new TestPojo("key1", 1), new TestPojo("key2", 1), new TestPojo("key2", 100), new TestPojo("key2", 0), new TestPojo("key2", 79), new TestPojo("key3", 77), new TestPojo("key3", 66), new TestPojo("key3", 55), new TestPojo("key3", 8)});
        return source.map((MapFunction)new MapFunction<TestPojo, TestPojo>(){

            public TestPojo map(TestPojo value) throws Exception {
                return value;
            }
        }).setParallelism(2).keyBy((KeySelector)new KeySelector<TestPojo, String>(){

            public String getKey(TestPojo value) throws Exception {
                return value.getKey();
            }
        }).fullWindowPartition().sortPartition("value", order).fullWindowPartition().mapPartition((MapPartitionFunction)new MapPartitionFunction<TestPojo, String>(){

            public void mapPartition(Iterable<TestPojo> values, Collector<String> out) {
                StringBuilder sb = new StringBuilder();
                String preKey = null;
                for (TestPojo value : values) {
                    if (preKey != null && !preKey.equals(value.getKey())) {
                        out.collect((Object)sb.toString());
                        sb = new StringBuilder();
                    }
                    sb.append(value.getValue());
                    sb.append(" ");
                    preKey = value.getKey();
                }
                out.collect((Object)sb.toString());
            }
        }).executeAndCollect();
    }

    private CloseableIterator<String> sortPartitionByKeySelectorInOrder(Order order) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new TestPojo[]{new TestPojo("key1", 0), new TestPojo("key1", 7), new TestPojo("key1", 3), new TestPojo("key1", 1), new TestPojo("key2", 1), new TestPojo("key2", 100), new TestPojo("key2", 0), new TestPojo("key2", 79), new TestPojo("key3", 77), new TestPojo("key3", 66), new TestPojo("key3", 55), new TestPojo("key3", 8)});
        return source.map((MapFunction)new MapFunction<TestPojo, TestPojo>(){

            public TestPojo map(TestPojo value) throws Exception {
                return value;
            }
        }).setParallelism(2).keyBy((KeySelector)new KeySelector<TestPojo, String>(){

            public String getKey(TestPojo value) throws Exception {
                return value.getKey();
            }
        }).fullWindowPartition().sortPartition((KeySelector)new KeySelector<TestPojo, Integer>(){

            public Integer getKey(TestPojo value) throws Exception {
                return value.getValue();
            }
        }, order).fullWindowPartition().mapPartition((MapPartitionFunction)new MapPartitionFunction<TestPojo, String>(){

            public void mapPartition(Iterable<TestPojo> values, Collector<String> out) {
                StringBuilder sb = new StringBuilder();
                String preKey = null;
                for (TestPojo value : values) {
                    if (preKey != null && !preKey.equals(value.getKey())) {
                        out.collect((Object)sb.toString());
                        sb = new StringBuilder();
                    }
                    sb.append(value.getValue());
                    sb.append(" ");
                    preKey = value.getKey();
                }
                out.collect((Object)sb.toString());
            }
        }).executeAndCollect();
    }

    private Collection<Tuple2<String, String>> createSource() {
        ArrayList<Tuple2<String, String>> source = new ArrayList<Tuple2<String, String>>();
        for (int index = 0; index < 100; ++index) {
            source.add((Tuple2<String, String>)Tuple2.of((Object)"k1", (Object)TEST_EVENT));
            source.add((Tuple2<String, String>)Tuple2.of((Object)"k2", (Object)TEST_EVENT));
            source.add((Tuple2<String, String>)Tuple2.of((Object)"k3", (Object)TEST_EVENT));
        }
        return source;
    }

    private String createExpectedString(int key) {
        StringBuilder stringBuilder = new StringBuilder();
        for (int index = 0; index < 100; ++index) {
            stringBuilder.append("k").append(key).append(TEST_EVENT);
        }
        return stringBuilder.toString();
    }

    private void expectInAnyOrder(CloseableIterator<String> resultIterator, String ... expected) {
        ArrayList listExpected = Lists.newArrayList((Object[])expected);
        ArrayList testResults = Lists.newArrayList(resultIterator);
        Collections.sort(listExpected);
        Collections.sort(testResults);
        Assertions.assertThat((List)testResults).isEqualTo((Object)listExpected);
    }

    public static class TestPojo {
        public String key;
        public Integer value;

        public TestPojo() {
        }

        public TestPojo(Integer value) {
            this.value = value;
        }

        public TestPojo(String key, Integer value) {
            this.key = key;
            this.value = value;
        }

        public Integer getValue() {
            return this.value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }

        public String getKey() {
            return this.key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }

    private static class TestAccumulator {
        private Integer testField = 100;

        private TestAccumulator() {
        }

        public void addTestField(Integer number) {
            this.testField = this.testField - number;
        }

        public String getTestField() {
            return String.valueOf(this.testField);
        }
    }
}

