/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

public class CoBroadcastWithKeyedOperatorTest {
    /**
     * Descriptor of the broadcast state used by the test functions in this class: a map
     * named {@code "broadcast-state"} with {@code String} keys and {@code Integer} values.
     */
    private static final MapStateDescriptor<String, Integer> STATE_DESCRIPTOR =
            new MapStateDescriptor<>("broadcast-state", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    /**
     * Verifies that the keyed (non-broadcast) side can read the key of the element being
     * processed through {@code ctx.getCurrentKey()}.
     *
     * <p>The function forwards the element's string payload right after the key assertion,
     * so the expected output also shows that the assertion was actually executed.
     */
    @Test
    public void testKeyQuerying() throws Exception {
        class KeyQueryingProcessFunction
                extends KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, String> {

            @Override
            public void processElement(Tuple2<Integer, String> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                Assert.assertTrue("Did not get expected key.", ctx.getCurrentKey().equals(value.f0));
                out.collect(value.f1);
            }

            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                // No broadcast element is sent in this test.
            }
        }

        CoBroadcastWithKeyedOperator<Integer, Tuple2<Integer, String>, String, String> operator =
                new CoBroadcastWithKeyedOperator<>(new KeyQueryingProcessFunction(), Collections.emptyList());

        try (TwoInputStreamOperatorTestHarness<Tuple2<Integer, String>, String, String> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(operator, in -> in.f0, null, BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement1(new StreamRecord<>(Tuple2.of(5, "5"), 12L));
            testHarness.processElement1(new StreamRecord<>(Tuple2.of(42, "42"), 13L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("5", 12L));
            expectedOutput.add(new StreamRecord<>("42", 13L));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that the broadcast side can visit the keyed state of every key.
     *
     * <p>Keyed elements are processed first so that each key accumulates its own list
     * state. A single broadcast element then triggers
     * {@link StatefulFunctionWithKeyedStateAccessedOnBroadcast}, which compares the list
     * state of every key with the expected contents built here.
     */
    @Test
    public void testAccessToKeyedStateIt() throws Exception {
        // Expected list state per key: one entry for every keyed element sent below.
        List<String> test1content = new ArrayList<>(Collections.nCopies(2, "test1"));
        List<String> test2content = new ArrayList<>(Collections.nCopies(4, "test2"));
        List<String> test3content = new ArrayList<>(Collections.nCopies(3, "test3"));

        Map<String, List<String>> expectedState = new HashMap<>();
        expectedState.put("test1", test1content);
        expectedState.put("test2", test2content);
        expectedState.put("test3", test3content);

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = getInitializedTestHarness(
                BasicTypeInfo.STRING_TYPE_INFO,
                new IdentityKeySelector<>(),
                new StatefulFunctionWithKeyedStateAccessedOnBroadcast(expectedState))) {
            // Keyed side: populate the per-key list state.
            testHarness.processElement1(new StreamRecord<>("test1", 12L));
            testHarness.processElement1(new StreamRecord<>("test1", 12L));
            testHarness.processElement1(new StreamRecord<>("test2", 13L));
            testHarness.processElement1(new StreamRecord<>("test2", 13L));
            testHarness.processElement1(new StreamRecord<>("test2", 13L));
            testHarness.processElement1(new StreamRecord<>("test3", 14L));
            testHarness.processElement1(new StreamRecord<>("test3", 14L));
            testHarness.processElement1(new StreamRecord<>("test3", 14L));
            testHarness.processElement1(new StreamRecord<>("test2", 13L));

            // Broadcast side: this is where the function runs its assertions.
            testHarness.processElement2(new StreamRecord<>(1, 13L));
        }
    }

    /**
     * Interleaves watermarks, one broadcast element and two keyed elements, then checks
     * the complete operator output: forwarded watermarks, the records emitted by both
     * sides, and the record emitted when the event-time timer registered by the keyed
     * side (timestamp 41) fires.
     */
    @Test
    public void testFunctionWithTimer() throws Exception {
        final String expectedKey = "6";

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = getInitializedTestHarness(
                BasicTypeInfo.STRING_TYPE_INFO,
                new IdentityKeySelector<>(),
                new FunctionWithTimerOnKeyed(41L, expectedKey))) {
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));
            testHarness.processElement2(new StreamRecord<>(5, 12L));

            testHarness.processWatermark1(new Watermark(40L));
            testHarness.processWatermark2(new Watermark(40L));
            testHarness.processElement1(new StreamRecord<>("6", 13L));
            testHarness.processElement1(new StreamRecord<>("6", 15L));

            testHarness.processWatermark1(new Watermark(50L));
            testHarness.processWatermark2(new Watermark(50L));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new Watermark(10L));
            expectedOutput.add(new StreamRecord<>("BR:5 WM:10 TS:12", 12L));
            expectedOutput.add(new Watermark(40L));
            expectedOutput.add(new StreamRecord<>("NON-BR:6 WM:40 TS:13", 13L));
            expectedOutput.add(new StreamRecord<>("NON-BR:6 WM:40 TS:15", 15L));
            expectedOutput.add(new StreamRecord<>("TIMER:41", 41L));
            expectedOutput.add(new Watermark(50L));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that the broadcast side and the keyed side can each emit to their own side
     * output, and that every side output receives exactly the records written to its tag.
     */
    @Test
    public void testSideOutput() throws Exception {
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = getInitializedTestHarness(
                BasicTypeInfo.STRING_TYPE_INFO,
                new IdentityKeySelector<>(),
                new FunctionWithSideOutput())) {
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));
            testHarness.processElement2(new StreamRecord<>(5, 12L));

