/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api.input;

import java.util.Collections;
import java.util.HashMap;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.state.api.input.BroadcastStateInputFormat;
import org.apache.flink.state.api.input.splits.OperatorStateInputSplit;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class BroadcastStateInputFormatTest {
    private static MapStateDescriptor<Integer, Integer> descriptor = new MapStateDescriptor("state", Types.INT, Types.INT);

    @Test
    public void testReadBroadcastState() throws Exception {
        try (TwoInputStreamOperatorTestHarness<Void, Integer, Void> testHarness = this.getTestHarness();){
            testHarness.open();
            testHarness.processElement2(new StreamRecord((Object)1));
            testHarness.processElement2(new StreamRecord((Object)2));
            testHarness.processElement2(new StreamRecord((Object)3));
            OperatorSubtaskState subtaskState = testHarness.snapshot(0L, 0L);
            OperatorState state = new OperatorState(OperatorIDGenerator.fromUid((String)"uid"), 1, 4);
            state.putState(0, subtaskState);
            OperatorStateInputSplit split = new OperatorStateInputSplit(subtaskState.getManagedOperatorState(), 0);
            BroadcastStateInputFormat format = new BroadcastStateInputFormat(state, new Configuration(), null, descriptor);
            format.setRuntimeContext((RuntimeContext)new MockStreamingRuntimeContext(false, 1, 0));
            format.open(split);
            HashMap<Object, Object> results = new HashMap<Object, Object>(3);
            while (!format.reachedEnd()) {
                Tuple2 entry = (Tuple2)format.nextRecord(null);
                results.put(entry.f0, entry.f1);
            }
            HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(3);
            expected.put(1, 1);
            expected.put(2, 2);
            expected.put(3, 3);
            Assert.assertEquals((String)"Failed to read correct list state from state backend", expected, results);
        }
    }

    private TwoInputStreamOperatorTestHarness<Void, Integer, Void> getTestHarness() throws Exception {
        return new TwoInputStreamOperatorTestHarness((TwoInputStreamOperator)new CoBroadcastWithNonKeyedOperator((BroadcastProcessFunction)new StatefulFunction(), Collections.singletonList(descriptor)));
    }

    static class StatefulFunction
    extends BroadcastProcessFunction<Void, Integer, Void> {
        StatefulFunction() {
        }

        public void processElement(Void value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<Void> out) {
        }

        public void processBroadcastElement(Integer value, BroadcastProcessFunction.Context ctx, Collector<Void> out) throws Exception {
            ctx.getBroadcastState(descriptor).put((Object)value, (Object)value);
        }
    }
}

