package org.apache.flink.test.streaming.runtime;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.class */
public class LatencyMarkerITCase {
    @Test
    public void testBroadcast() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(4);
        executionEnvironment.getConfig().setLatencyTrackingInterval(2000L);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.fromData(new String[]{"test"}).connect(executionEnvironment.fromData((List) IntStream.range(0, 100000).boxed().collect(Collectors.toList())).setParallelism(1).broadcast(new MapStateDescriptor[]{new MapStateDescriptor("BroadcastState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)})).process(new BroadcastProcessFunction<String, Integer, Integer>() { // from class: org.apache.flink.test.streaming.runtime.LatencyMarkerITCase.1
            int expected = 0;

            public void processElement(String str, BroadcastProcessFunction<String, Integer, Integer>.ReadOnlyContext readOnlyContext, Collector<Integer> collector) {
            }

            public void processBroadcastElement(Integer num, BroadcastProcessFunction<String, Integer, Integer>.Context context, Collector<Integer> collector) {
                int intValue = num.intValue();
                int i = this.expected;
                this.expected = i + 1;
                if (intValue != i) {
                    throw new AssertionError(String.format("Value was supposed to be: '%s', but was: '%s'", Integer.valueOf(this.expected - 1), num));
                }
                collector.collect(num);
            }

            public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, BroadcastProcessFunction.Context context, Collector collector) throws Exception {
                processBroadcastElement((Integer) obj, (BroadcastProcessFunction<String, Integer, Integer>.Context) context, (Collector<Integer>) collector);
            }

            public /* bridge */ /* synthetic */ void processElement(Object obj, BroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
                processElement((String) obj, (BroadcastProcessFunction<String, Integer, Integer>.ReadOnlyContext) readOnlyContext, (Collector<Integer>) collector);
            }
        }).addSink(new MigrationTestUtils.AccumulatorCountingSink()).setParallelism(1);
        Assert.assertEquals(100000 * 4, ((Integer) executionEnvironment.execute().getAccumulatorResult(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR)).intValue());
    }
}