            testHarness.processWatermark1(new Watermark(40L));
            testHarness.processWatermark2(new Watermark(40L));
            testHarness.processElement1(new StreamRecord<>("6", 13L));
            testHarness.processElement1(new StreamRecord<>("6", 15L));

            testHarness.processWatermark1(new Watermark(50L));
            testHarness.processWatermark2(new Watermark(50L));

            ConcurrentLinkedQueue<StreamRecord<String>> expectedBr = new ConcurrentLinkedQueue<>();
            expectedBr.add(new StreamRecord<>("BR:5 WM:10 TS:12", 12L));

            ConcurrentLinkedQueue<StreamRecord<String>> expectedNonBr = new ConcurrentLinkedQueue<>();
            expectedNonBr.add(new StreamRecord<>("NON-BR:6 WM:40 TS:13", 13L));
            expectedNonBr.add(new StreamRecord<>("NON-BR:6 WM:40 TS:15", 15L));

            ConcurrentLinkedQueue<StreamRecord<String>> brSideOutput =
                    testHarness.getSideOutput(FunctionWithSideOutput.BROADCAST_TAG);
            ConcurrentLinkedQueue<StreamRecord<String>> nonBrSideOutput =
                    testHarness.getSideOutput(FunctionWithSideOutput.NON_BROADCAST_TAG);

