package org.apache.flink.ml.common.broadcast.operator;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.iteration.config.IterationOptions;
import org.apache.flink.ml.common.broadcast.BroadcastContext;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests a two-input operator wrapped by {@link BroadcastWrapperOperatorFactory}.
 *
 * <p>The test drives the wrapped operator through a {@link StreamTaskMailboxTestHarness}, registers
 * two broadcast variables and checks that every record pushed into either input shows up in the
 * harness output.
 */
public class TwoInputBroadcastWrapperOperatorTest {

    /** Scratch directory; a fresh sub-folder of it is configured as the data cache path. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /** Names under which the two broadcast variables are registered. */
    private static final String[] BROADCAST_NAMES = {"source1", "source2"};

    /** Content of the first broadcast variable. */
    private static final List<Integer> SOURCE_1 = Collections.singletonList(1);

    /** Content of the second broadcast variable. */
    private static final List<Integer> SOURCE_2 = Arrays.asList(1, 2, 3);

    /** Number of records sent to each of the two inputs. */
    private static final int NUM_RECORDS_PER_INPUT = 5;

    /** Timestamp attached to every record that is sent and expected. */
    private static final long RECORD_TIMESTAMP = 1000L;

    /** Minimal rich function handed to {@code TestTwoInputOp}; it adds no behavior of its own. */
    private static class MyRichFunction extends AbstractRichFunction {}

    /**
     * Sends the integers {@code 0..4} to both inputs of the wrapped operator and asserts that the
     * harness output contains each value twice, in the order in which the records were fed.
     *
     * @throws Exception if building, restoring, driving or closing the test harness fails
     */
    @Test
    public void testProcessElements() throws Exception {
        OperatorID operatorId = new OperatorID();
        TestTwoInputOp wrappedOperator =
                new TestTwoInputOp(
                        new MyRichFunction(), BROADCAST_NAMES, Arrays.asList(SOURCE_1, SOURCE_2));

        // try-with-resources closes the harness exactly once, on success as well as on failure,
        // and attaches a failure of close() as a suppressed exception to the primary one.
        try (StreamTaskMailboxTestHarness<Integer> harness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                TwoInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(
                                new BroadcastWrapperOperatorFactory<>(
                                        SimpleOperatorFactory.of(wrappedOperator),
                                        new BroadcastWrapper<>(BROADCAST_NAMES)),
                                operatorId)
                        .buildUnrestored()) {

            // The harness is built unrestored so that the data cache path can be set on the task
            // manager configuration before restore() is invoked on the task.
            harness.getStreamTask()
                    .getEnvironment()
                    .getTaskManagerInfo()
                    .getConfiguration()
                    .set(
                            IterationOptions.DATA_CACHE_PATH,
                            "file://" + tempFolder.newFolder().getAbsolutePath());
            harness.getStreamTask().restore();

            // Keys are "<broadcast name>-0"; NOTE(review): the "-0" suffix presumably denotes
            // the subtask index - confirm against BroadcastContext.
            BroadcastContext.putBroadcastVariable(
                    BROADCAST_NAMES[0] + "-0", Tuple2.of(true, SOURCE_1));
            BroadcastContext.putBroadcastVariable(
                    BROADCAST_NAMES[1] + "-0", Tuple2.of(true, SOURCE_2));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            for (int i = 0; i < NUM_RECORDS_PER_INPUT; i++) {
                // Feed the same value to input 0 and input 1; one output record is expected
                // for each of them.
                harness.processElement(new StreamRecord<>(i, RECORD_TIMESTAMP), 0);
                harness.processElement(new StreamRecord<>(i, RECORD_TIMESTAMP), 1);
                expectedOutput.add(new StreamRecord<>(i, RECORD_TIMESTAMP));
                expectedOutput.add(new StreamRecord<>(i, RECORD_TIMESTAMP));
            }

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, harness.getOutput());
        }
    }
}
