/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class BroadcastStateITCase
extends AbstractTestBaseJUnit4 {
    // JUnit rule for declaring expected exceptions. Neither test in this class configures an
    // expectation on it, so it stays in its initial ExpectedException.none() state.
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Runs a job that connects a keyed stream with a broadcast stream and processes both with
     * {@link TestKeyedBroadcastProcessFunction}.
     *
     * <p>The keyed input is the sequence 0..5, keyed by the value itself and using the value as
     * its timestamp. The broadcast input consists of the matching {@code "test:N"} strings, using
     * {@code N} as the timestamp. The function performs its assertions from its timers and emits
     * one record per fired timer; the sink asserts that {@code expected.size()} records arrived.
     *
     * @throws Exception if the job cannot be built or its execution fails
     */
    @Test
    public void testKeyedWithBroadcastTranslation() throws Exception {
        final MapStateDescriptor<Long, String> utterDescriptor =
                new MapStateDescriptor<>(
                        "broadcast-state",
                        BasicTypeInfo.LONG_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        // Contents the broadcast state must have once every broadcast element was processed.
        final Map<Long, String> expected = new HashMap<>();
        for (long key = 0L; key <= 5L; key++) {
            expected.put(key, "test:" + key);
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final KeyedStream<Long, Long> srcOne =
                env.fromSequence(0L, 5L)
                        .assignTimestampsAndWatermarks(
                                new CustomWmEmitter<Long>() {
                                    private static final long serialVersionUID =
                                            -8500904795760316195L;

                                    @Override
                                    public long extractTimestamp(
                                            Long element, long previousElementTimestamp) {
                                        return element;
                                    }
                                })
                        // Explicitly typed selector so the key type is Long, not an erased Object.
                        .keyBy((KeySelector<Long, Long>) value -> value);

        final SingleOutputStreamOperator<String> srcTwo =
                env.fromData(expected.values())
                        .assignTimestampsAndWatermarks(
                                new CustomWmEmitter<String>() {
                                    private static final long serialVersionUID =
                                            -2148318224248467213L;

                                    @Override
                                    public long extractTimestamp(
                                            String element, long previousElementTimestamp) {
                                        // Elements look like "test:N"; N is the timestamp.
                                        return Long.parseLong(element.split(":")[1]);
                                    }
                                });

        final BroadcastStream<String> broadcast = srcTwo.broadcast(utterDescriptor);

        // Timer timestamps start far above the largest element timestamp (5), presumably so the
        // timers only fire after all elements of both inputs have arrived.
        final SingleOutputStreamOperator<String> output =
                srcOne.connect(broadcast)
                        .process(new TestKeyedBroadcastProcessFunction(100000L, expected));

        output.addSink(new TestSink(expected.size())).setParallelism(1);
        env.execute();
    }

    /**
     * Runs a job that connects a non-keyed stream with a broadcast stream and processes both with
     * {@link TestBroadcastProcessFunction}.
     *
     * <p>The regular input is the sequence 0..5 and the broadcast input consists of the
     * {@code "test:N"} strings for the same range. The function never emits anything, so the sink
     * is created with an expected record count of 0.
     *
     * @throws Exception if the job cannot be built or its execution fails
     */
    @Test
    public void testBroadcastTranslation() throws Exception {
        final MapStateDescriptor<Long, String> utterDescriptor =
                new MapStateDescriptor<>(
                        "broadcast-state",
                        BasicTypeInfo.LONG_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        // Only the values are used here: they form the broadcast input.
        final Map<Long, String> expected = new HashMap<>();
        for (long key = 0L; key <= 5L; key++) {
            expected.put(key, "test:" + key);
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final SingleOutputStreamOperator<Long> srcOne =
                env.fromSequence(0L, 5L)
                        .assignTimestampsAndWatermarks(
                                new CustomWmEmitter<Long>() {
                                    private static final long serialVersionUID =
                                            -8500904795760316195L;

                                    @Override
                                    public long extractTimestamp(
                                            Long element, long previousElementTimestamp) {
                                        return element;
                                    }
                                });

        final SingleOutputStreamOperator<String> srcTwo =
                env.fromData(expected.values())
                        .assignTimestampsAndWatermarks(
                                new CustomWmEmitter<String>() {
                                    private static final long serialVersionUID =
                                            -2148318224248467213L;

                                    @Override
                                    public long extractTimestamp(
                                            String element, long previousElementTimestamp) {
                                        // Elements look like "test:N"; N is the timestamp.
                                        return Long.parseLong(element.split(":")[1]);
                                    }
                                });

        final BroadcastStream<String> broadcast = srcTwo.broadcast(utterDescriptor);

        final SingleOutputStreamOperator<String> output =
                srcOne.connect(broadcast).process(new TestBroadcastProcessFunction());

        output.addSink(new TestSink(0)).setParallelism(1);
        env.execute();
    }

    private static class TestBroadcastProcessFunction
    extends BroadcastProcessFunction<Long, String, String> {
        private static final long serialVersionUID = 7616910653561100842L;
        private transient MapStateDescriptor<Long, String> descriptor;

        private TestBroadcastProcessFunction() {
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            this.descriptor = new MapStateDescriptor("broadcast-state", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        }

        public void processElement(Long value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
        }

        public void processBroadcastElement(String value, BroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            long key = Long.parseLong(value.split(":")[1]);
            ctx.getBroadcastState(this.descriptor).put((Object)key, (Object)value);
        }
    }

    private static class TestKeyedBroadcastProcessFunction
    extends KeyedBroadcastProcessFunction<Long, Long, String, String> {
        private static final long serialVersionUID = 7616910653561100842L;
        private final Map<Long, String> expectedState;
        private final Map<Long, Long> timerToExpectedKey = new HashMap<Long, Long>();
        private long nextTimerTimestamp;
        private transient MapStateDescriptor<Long, String> descriptor;

        TestKeyedBroadcastProcessFunction(long initialTimerTimestamp, Map<Long, String> expectedBroadcastState) {
            this.expectedState = expectedBroadcastState;
            this.nextTimerTimestamp = initialTimerTimestamp;
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            this.descriptor = new MapStateDescriptor("broadcast-state", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        }

        public void processElement(Long value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            long currentTime = this.nextTimerTimestamp++;
            ctx.timerService().registerEventTimeTimer(currentTime);
            this.timerToExpectedKey.put(currentTime, value);
        }

        public void processBroadcastElement(String value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            long key = Long.parseLong(value.split(":")[1]);
            ctx.getBroadcastState(this.descriptor).put((Object)key, (Object)value);
        }

        public void onTimer(long timestamp, KeyedBroadcastProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assert.assertEquals((Object)this.timerToExpectedKey.get(timestamp), (Object)ctx.getCurrentKey());
            HashMap<Long, String> map = new HashMap<Long, String>();
            for (Map.Entry entry : ctx.getBroadcastState(this.descriptor).immutableEntries()) {
                map.put((Long)entry.getKey(), (String)entry.getValue());
            }
            Assert.assertEquals(this.expectedState, map);
            out.collect((Object)Long.toString(timestamp));
        }
    }

    private static abstract class CustomWmEmitter<T>
    implements WatermarkStrategyWithPunctuatedWatermarks<T> {
        private static final long serialVersionUID = -5187335197674841233L;

        private CustomWmEmitter() {
        }

        @Nullable
        public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    }

    private static class TestSink
    extends RichSinkFunction<String> {
        private static final long serialVersionUID = 7252508825104554749L;
        private final int expectedOutputCounter;
        private int outputCounter;

        TestSink(int expectedOutputCounter) {
            this.expectedOutputCounter = expectedOutputCounter;
            this.outputCounter = 0;
        }

        public void invoke(String value, SinkFunction.Context context) throws Exception {
            ++this.outputCounter;
        }

        public void close() throws Exception {
            super.close();
            Assert.assertEquals((long)this.expectedOutputCounter, (long)this.outputCounter);
        }
    }
}