            TestHarnessUtil.assertOutputEquals("Wrong Side Output", expectedBr, brSideOutput);
            TestHarnessUtil.assertOutputEquals("Wrong Side Output", expectedNonBr, nonBrSideOutput);
        }
    }

    /**
     * Verifies reading and writing of broadcast state.
     *
     * <p>Five broadcast elements fill the state before the keyed {@code "trigger"} element
     * arrives; the function checks the state against {@code expectedBroadcastState} at that
     * point and registers an event-time timer at timestamp 41. A sixth broadcast element
     * (51) is added afterwards, so the record emitted when the timer fires must list all
     * six entries.
     */
    @Test
    public void testFunctionWithBroadcastState() throws Exception {
        final Map<String, Integer> expectedBroadcastState = new HashMap<>();
        expectedBroadcastState.put("5.key", 5);
        expectedBroadcastState.put("34.key", 34);
        expectedBroadcastState.put("53.key", 53);
        expectedBroadcastState.put("12.key", 12);
        expectedBroadcastState.put("98.key", 98);

        final String expectedKey = "trigger";

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = getInitializedTestHarness(
                BasicTypeInfo.STRING_TYPE_INFO,
                new IdentityKeySelector<>(),
                new FunctionWithBroadcastState("key", expectedBroadcastState, 41L, expectedKey))) {
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));

            testHarness.processElement2(new StreamRecord<>(5, 10L));
            testHarness.processElement2(new StreamRecord<>(34, 12L));
            testHarness.processElement2(new StreamRecord<>(53, 15L));
            testHarness.processElement2(new StreamRecord<>(12, 16L));
            testHarness.processElement2(new StreamRecord<>(98, 19L));

            testHarness.processElement1(new StreamRecord<>(expectedKey, 13L));

            testHarness.processElement2(new StreamRecord<>(51, 21L));

            testHarness.processWatermark1(new Watermark(50L));
            testHarness.processWatermark2(new Watermark(50L));

            // Expected output: watermark 10, the timer record, watermark 50.
            ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
            Assert.assertEquals(3L, output.size());

            Object firstRawWm = output.poll();
            Assert.assertTrue(firstRawWm instanceof Watermark);
            Watermark firstWm = (Watermark) firstRawWm;
            Assert.assertEquals(10L, firstWm.getTimestamp());

            Object rawOutputElem = output.poll();
            Assert.assertTrue(rawOutputElem instanceof StreamRecord);
            StreamRecord<?> outputRec = (StreamRecord<?>) rawOutputElem;
            Assert.assertTrue(outputRec.getValue() instanceof String);
            String outputElem = (String) outputRec.getValue();

            // The element broadcast after the trigger is part of the state seen by the timer.
            expectedBroadcastState.put("51.key", 51);
            List<Map.Entry<String, Integer>> expectedEntries = new ArrayList<>(expectedBroadcastState.entrySet());
            String expected = "TS:41 " + mapToString(expectedEntries);
            Assert.assertEquals(expected, outputElem);

            Object secondRawWm = output.poll();
            Assert.assertTrue(secondRawWm instanceof Watermark);
            Watermark secondWm = (Watermark) secondRawWm;
            Assert.assertEquals(50L, secondWm.getTimestamp());
        }
    }

    /**
     * Verifies broadcast state redistribution when the parallelism increases from 2 to 3.
     *
     * <p>Two subtasks (max parallelism 10) each process broadcast element 3 and are
     * snapshotted. The merged snapshot is restored into three subtasks, and a keyed
     * {@code "trigger"} element makes each of them emit its broadcast state entries,
     * which must be the three registered keys mapped to 3.
     */
    @Test
    public void testScaleUp() throws Exception {
        final Set<String> keysToRegister = new HashSet<>();
        keysToRegister.add("test1");
        keysToRegister.add("test2");
        keysToRegister.add("test3");

        final OperatorSubtaskState mergedSnapshot;

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = getInitializedTestHarness(
                     BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector<>(), new TestFunctionWithOutput(keysToRegister), 10, 2, 0);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness2 = getInitializedTestHarness(
                     BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector<>(), new TestFunctionWithOutput(keysToRegister), 10, 2, 1)) {
            // Make sure all subtasks hold the same broadcast state.
            testHarness1.processElement2(new StreamRecord<>(3));
            testHarness2.processElement2(new StreamRecord<>(3));

            mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
                    testHarness1.snapshot(0L, 0L),
                    testHarness2.snapshot(0L, 0L));
        }

        final Set<String> expected = new HashSet<>(3);
        expected.add("test1=3");
        expected.add("test2=3");
        expected.add("test3=3");

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = getInitializedTestHarness(
                     BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector<>(), new TestFunctionWithOutput(keysToRegister), 10, 3, 0, mergedSnapshot);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness2 = getInitializedTestHarness(
                     BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector<>(), new TestFunctionWithOutput(keysToRegister), 10, 3, 1, mergedSnapshot);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness3 = getInitializedTestHarness(
                     BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector<>(), new TestFunctionWithOutput(keysToRegister), 10, 3, 2, mergedSnapshot)) {
            testHarness1.processElement1(new StreamRecord<>("trigger"));
            testHarness2.processElement1(new StreamRecord<>("trigger"));
            testHarness3.processElement1(new StreamRecord<>("trigger"));

            List<ConcurrentLinkedQueue<Object>> outputs = new ArrayList<>(3);
            outputs.add(testHarness1.getOutput());
            outputs.add(testHarness2.getOutput());
            outputs.add(testHarness3.getOutput());

            // Every restored subtask must emit exactly the expected entries.
            for (ConcurrentLinkedQueue<Object> output : outputs) {
                Assert.assertEquals(expected.size(), output.size());
                for (Object element : output) {
                    StreamRecord<?> rec = (StreamRecord<?>) element;
                    Assert.assertTrue(expected.contains(rec.getValue()));
                }
            }
        }
    }

    /**
     * Verifies broadcast state redistribution when the parallelism decreases from 3 to 2.
     *
     * <p>Three subtasks (max parallelism 10) each process broadcast element 3 and are
     * snapshotted. The merged snapshot is restored into two subtasks, and a keyed
     * {@code "trigger"} element makes each of them emit its broadcast state entries,
     * which must be the three registered keys mapped to 3.
     */
    @Test
    public void testScaleDown() throws Exception {
        final Set<String> keysToRegister = new HashSet<>();
        keysToRegister.add("test1");
        keysToRegister.add("test2");
        keysToRegister.add("test3");

        final OperatorSubtaskState mergedSnapshot;

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = getInitializedTestHarness(
                     BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector<>(), new TestFunctionWithOutput(keysToRegister), 10, 3, 0);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness2 = getInitializedTestHarness(
                     BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector<>(), new TestFunctionWithOutput(keysToRegister), 10, 3, 1);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness3 = getInitializedTestHarness(
                     BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector<>(), new TestFunctionWithOutput(keysToRegister), 10, 3, 2)) {
            // Make sure all subtasks hold the same broadcast state.
            testHarness1.processElement2(new StreamRecord<>(3));
            testHarness2.processElement2(new StreamRecord<>(3));
            testHarness3.processElement2(new StreamRecord<>(3));

            mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
                    testHarness1.snapshot(0L, 0L),
                    testHarness2.snapshot(0L, 0L),
                    testHarness3.snapshot(0L, 0L));
        }

        final Set<String> expected = new HashSet<>(3);
        expected.add("test1=3");
        expected.add("test2=3");
        expected.add("test3=3");

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = getInitializedTestHarness(
                     BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector<>(), new TestFunctionWithOutput(keysToRegister), 10, 2, 0, mergedSnapshot);
             TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness2 = getInitializedTestHarness(
                     BasicTypeInfo.STRING_TYPE_INFO, new IdentityKeySelector<>(), new TestFunctionWithOutput(keysToRegister), 10, 2, 1, mergedSnapshot)) {
            testHarness1.processElement1(new StreamRecord<>("trigger"));
            testHarness2.processElement1(new StreamRecord<>("trigger"));

            List<ConcurrentLinkedQueue<Object>> outputs = new ArrayList<>(2);
            outputs.add(testHarness1.getOutput());
            outputs.add(testHarness2.getOutput());

            // Every restored subtask must emit exactly the expected entries.
            for (ConcurrentLinkedQueue<Object> output : outputs) {
                Assert.assertEquals(expected.size(), output.size());
                for (Object element : output) {
                    StreamRecord<?> rec = (StreamRecord<?>) element;
                    Assert.assertTrue(expected.contains(rec.getValue()));
                }
            }
        }
    }

    /**
     * Verifies that reading keyed state through the runtime context while processing a
     * broadcast element fails.
     *
     * <p>The test passes only if a {@link NullPointerException} carrying the expected
     * "No key set" message is thrown.
     */
    @Test
    public void testNoKeyedStateOnBroadcastSide() throws Exception {
        boolean exceptionThrown = false;

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = getInitializedTestHarness(
                BasicTypeInfo.STRING_TYPE_INFO,
                new IdentityKeySelector<>(),
                new KeyedBroadcastProcessFunction<String, String, Integer, String>() {
                    private static final long serialVersionUID = -1725365436500098384L;

                    private final ValueStateDescriptor<String> valueState =
                            new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);

                    @Override
                    public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
                        // Expected to throw: this test asserts on the resulting exception.
                        getRuntimeContext().getState(valueState).value();
                    }

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // No keyed element is sent in this test.
                    }
                })) {
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));
            testHarness.processElement2(new StreamRecord<>(5, 12L));
        } catch (NullPointerException e) {
            Assert.assertEquals("No key set. This method should not be called outside of a keyed context.", e.getMessage());
            exceptionThrown = true;
        }

        if (!exceptionThrown) {
            Assert.fail("No exception thrown");
        }
    }

    /**
     * Creates an opened test harness for a single subtask: max parallelism 1,
     * parallelism 1, subtask index 0.
     *
     * @param keyTypeInfo type information of the key
     * @param keyKeySelector key selector applied to the first input
     * @param function function under test
     * @return the harness produced by the six-argument overload
     */
    private static <KEY, IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> getInitializedTestHarness(TypeInformation<KEY> keyTypeInfo, KeySelector<IN1, KEY> keyKeySelector, KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function) throws Exception {
        final int maxParallelism = 1;
        final int numTasks = 1;
        final int taskIdx = 0;
        return getInitializedTestHarness(keyTypeInfo, keyKeySelector, function, maxParallelism, numTasks, taskIdx);
    }

    /**
     * Creates an opened test harness for the given subtask, passing {@code null} as the
     * state to initialize from.
     *
     * @param keyTypeInfo type information of the key
     * @param keyKeySelector key selector applied to the first input
     * @param function function under test
     * @param maxParallelism maximum parallelism of the operator
     * @param numTasks parallelism of the operator
     * @param taskIdx index of the subtask the harness represents
     * @return the harness produced by the seven-argument overload
     */
    private static <KEY, IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> getInitializedTestHarness(TypeInformation<KEY> keyTypeInfo, KeySelector<IN1, KEY> keyKeySelector, KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function, int maxParallelism, int numTasks, int taskIdx) throws Exception {
        final OperatorSubtaskState noInitialState = null;
        return getInitializedTestHarness(keyTypeInfo, keyKeySelector, function, maxParallelism, numTasks, taskIdx, noInitialState);
    }

    /**
     * Wraps {@code function} in a {@link CoBroadcastWithKeyedOperator} registered with
     * {@link #STATE_DESCRIPTOR}, builds a keyed two-input harness around it and runs
     * {@code setup()}, {@code initializeState(initState)} and {@code open()} on it.
     *
     * @param keyTypeInfo type information of the key
     * @param keyKeySelector key selector applied to the first input; the second input
     *     gets no key selector
     * @param function function under test, must not be {@code null}
     * @param maxParallelism maximum parallelism of the operator
     * @param numTasks parallelism of the operator
     * @param taskIdx index of the subtask the harness represents
     * @param initState state handed to {@code initializeState}; the shorter overloads
     *     pass {@code null}
     * @return the set-up, initialized and opened harness
     */
    private static <KEY, IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> getInitializedTestHarness(TypeInformation<KEY> keyTypeInfo, KeySelector<IN1, KEY> keyKeySelector, KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function, int maxParallelism, int numTasks, int taskIdx, OperatorSubtaskState initState) throws Exception {
        CoBroadcastWithKeyedOperator<KEY, IN1, IN2, OUT> operator = new CoBroadcastWithKeyedOperator<>(
                Preconditions.checkNotNull(function),
                Collections.singletonList(STATE_DESCRIPTOR));

        TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
                operator, keyKeySelector, null, keyTypeInfo, maxParallelism, numTasks, taskIdx);

        testHarness.setup();
        testHarness.initializeState(initState);
        testHarness.open();
        return testHarness;
    }

    /**
     * Renders the entries as {@code " key=value"} fragments concatenated in ascending key
     * order, with ties broken by ascending value.
     *
     * <p>The passed list is sorted in place. An empty list yields an empty string.
     *
     * @param entries entries to render; reordered by this call
     * @return the concatenated representation, each entry preceded by a single space
     */
    private static String mapToString(List<Map.Entry<String, Integer>> entries) {
        // The explicit type witness gives the comparator chain the entry type that a bare
        // Map.Entry::getKey method reference cannot supply on its own.
        entries.sort(Map.Entry.<String, Integer>comparingByKey().thenComparingInt(Map.Entry::getValue));

        StringBuilder builder = new StringBuilder();
        for (Map.Entry<String, Integer> entry : entries) {
            builder.append(' ').append(entry.getKey()).append('=').append(entry.getValue());
        }
        return builder.toString();
    }

    private static class IdentityKeySelector<T>
    implements KeySelector<T, T> {
        private static final long serialVersionUID = 1L;

        private IdentityKeySelector() {
        }

        public T getKey(T value) throws Exception {
            return value;
        }
    }

    private static class TestFunctionWithOutput
    extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        private final Set<String> keysToRegister;

        TestFunctionWithOutput(Set<String> keysToRegister) {
            this.keysToRegister = (Set)Preconditions.checkNotNull(keysToRegister);
        }

        public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            for (String k : this.keysToRegister) {
                ctx.getBroadcastState(STATE_DESCRIPTOR).put((Object)k, (Object)value);
            }
        }

        public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            for (Map.Entry entry : ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) {
                out.collect((Object)entry.toString());
            }
        }
    }

    private static class FunctionWithBroadcastState
    extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        private final String keyPostfix;
        private final Map<String, Integer> expectedBroadcastState;
        private final long timerTs;
        private final String expectedKey;

        FunctionWithBroadcastState(String keyPostfix, Map<String, Integer> expectedBroadcastState, long timerTs, String expectedKey) {
            this.keyPostfix = (String)Preconditions.checkNotNull((Object)keyPostfix);
            this.expectedBroadcastState = (Map)Preconditions.checkNotNull(expectedBroadcastState);
            this.timerTs = timerTs;
            this.expectedKey = expectedKey;
        }

        public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            String key = value + "." + this.keyPostfix;
            ctx.getBroadcastState(STATE_DESCRIPTOR).put((Object)key, (Object)value);
        }

        public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            Iterable broadcastStateIt = ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries();
            Iterator iter = broadcastStateIt.iterator();
            for (int i = 0; i < this.expectedBroadcastState.size(); ++i) {
                Assert.assertTrue((boolean)iter.hasNext());
                Map.Entry entry = (Map.Entry)iter.next();
                Assert.assertTrue((boolean)this.expectedBroadcastState.containsKey(entry.getKey()));
                Assert.assertEquals((Object)this.expectedBroadcastState.get(entry.getKey()), entry.getValue());
            }
            Assert.assertFalse((boolean)iter.hasNext());
            ctx.timerService().registerEventTimeTimer(this.timerTs);
        }

        public void onTimer(long timestamp, KeyedBroadcastProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Iterator iter = ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries().iterator();
            ArrayList map = new ArrayList();
            while (iter.hasNext()) {
                map.add(iter.next());
            }
            Assert.assertEquals((Object)this.expectedKey, (Object)ctx.getCurrentKey());
            String mapToStr = CoBroadcastWithKeyedOperatorTest.mapToString(map);
            out.collect((Object)("TS:" + timestamp + " " + mapToStr));
        }
    }

    private static class FunctionWithSideOutput
    extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        static final OutputTag<String> BROADCAST_TAG = new OutputTag<String>("br-out"){
            private static final long serialVersionUID = -6899484480421899631L;
        };
        static final OutputTag<String> NON_BROADCAST_TAG = new OutputTag<String>("non-br-out"){
            private static final long serialVersionUID = 3837387110613831791L;
        };

        private FunctionWithSideOutput() {
        }

        public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.output(BROADCAST_TAG, (Object)("BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            ctx.output(NON_BROADCAST_TAG, (Object)("NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }
    }

    private static class FunctionWithTimerOnKeyed
    extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        private final long timerTS;
        private final String expectedKey;

        FunctionWithTimerOnKeyed(long timerTS, String expectedKey) {
            this.timerTS = timerTS;
            this.expectedKey = expectedKey;
        }

        public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            ctx.timerService().registerEventTimeTimer(this.timerTS);
            out.collect((Object)("NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void onTimer(long timestamp, KeyedBroadcastProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assert.assertEquals((Object)this.expectedKey, (Object)ctx.getCurrentKey());
            out.collect((Object)("TIMER:" + timestamp));
        }
    }

    private static class StatefulFunctionWithKeyedStateAccessedOnBroadcast
    extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        private final ListStateDescriptor<String> listStateDesc = new ListStateDescriptor("listStateTest", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        private final Map<String, List<String>> expectedKeyedStates;

        StatefulFunctionWithKeyedStateAccessedOnBroadcast(Map<String, List<String>> expectedKeyedState) {
            this.expectedKeyedStates = (Map)Preconditions.checkNotNull(expectedKeyedState);
        }

        public void processBroadcastElement(Integer value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.applyToKeyedState(this.listStateDesc, (KeyedStateFunction)new KeyedStateFunction<String, ListState<String>>(){

                public void process(String key, ListState<String> state) throws Exception {
                    Iterator it = ((Iterable)state.get()).iterator();
                    ArrayList list = new ArrayList();
                    while (it.hasNext()) {
                        list.add(it.next());
                    }
                    Assert.assertEquals(expectedKeyedStates.get(key), list);
                }
            });
        }

        public void processElement(String value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            this.getRuntimeContext().getListState(this.listStateDesc).add((Object)value);
        }
    }
}

