package org.apache.flink.test.streaming.runtime;

import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/BroadcastStateITCase.class */
public class BroadcastStateITCase extends AbstractTestBase {

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/BroadcastStateITCase$CustomWmEmitter.class */
    private static abstract class CustomWmEmitter<T> implements AssignerWithPunctuatedWatermarks<T> {
        private static final long serialVersionUID = -5187335197674841233L;

        private CustomWmEmitter() {
        }

        @Nullable
        public Watermark checkAndGetNextWatermark(T t, long j) {
            return new Watermark(j);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/BroadcastStateITCase$TestBroadcastProcessFunction.class */
    private static class TestBroadcastProcessFunction extends BroadcastProcessFunction<Long, String, String> {
        private static final long serialVersionUID = 7616910653561100842L;
        private transient MapStateDescriptor<Long, String> descriptor;

        private TestBroadcastProcessFunction() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.descriptor = new MapStateDescriptor<>("broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        }

        public void processElement(Long l, BroadcastProcessFunction<Long, String, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
        }

        public void processBroadcastElement(String str, BroadcastProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {
            context.getBroadcastState(this.descriptor).put(Long.valueOf(Long.parseLong(str.split(":")[1])), str);
        }

        public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, BroadcastProcessFunction.Context context, Collector collector) throws Exception {
            processBroadcastElement((String) obj, (BroadcastProcessFunction<Long, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, BroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
            processElement((Long) obj, (BroadcastProcessFunction<Long, String, String>.ReadOnlyContext) readOnlyContext, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/BroadcastStateITCase$TestKeyedBroadcastProcessFunction.class */
    private static class TestKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Long, Long, String, String> {
        private static final long serialVersionUID = 7616910653561100842L;
        private final Map<Long, String> expectedState;
        private final Map<Long, Long> timerToExpectedKey = new HashMap();
        private long nextTimerTimestamp;
        private transient MapStateDescriptor<Long, String> descriptor;

        TestKeyedBroadcastProcessFunction(long j, Map<Long, String> map) {
            this.expectedState = map;
            this.nextTimerTimestamp = j;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.descriptor = new MapStateDescriptor<>("broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        }

        public void processElement(Long l, KeyedBroadcastProcessFunction<Long, Long, String, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
            long j = this.nextTimerTimestamp;
            this.nextTimerTimestamp++;
            readOnlyContext.timerService().registerEventTimeTimer(j);
            this.timerToExpectedKey.put(Long.valueOf(j), l);
        }

        public void processBroadcastElement(String str, KeyedBroadcastProcessFunction<Long, Long, String, String>.Context context, Collector<String> collector) throws Exception {
            context.getBroadcastState(this.descriptor).put(Long.valueOf(Long.parseLong(str.split(":")[1])), str);
        }

        public void onTimer(long j, KeyedBroadcastProcessFunction<Long, Long, String, String>.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
            Assert.assertEquals(this.timerToExpectedKey.get(Long.valueOf(j)), onTimerContext.getCurrentKey());
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : onTimerContext.getBroadcastState(this.descriptor).immutableEntries()) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
            Assert.assertEquals(this.expectedState, hashMap);
            collector.collect(Long.toString(j));
        }

        public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, KeyedBroadcastProcessFunction.Context context, Collector collector) throws Exception {
            processBroadcastElement((String) obj, (KeyedBroadcastProcessFunction<Long, Long, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedBroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
            processElement((Long) obj, (KeyedBroadcastProcessFunction<Long, Long, String, String>.ReadOnlyContext) readOnlyContext, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/BroadcastStateITCase$TestSink.class */
    private static class TestSink extends RichSinkFunction<String> {
        private static final long serialVersionUID = 7252508825104554749L;
        private final int expectedOutputCounter;
        private int outputCounter = 0;

        TestSink(int i) {
            this.expectedOutputCounter = i;
        }

        public void invoke(String str, SinkFunction.Context context) throws Exception {
            this.outputCounter++;
        }

        public void close() throws Exception {
            super.close();
            Assert.assertEquals(this.expectedOutputCounter, this.outputCounter);
        }
    }

    @Test
    public void testKeyedWithBroadcastTranslation() throws Exception {
        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        HashMap hashMap = new HashMap();
        hashMap.put(0L, "test:0");
        hashMap.put(1L, "test:1");
        hashMap.put(2L, "test:2");
        hashMap.put(3L, "test:3");
        hashMap.put(4L, "test:4");
        hashMap.put(5L, "test:5");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.generateSequence(0L, 5L).assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() { // from class: org.apache.flink.test.streaming.runtime.BroadcastStateITCase.1
            private static final long serialVersionUID = -8500904795760316195L;

            public long extractTimestamp(Long l, long j) {
                return l.longValue();
            }
        }).keyBy(l -> {
            return l;
        }).connect(executionEnvironment.fromCollection(hashMap.values()).assignTimestampsAndWatermarks(new CustomWmEmitter<String>() { // from class: org.apache.flink.test.streaming.runtime.BroadcastStateITCase.2
            private static final long serialVersionUID = -2148318224248467213L;

            public long extractTimestamp(String str, long j) {
                return Long.parseLong(str.split(":")[1]);
            }
        }).broadcast(new MapStateDescriptor[]{mapStateDescriptor})).process(new TestKeyedBroadcastProcessFunction(100000L, hashMap)).addSink(new TestSink(hashMap.size())).setParallelism(1);
        executionEnvironment.execute();
    }

    @Test
    public void testBroadcastTranslation() throws Exception {
        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        HashMap hashMap = new HashMap();
        hashMap.put(0L, "test:0");
        hashMap.put(1L, "test:1");
        hashMap.put(2L, "test:2");
        hashMap.put(3L, "test:3");
        hashMap.put(4L, "test:4");
        hashMap.put(5L, "test:5");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.generateSequence(0L, 5L).assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() { // from class: org.apache.flink.test.streaming.runtime.BroadcastStateITCase.3
            private static final long serialVersionUID = -8500904795760316195L;

            public long extractTimestamp(Long l, long j) {
                return l.longValue();
            }
        }).connect(executionEnvironment.fromCollection(hashMap.values()).assignTimestampsAndWatermarks(new CustomWmEmitter<String>() { // from class: org.apache.flink.test.streaming.runtime.BroadcastStateITCase.4
            private static final long serialVersionUID = -2148318224248467213L;

            public long extractTimestamp(String str, long j) {
                return Long.parseLong(str.split(":")[1]);
            }
        }).broadcast(new MapStateDescriptor[]{mapStateDescriptor})).process(new TestBroadcastProcessFunction()).addSink(new TestSink(0)).setParallelism(1);
        executionEnvironment.execute();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1248296907:
                if (implMethodName.equals("lambda$testKeyedWithBroadcastTranslation$e16a244d$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/BroadcastStateITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return l;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
