package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
/* loaded from: input_file:org/apache/flink/test/streaming/runtime/KeyedPartitionWindowedStreamITCase.class */
class KeyedPartitionWindowedStreamITCase {
    /**
     * Value 100, equal to the loop bound hard-coded in {@code createSource()} and
     * {@code createExpectedString(int)}.
     * NOTE(review): this constant is not referenced by name anywhere in this file; presumably
     * those loops were meant to use it — confirm before changing either side.
     */
    private static final int EVENT_NUMBER = 100;
    /**
     * Payload placed in every tuple built by {@code createSource()} and appended by
     * {@code createExpectedString(int)}.
     */
    private static final String TEST_EVENT = "Test";

    /**
     * Accumulator used by the aggregate test: it starts at 100 and is decreased by every value
     * that is added to it.
     */
    public static class TestAccumulator {
        /** Current value of the accumulator. */
        private Integer testField;

        /** Private constructor; instances are created only by code nested in the same top-level class. */
        private TestAccumulator() {
            testField = 100;
        }

        /**
         * Subtracts {@code num} from the current value.
         *
         * @param num amount to subtract; a {@code null} value fails with a NullPointerException
         */
        public void addTestField(Integer num) {
            int remaining = testField - num;
            testField = remaining;
        }

        /**
         * Returns the current value rendered as a decimal string.
         *
         * @return textual form of the accumulator value
         */
        public String getTestField() {
            return String.valueOf(testField);
        }
    }

    /**
     * Mutable key/value element used by the sort-partition tests.
     *
     * <p>NOTE(review): the public fields, accessors and public no-argument constructor are kept
     * exactly as they were; presumably they are what allows the framework to handle this class as
     * a POJO and to address the {@code "value"} field by name — confirm before narrowing them.
     */
    public static class TestPojo {
        public String key;
        public Integer value;

        /** Creates an element whose key and value are both {@code null}. */
        public TestPojo() {}

        /** Creates an element with the given value and a {@code null} key. */
        public TestPojo(Integer num) {
            this(null, num);
        }

        /** Creates an element with the given key and value. */
        public TestPojo(String str, Integer num) {
            key = str;
            value = num;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String str) {
            key = str;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer num) {
            value = num;
        }
    }

    /** Package-private no-argument constructor; performs no initialization. */
    KeyedPartitionWindowedStreamITCase() {
    }

