package org.apache.flink.test.streaming.runtime;

import java.io.IOException;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;

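/**
 * Integration tests that update a global aggregate from inside a source function through
 * {@link GlobalAggregateManager}: once with a summing aggregate function and once with an
 * aggregate function that always throws.
 */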
public class GlobalAggregateITCase extends AbstractTestBase {

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/GlobalAggregateITCase$ExceptionThrowingAggregateFunction.class */
    private static class ExceptionThrowingAggregateFunction implements AggregateFunction<Integer, Integer, Integer> {
        private ExceptionThrowingAggregateFunction() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Integer m919createAccumulator() {
            return 0;
        }

        public Integer add(Integer num, Integer num2) {
            throw new NullPointerException("test");
        }

        public Integer getResult(Integer num) {
            return num;
        }

        public Integer merge(Integer num, Integer num2) {
            return add(num, num2);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/GlobalAggregateITCase$IntegerAggregateFunction.class */
    private static class IntegerAggregateFunction implements AggregateFunction<Integer, Integer, Integer> {
        private IntegerAggregateFunction() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Integer m920createAccumulator() {
            return 0;
        }

        public Integer add(Integer num, Integer num2) {
            return Integer.valueOf(num.intValue() + num2.intValue());
        }

        public Integer getResult(Integer num) {
            return num;
        }

        public Integer merge(Integer num, Integer num2) {
            return add(num, num2);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/GlobalAggregateITCase$TestSourceFunction.class */
    private static class TestSourceFunction extends RichSourceFunction<Integer> {
        private GlobalAggregateManager aggregateManager = null;
        private final AggregateFunction<Integer, Integer, Integer> aggregateFunction;
        private final boolean expectFailures;

        public TestSourceFunction(AggregateFunction<Integer, Integer, Integer> aggregateFunction, boolean z) {
            this.aggregateFunction = aggregateFunction;
            this.expectFailures = z;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            Integer num = 0;
            int i = 0;
            for (int i2 = 0; i2 < 5; i2++) {
                Integer num2 = 0;
                try {
                    num2 = (Integer) this.aggregateManager.updateGlobalAggregate("testAgg", Integer.valueOf(i2), this.aggregateFunction);
                    num = Integer.valueOf(num.intValue() + i2);
                } catch (IOException e) {
                    i++;
                }
                if (this.expectFailures) {
                    Assert.assertEquals(i2 + 1, i);
                } else {
                    Assert.assertEquals(num, num2);
                }
            }
        }

        public void cancel() {
        }
    }

    /**
     * Runs a job whose source updates a global aggregate with a summing function and asserts,
     * inside the source, that each update returns the expected running sum.
     */
    @Test
    public void testSuccessfulUpdateToGlobalAggregate() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new TestSourceFunction(new IntegerAggregateFunction(), false))
                .addSink(new DiscardingSink<>());
        env.execute();
    }

    /**
     * Runs a job whose source updates a global aggregate with a function that always throws and
     * asserts, inside the source, that every update fails.
     */
    @Test
    public void testExceptionThrowingAggregateFunction() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new TestSourceFunction(new ExceptionThrowingAggregateFunction(), true))
                .addSink(new DiscardingSink<>());
        env.execute();
    }
}
