package org.apache.flink.ml.common.broadcast.operator;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.iteration.config.IterationOptions;
import org.apache.flink.ml.common.broadcast.BroadcastContext;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/ml/common/broadcast/operator/BroadcastVariableReceiverOperatorTest.class */
/**
 * Tests the operator created by {@link BroadcastVariableReceiverOperatorFactory} by driving it
 * through a {@link StreamTaskMailboxTestHarness} and inspecting what ends up in
 * {@link BroadcastContext}.
 */
public class BroadcastVariableReceiverOperatorTest {

    /** Provides the scratch directory that is used as the data cache path. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /** Names of the broadcast inputs; index i corresponds to harness input i. */
    private static final String[] BROADCAST_NAMES = {"source1", "source2"};

    /** Element types of the broadcast inputs, parallel to {@link #BROADCAST_NAMES}. */
    private static final TypeInformation<?>[] TYPE_INFORMATIONS = {
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
    };

    /**
     * Feeds two records into input 0 and three records into input 1, checks that neither
     * broadcast cache is reported as finished while the task is still running, then waits for the
     * task to complete and verifies the content of both broadcast variables (ignoring order).
     */
    @Test
    public void test() throws Exception {
        // try-with-resources closes the harness exactly once, on both the success and the
        // failure path, and attaches a failing close() as a suppressed exception.
        try (StreamTaskMailboxTestHarness<Integer> harness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                MultipleInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(
                                new BroadcastVariableReceiverOperatorFactory(
                                        BROADCAST_NAMES, TYPE_INFORMATIONS),
                                new OperatorID())
                        .build()) {
            harness.processElement(new StreamRecord<>(1, 2L), 0);
            harness.processElement(new StreamRecord<>(2, 3L), 0);
            harness.processElement(new StreamRecord<>(3, 2L), 1);
            harness.processElement(new StreamRecord<>(4, 2L), 1);
            harness.processElement(new StreamRecord<>(5, 3L), 1);

            // Neither cache may be reported as finished before the task has completed.
            boolean anyCacheFinished =
                    BroadcastContext.isCacheFinished(broadcastKey(BROADCAST_NAMES[0]))
                            || BroadcastContext.isCacheFinished(broadcastKey(BROADCAST_NAMES[1]));
            Assert.assertFalse(anyCacheFinished);

            harness.waitForTaskCompletion();

            List<?> firstVariable =
                    BroadcastContext.getBroadcastVariable(broadcastKey(BROADCAST_NAMES[0]));
            List<?> secondVariable =
                    BroadcastContext.getBroadcastVariable(broadcastKey(BROADCAST_NAMES[1]));
            compareLists(Arrays.asList(1, 2), firstVariable);
            compareLists(Arrays.asList(3, 4, 5), secondVariable);
        }
    }

    /**
     * Removes the broadcast variable from {@link BroadcastContext} after the input has ended and
     * then triggers a checkpoint. The test has no explicit assertions: it passes when the
     * checkpoint and the task completion finish without throwing.
     */
    @Test
    public void testVariableCleanedBeforeSnapShot() throws Exception {
        try (StreamTaskMailboxTestHarness<Integer> harness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                MultipleInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(
                                new BroadcastVariableReceiverOperatorFactory(
                                        new String[] {BROADCAST_NAMES[0]},
                                        new TypeInformation<?>[] {TYPE_INFORMATIONS[0]}),
                                new OperatorID())
                        .buildUnrestored()) {
            // The data cache path is configured before restore() is invoked, which is why the
            // harness is built unrestored.
            harness.getStreamTask()
                    .getEnvironment()
                    .getTaskManagerInfo()
                    .getConfiguration()
                    .set(
                            IterationOptions.DATA_CACHE_PATH,
                            "file://" + tempFolder.newFolder().getAbsolutePath());
            harness.getStreamTask().restore();

            harness.processElement(new StreamRecord<>(1, 2L), 0);
            harness.processElement(new StreamRecord<>(2, 3L), 0);
            harness.endInput();

            // Drop the variable before the snapshot is taken.
            BroadcastContext.remove(broadcastKey(BROADCAST_NAMES[0]));

            harness.getStreamTask()
                    .triggerCheckpointOnBarrier(
                            new CheckpointMetaData(1L, 2L),
                            CheckpointOptions.alignedNoTimeout(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault()),
                            new CheckpointMetricsBuilder()
                                    .setAlignmentDurationNanos(0L)
                                    .setBytesProcessedDuringAlignment(0L));
            harness.waitForTaskCompletion();
        }
    }

    /**
     * Asserts that both lists contain the same integers, regardless of order.
     *
     * @param list the expected values
     * @param list2 the actual values; every element must be an {@link Integer}
     */
    public static void compareLists(List<Integer> list, List<?> list2) {
        // The actual values are converted first so that a wrongly typed element is reported
        // before the expected list is touched.
        int[] actual = list2.stream().mapToInt(element -> (Integer) element).sorted().toArray();
        int[] expected = list.stream().mapToInt(Integer::intValue).sorted().toArray();
        Assert.assertArrayEquals(expected, actual);
    }

    /**
     * Builds the {@link BroadcastContext} key used by these tests for the given broadcast name.
     *
     * <p>NOTE(review): the "-0" suffix looks like the subtask index of the single task run by the
     * harness - confirm against the key scheme of BroadcastContext.
     */
    private static String broadcastKey(String broadcastName) {
        return broadcastName + "-0";
    }
}