    /**
     * Runs {@code mapPartition} on the keyed full-window partition of the events produced by
     * {@code createSource()}: each partition is mapped to the concatenation of the key and payload
     * of all its records, and one such string is expected for each of the keys k1, k2 and k3.
     */
    @Test
    void testMapPartition() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CloseableIterator<String> resultIterator =
                env.fromData(createSource())
                        // Identity map, present only so an operator with parallelism 2 precedes keyBy.
                        .map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                            public Tuple2<String, String> map(Tuple2<String, String> value) throws Exception {
                                return value;
                            }
                        })
                        .setParallelism(2)
                        .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                            public String getKey(Tuple2<String, String> value) throws Exception {
                                return value.f0;
                            }
                        })
                        .fullWindowPartition()
                        .mapPartition(new MapPartitionFunction<Tuple2<String, String>, String>() {
                            public void mapPartition(Iterable<Tuple2<String, String>> values, Collector<String> out)
                                    throws Exception {
                                StringBuilder concatenated = new StringBuilder();
                                for (Tuple2<String, String> value : values) {
                                    concatenated.append(value.f0).append(value.f1);
                                }
                                out.collect(concatenated.toString());
                            }
                        })
                        .executeAndCollect();
        expectInAnyOrder(
                resultIterator,
                createExpectedString(1),
                createExpectedString(2),
                createExpectedString(3));
    }

    /**
     * Runs {@code reduce} on the keyed full-window partition: the integer fields of all records
     * sharing a key are summed, and the key concatenated with its sum is expected for each of the
     * three keys (every key sums to 1000).
     */
    @Test
    void testReduce() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Elements are passed as generic varargs (not a raw Tuple2[] array) so that the element
        // type Tuple2<String, Integer> is inferred and the typed functions below type-check.
        CloseableIterator<String> resultIterator =
                env.fromData(
                                Tuple2.of("key1", 1),
                                Tuple2.of("key1", 999),
                                Tuple2.of("key2", 2),
                                Tuple2.of("key2", 998),
                                Tuple2.of("key3", 3),
                                Tuple2.of("key3", 997))
                        // Identity map, present only so an operator with parallelism 2 precedes keyBy.
                        .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                                return value;
                            }
                        })
                        .setParallelism(2)
                        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                            public String getKey(Tuple2<String, Integer> value) throws Exception {
                                return value.f0;
                            }
                        })
                        .fullWindowPartition()
                        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                            public Tuple2<String, Integer> reduce(
                                    Tuple2<String, Integer> left, Tuple2<String, Integer> right) throws Exception {
                                // Keep the key of the first record and add up the integer fields.
                                return Tuple2.of(left.f0, left.f1 + right.f1);
                            }
                        })
                        .map(new MapFunction<Tuple2<String, Integer>, String>() {
                            public String map(Tuple2<String, Integer> value) throws Exception {
                                return value.f0 + value.f1;
                            }
                        })
                        .executeAndCollect();
        expectInAnyOrder(resultIterator, "key11000", "key21000", "key31000");
    }

    /**
     * Runs {@code aggregate} on the keyed full-window partition: a {@code TestAccumulator}
     * (initial value 100) is decreased by the integer field of every record of a key. The values
     * of each key add up to 3, so "97" is expected three times.
     */
    @Test
    void testAggregate() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Elements are passed as generic varargs (not a raw Tuple2[] array) so that the element
        // type Tuple2<String, Integer> is inferred and the typed functions below type-check.
        CloseableIterator<String> resultIterator =
                env.fromData(
                                Tuple2.of("Key1", 1),
                                Tuple2.of("Key1", 2),
                                Tuple2.of("Key2", 2),
                                Tuple2.of("Key2", 1),
                                Tuple2.of("Key3", 1),
                                Tuple2.of("Key3", 1),
                                Tuple2.of("Key3", 1))
                        // Identity map, present only so an operator with parallelism 2 precedes keyBy.
                        .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                                return value;
                            }
                        })
                        .setParallelism(2)
                        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                            public String getKey(Tuple2<String, Integer> value) throws Exception {
                                return value.f0;
                            }
                        })
                        .fullWindowPartition()
                        .aggregate(new AggregateFunction<Tuple2<String, Integer>, TestAccumulator, String>() {
                            // Must be named createAccumulator to implement the interface method.
                            public TestAccumulator createAccumulator() {
                                return new TestAccumulator();
                            }

                            public TestAccumulator add(Tuple2<String, Integer> value, TestAccumulator accumulator) {
                                accumulator.addTestField(value.f1);
                                return accumulator;
                            }

                            public String getResult(TestAccumulator accumulator) {
                                return accumulator.getTestField();
                            }

                            public TestAccumulator merge(TestAccumulator left, TestAccumulator right) {
                                // Any call to merge makes the job fail; this test provides no merge logic.
                                throw new UnsupportedOperationException(
                                        "merge is not supported by the aggregate function of this test");
                            }
                        })
                        .executeAndCollect();
        expectInAnyOrder(resultIterator, "97", "97", "97");
    }

    /** Sorting tuple elements by field position in ascending order yields ascending values per key. */
    @Test
    void testSortPartitionOfTupleElementsAscending() throws Exception {
        CloseableIterator<String> sortedValues = sortPartitionOfTupleElementsInOrder(Order.ASCENDING);
        expectInAnyOrder(sortedValues, "0 1 3 7 ", "0 1 79 100 ", "8 55 66 77 ");
    }

    /** Sorting tuple elements by field position in descending order yields descending values per key. */
    @Test
    void testSortPartitionOfTupleElementsDescending() throws Exception {
        CloseableIterator<String> sortedValues = sortPartitionOfTupleElementsInOrder(Order.DESCENDING);
        expectInAnyOrder(sortedValues, "7 3 1 0 ", "100 79 1 0 ", "77 66 55 8 ");
    }

    /** Sorting POJO elements by field name in ascending order yields ascending values per key. */
    @Test
    void testSortPartitionOfPojoElementsAscending() throws Exception {
        CloseableIterator<String> sortedValues = sortPartitionOfPojoElementsInOrder(Order.ASCENDING);
        expectInAnyOrder(sortedValues, "0 1 3 7 ", "0 1 79 100 ", "8 55 66 77 ");
    }

    /** Sorting POJO elements by field name in descending order yields descending values per key. */
    @Test
    void testSortPartitionOfPojoElementsDescending() throws Exception {
        CloseableIterator<String> sortedValues = sortPartitionOfPojoElementsInOrder(Order.DESCENDING);
        expectInAnyOrder(sortedValues, "7 3 1 0 ", "100 79 1 0 ", "77 66 55 8 ");
    }

    /** Sorting by a key selector in ascending order yields ascending values per key. */
    @Test
    public void testSortPartitionByKeySelectorAscending() throws Exception {
        CloseableIterator<String> sortedValues = sortPartitionByKeySelectorInOrder(Order.ASCENDING);
        expectInAnyOrder(sortedValues, "0 1 3 7 ", "0 1 79 100 ", "8 55 66 77 ");
    }

    /** Sorting by a key selector in descending order yields descending values per key. */
    @Test
    void testSortPartitionByKeySelectorDescending() throws Exception {
        CloseableIterator<String> sortedValues = sortPartitionByKeySelectorInOrder(Order.DESCENDING);
        expectInAnyOrder(sortedValues, "7 3 1 0 ", "100 79 1 0 ", "77 66 55 8 ");
    }

    /**
     * Builds and runs a job that keys twelve (key, value) tuples by their first field, sorts each
     * keyed full-window partition by tuple field position 1 in the given order, and renders the
     * sorted values of every key as one space-separated string.
     *
     * @param order sort order handed to {@code sortPartition}
     * @return iterator over the rendered strings; one string is emitted per run of equal keys
     * @throws Exception if building or executing the job fails
     */
    private CloseableIterator<String> sortPartitionOfTupleElementsInOrder(Order order) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Elements are passed as generic varargs (not a raw Tuple2[] array) so that the element
        // type Tuple2<String, Integer> is inferred and the typed functions below type-check.
        return env.fromData(
                        Tuple2.of("key1", 0),
                        Tuple2.of("key1", 7),
                        Tuple2.of("key1", 3),
                        Tuple2.of("key1", 1),
                        Tuple2.of("key2", 1),
                        Tuple2.of("key2", 100),
                        Tuple2.of("key2", 0),
                        Tuple2.of("key2", 79),
                        Tuple2.of("key3", 77),
                        Tuple2.of("key3", 66),
                        Tuple2.of("key3", 55),
                        Tuple2.of("key3", 8))
                // Identity map, present only so an operator with parallelism 2 precedes keyBy.
                .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                        return value;
                    }
                })
                .setParallelism(2)
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .fullWindowPartition()
                .sortPartition(1, order)
                .fullWindowPartition()
                .mapPartition(new MapPartitionFunction<Tuple2<String, Integer>, String>() {
                    public void mapPartition(Iterable<Tuple2<String, Integer>> values, Collector<String> out) {
                        StringBuilder rendered = new StringBuilder();
                        String previousKey = null;
                        for (Tuple2<String, Integer> value : values) {
                            // A key change closes the current group: emit it and start a new one.
                            if (previousKey != null && !previousKey.equals(value.f0)) {
                                out.collect(rendered.toString());
                                rendered = new StringBuilder();
                            }
                            rendered.append(value.f1).append(" ");
                            previousKey = value.f0;
                        }
                        // The last group is emitted unconditionally (an empty input emits "").
                        out.collect(rendered.toString());
                    }
                })
                .executeAndCollect();
    }

    /**
     * Builds and runs a job that keys twelve {@code TestPojo} elements by their key, sorts each
     * keyed full-window partition by the field named {@code "value"} in the given order, and
     * renders the sorted values of every key as one space-separated string.
     *
     * @param order sort order handed to {@code sortPartition}
     * @return iterator over the rendered strings; one string is emitted per run of equal keys
     * @throws Exception if building or executing the job fails
     */
    private CloseableIterator<String> sortPartitionOfPojoElementsInOrder(Order order) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        return env.fromData(
                        new TestPojo("key1", 0),
                        new TestPojo("key1", 7),
                        new TestPojo("key1", 3),
                        new TestPojo("key1", 1),
                        new TestPojo("key2", 1),
                        new TestPojo("key2", 100),
                        new TestPojo("key2", 0),
                        new TestPojo("key2", 79),
                        new TestPojo("key3", 77),
                        new TestPojo("key3", 66),
                        new TestPojo("key3", 55),
                        new TestPojo("key3", 8))
                // Identity map, present only so an operator with parallelism 2 precedes keyBy.
                .map(new MapFunction<TestPojo, TestPojo>() {
                    public TestPojo map(TestPojo element) throws Exception {
                        return element;
                    }
                })
                .setParallelism(2)
                .keyBy(new KeySelector<TestPojo, String>() {
                    public String getKey(TestPojo element) throws Exception {
                        return element.getKey();
                    }
                })
                .fullWindowPartition()
                .sortPartition("value", order)
                .fullWindowPartition()
                .mapPartition(new MapPartitionFunction<TestPojo, String>() {
                    public void mapPartition(Iterable<TestPojo> elements, Collector<String> out) {
                        StringBuilder rendered = new StringBuilder();
                        String previousKey = null;
                        for (TestPojo element : elements) {
                            String currentKey = element.getKey();
                            boolean keyChanged = previousKey != null && !previousKey.equals(currentKey);
                            if (keyChanged) {
                                // A key change closes the current group: emit it and start over.
                                out.collect(rendered.toString());
                                rendered.setLength(0);
                            }
                            rendered.append(element.getValue()).append(" ");
                            previousKey = currentKey;
                        }
                        // The last group is emitted unconditionally (an empty input emits "").
                        out.collect(rendered.toString());
                    }
                })
                .executeAndCollect();
    }

    /**
     * Builds and runs a job that keys twelve {@code TestPojo} elements by their key, sorts each
     * keyed full-window partition by the value extracted through a key selector in the given
     * order, and renders the sorted values of every key as one space-separated string.
     *
     * @param order sort order handed to {@code sortPartition}
     * @return iterator over the rendered strings; one string is emitted per run of equal keys
     * @throws Exception if building or executing the job fails
     */
    private CloseableIterator<String> sortPartitionByKeySelectorInOrder(Order order) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        return env.fromData(
                        new TestPojo("key1", 0),
                        new TestPojo("key1", 7),
                        new TestPojo("key1", 3),
                        new TestPojo("key1", 1),
                        new TestPojo("key2", 1),
                        new TestPojo("key2", 100),
                        new TestPojo("key2", 0),
                        new TestPojo("key2", 79),
                        new TestPojo("key3", 77),
                        new TestPojo("key3", 66),
                        new TestPojo("key3", 55),
                        new TestPojo("key3", 8))
                // Identity map, present only so an operator with parallelism 2 precedes keyBy.
                .map(new MapFunction<TestPojo, TestPojo>() {
                    public TestPojo map(TestPojo element) throws Exception {
                        return element;
                    }
                })
                .setParallelism(2)
                .keyBy(new KeySelector<TestPojo, String>() {
                    public String getKey(TestPojo element) throws Exception {
                        return element.getKey();
                    }
                })
                .fullWindowPartition()
                .sortPartition(
                        new KeySelector<TestPojo, Integer>() {
                            public Integer getKey(TestPojo element) throws Exception {
                                return element.getValue();
                            }
                        },
                        order)
                .fullWindowPartition()
                .mapPartition(new MapPartitionFunction<TestPojo, String>() {
                    public void mapPartition(Iterable<TestPojo> elements, Collector<String> out) {
                        StringBuilder rendered = new StringBuilder();
                        String previousKey = null;
                        for (TestPojo element : elements) {
                            String currentKey = element.getKey();
                            boolean keyChanged = previousKey != null && !previousKey.equals(currentKey);
                            if (keyChanged) {
                                // A key change closes the current group: emit it and start over.
                                out.collect(rendered.toString());
                                rendered.setLength(0);
                            }
                            rendered.append(element.getValue()).append(" ");
                            previousKey = currentKey;
                        }
                        // The last group is emitted unconditionally (an empty input emits "").
                        out.collect(rendered.toString());
                    }
                })
                .executeAndCollect();
    }

    /**
     * Creates the input of the map-partition test: 100 rounds, each adding one tuple for the keys
     * "k1", "k2" and "k3" (in that order) with {@code TEST_EVENT} as the second field.
     *
     * @return a new mutable collection holding the 300 generated tuples
     */
    private Collection<Tuple2<String, String>> createSource() {
        // Same count as the loop bound in createExpectedString(int); the two must stay in sync.
        final int eventsPerKey = 100;
        // Typed and pre-sized (three keys per round) instead of a raw ArrayList.
        Collection<Tuple2<String, String>> events = new ArrayList<>(3 * eventsPerKey);
        for (int round = 0; round < eventsPerKey; round++) {
            events.add(Tuple2.of("k1", TEST_EVENT));
            events.add(Tuple2.of("k2", TEST_EVENT));
            events.add(Tuple2.of("k3", TEST_EVENT));
        }
        return events;
    }

    /**
     * Builds the string expected from the map-partition test for one key: the text
     * {@code "k" + i + TEST_EVENT} repeated 100 times.
     *
     * @param i numeric suffix of the key
     * @return the repeated key/payload text
     */
    private String createExpectedString(int i) {
        String keyedEvent = "k" + i + TEST_EVENT;
        StringBuilder expected = new StringBuilder(keyedEvent.length() * 100);
        for (int remaining = 100; remaining > 0; remaining--) {
            expected.append(keyedEvent);
        }
        return expected.toString();
    }

    /**
     * Drains the given result iterator and asserts that it produced exactly the expected strings,
     * ignoring order (both sides are sorted before being compared).
     *
     * <p>The iterator is always closed once it has been drained, including when draining fails.
     *
     * @param closeableIterator job results to verify; closed by this method
     * @param strArr the expected results, in any order
     * @throws Exception if reading from or closing the iterator fails
     */
    private void expectInAnyOrder(CloseableIterator<String> closeableIterator, String... strArr) throws Exception {
        final ArrayList<String> actual;
        // try-with-resources guarantees the iterator is closed even if draining throws.
        try (CloseableIterator<String> results = closeableIterator) {
            actual = Lists.newArrayList(results);
        }
        ArrayList<String> expected = Lists.newArrayList(strArr);
        Collections.sort(expected);
        Collections.sort(actual);
        Assertions.assertThat(actual).isEqualTo(expected);
    }
}
