package org.apache.beam.runners.flink.translation.functions;

import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext;
import org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@code FlinkDefaultExecutableStageContext.MultiInstanceFactory}.
 *
 * <p>Each test requests contexts repeatedly for the same {@link JobInfo} and compares the
 * {@code context} field of the returned wrappers to check how many distinct contexts are handed
 * out before the first one is returned again.
 */
@RunWith(JUnit4.class)
public class FlinkDefaultExecutableStageContextTest {

    /**
     * Builds a {@link JobInfo} whose pipeline options carry the given SDK worker parallelism.
     *
     * @param jobId identifier passed as the first argument to {@code JobInfo.create}
     * @param parallelism value stored through {@code setSdkWorkerParallelism}
     * @return a job info using the fixed name "job-name" and token "retrieval-token"
     */
    private static JobInfo constructJobInfo(String jobId, long parallelism) {
        PortablePipelineOptions options = PipelineOptionsFactory.as(PortablePipelineOptions.class);
        options.setSdkWorkerParallelism(parallelism);
        return JobInfo.create(
                jobId,
                "job-name",
                "retrieval-token",
                PipelineOptionsTranslation.toProto(options));
    }

    /**
     * With a parallelism of 2, the first two requests must yield different contexts and the third
     * request must yield the same context as the first.
     */
    @Test
    public void testMultiInstanceFactory() {
        JobInfo jobInfo = constructJobInfo("multi-instance-factory-test", 2L);

        ReferenceCountingFlinkExecutableStageContextFactory.WrappedContext first =
                FlinkDefaultExecutableStageContext.MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo);
        ReferenceCountingFlinkExecutableStageContextFactory.WrappedContext second =
                FlinkDefaultExecutableStageContext.MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo);
        ReferenceCountingFlinkExecutableStageContextFactory.WrappedContext third =
                FlinkDefaultExecutableStageContext.MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo);

        Assert.assertNotEquals(
                "We should create two different factories", first.context, second.context);
        Assert.assertEquals(
                "Future calls should be round-robbined to those two factories",
                first.context,
                third.context);
    }

    /**
     * With a parallelism of 0, the test expects {@code max(1, availableProcessors - 1)} distinct
     * contexts: every request up to that count must differ from the first, and the request right
     * after must return the first context again.
     */
    @Test
    public void testDefault() {
        JobInfo jobInfo = constructJobInfo("default-test", 0L);
        int expectedInstances = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
        String distinctMessage = "We should create " + expectedInstances + " different factories";

        ReferenceCountingFlinkExecutableStageContextFactory.WrappedContext first =
                FlinkDefaultExecutableStageContext.MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo);

        // One context has been requested already; request the remaining expectedInstances - 1.
        int requested = 1;
        while (requested < expectedInstances) {
            ReferenceCountingFlinkExecutableStageContextFactory.WrappedContext next =
                    FlinkDefaultExecutableStageContext.MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo);
            Assert.assertNotEquals(distinctMessage, first.context, next.context);
            requested++;
        }

        // The request after the expected count must return the first context again.
        ReferenceCountingFlinkExecutableStageContextFactory.WrappedContext wrapped =
                FlinkDefaultExecutableStageContext.MultiInstanceFactory.MULTI_INSTANCE.get(jobInfo);
        Assert.assertEquals(
                "Future calls should be round-robbined to those", first.context, wrapped.context);
    }
}
