package org.apache.flink.state.api.input;

import java.util.Collections;
import java.util.HashMap;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.input.splits.OperatorStateInputSplit;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/state/api/input/BroadcastStateInputFormatTest.class */
public class BroadcastStateInputFormatTest {
    private static MapStateDescriptor<Integer, Integer> descriptor = new MapStateDescriptor<>("state", Types.INT, Types.INT);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/state/api/input/BroadcastStateInputFormatTest$StatefulFunction.class */
    public static class StatefulFunction extends BroadcastProcessFunction<Void, Integer, Void> {
        StatefulFunction() {
        }

        public void processElement(Void r2, BroadcastProcessFunction<Void, Integer, Void>.ReadOnlyContext readOnlyContext, Collector<Void> collector) {
        }

        public void processBroadcastElement(Integer num, BroadcastProcessFunction<Void, Integer, Void>.Context context, Collector<Void> collector) throws Exception {
            context.getBroadcastState(BroadcastStateInputFormatTest.descriptor).put(num, num);
        }

        public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, BroadcastProcessFunction.Context context, Collector collector) throws Exception {
            processBroadcastElement((Integer) obj, (BroadcastProcessFunction<Void, Integer, Void>.Context) context, (Collector<Void>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, BroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
            processElement((Void) obj, (BroadcastProcessFunction<Void, Integer, Void>.ReadOnlyContext) readOnlyContext, (Collector<Void>) collector);
        }
    }

    @Test
    public void testReadBroadcastState() throws Exception {
        TwoInputStreamOperatorTestHarness<Void, Integer, Void> testHarness = getTestHarness();
        Throwable th = null;
        try {
            testHarness.open();
            testHarness.processElement2(new StreamRecord(1));
            testHarness.processElement2(new StreamRecord(2));
            testHarness.processElement2(new StreamRecord(3));
            OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
            OperatorState operatorState = new OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 4);
            operatorState.putState(0, snapshot);
            OperatorStateInputSplit operatorStateInputSplit = new OperatorStateInputSplit(snapshot.getManagedOperatorState(), 0);
            BroadcastStateInputFormat broadcastStateInputFormat = new BroadcastStateInputFormat(operatorState, new Configuration(), (StateBackend) null, descriptor);
            broadcastStateInputFormat.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0));
            broadcastStateInputFormat.open(operatorStateInputSplit);
            HashMap hashMap = new HashMap(3);
            while (!broadcastStateInputFormat.reachedEnd()) {
                Tuple2 tuple2 = (Tuple2) broadcastStateInputFormat.nextRecord((Object) null);
                hashMap.put(tuple2.f0, tuple2.f1);
            }
            HashMap hashMap2 = new HashMap(3);
            hashMap2.put(1, 1);
            hashMap2.put(2, 2);
            hashMap2.put(3, 3);
            Assert.assertEquals("Failed to read correct list state from state backend", hashMap2, hashMap);
            if (testHarness != null) {
                if (0 == 0) {
                    testHarness.close();
                    return;
                }
                try {
                    testHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (testHarness != null) {
                if (0 != 0) {
                    try {
                        testHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    testHarness.close();
                }
            }
            throw th3;
        }
    }

    private TwoInputStreamOperatorTestHarness<Void, Integer, Void> getTestHarness() throws Exception {
        return new TwoInputStreamOperatorTestHarness<>(new CoBroadcastWithNonKeyedOperator(new StatefulFunction(), Collections.singletonList(descriptor)));
    }
}
