package org.apache.beam.runners.direct;

import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.repackaged.direct_java.runners.core.metrics.MetricsPusherTest;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ThreadLeakTracker;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.LinkedListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;

/* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.class */
/**
 * Thread-leak tests for {@link ExecutorServiceParallelExecutor}.
 *
 * <p>Each test runs inside a {@link RuleChain} in which the {@link TestPipeline} is the outer rule
 * and a {@link ThreadLeakTracker} is wrapped inside it. NOTE(review): the tracker presumably fails
 * a test that leaves threads behind - confirm against {@code ThreadLeakTracker}.
 *
 * <p>Both tests are currently disabled; see the issue linked in their {@code @Ignore} annotations.
 */
public class ExecutorServiceParallelExecutorTest {
    /** Exclusive upper bound of the sequence generated by the pipeline test. */
    private static final long NUM_ELEMENTS = 1000L;

    /** Upper bound, in seconds, on how long the metrics executor may take to terminate. */
    private static final long TERMINATION_TIMEOUT_SECONDS = 10L;

    private final TestPipeline pipeline = TestPipeline.create();
    private final TestRule threadLeakTracker = new ThreadLeakTracker();

    /** Applies the pipeline rule first (outermost) and the thread-leak tracker inside it. */
    @Rule
    public final TestRule execution = RuleChain.outerRule(this.pipeline).around(this.threadLeakTracker);

    /** Supplies the running test's method name, used to label the metrics thread. */
    @Rule
    public final TestName testName = new TestName();

    /** Pass-through {@link DoFn} that increments a counter metric for every element it sees. */
    private static class CountingDoFn extends DoFn<Long, Long> {
        private final Counter counter;

        private CountingDoFn() {
            // NOTE(review): the counter is namespaced under MetricsPusherTest rather than this
            // test class; kept as-is, but confirm whether that is intentional.
            this.counter = Metrics.counter(MetricsPusherTest.class, "counter");
        }

        /**
         * Increments the counter and re-emits the input element unchanged.
         *
         * @param processContext context providing the current element and the output receiver
         * @throws RuntimeException wrapping any exception raised while counting or emitting
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<Long, Long>.ProcessContext processContext) {
            try {
                this.counter.inc();
                processContext.output(processContext.element());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Creates an executor over an empty graph with a dedicated non-daemon metrics thread, stops it
     * immediately, and verifies that the metrics executor service terminates within the timeout.
     *
     * @throws ExecutionException if the warm-up task submitted to the metrics executor fails
     * @throws InterruptedException if interrupted while waiting for the warm-up task
     */
    @Test
    @Ignore("https://github.com/apache/beam/issues/18950 Test reliably fails.")
    public void ensureMetricsThreadDoesntLeak() throws ExecutionException, InterruptedException {
        final DirectGraph graph = DirectGraph.create(
                Collections.emptyMap(),
                Collections.emptyMap(),
                LinkedListMultimap.create(),
                Collections.emptySet(),
                Collections.emptyMap());

        // Non-daemon and distinctively named, so a surviving thread is easy to attribute.
        final ExecutorService metricsExecutorService = Executors.newSingleThreadExecutor(
                new ThreadFactoryBuilder()
                        .setDaemon(false)
                        .setNameFormat("dontleak_" + getClass().getName() + "#" + this.testName.getMethodName())
                        .build());

        // Run a no-op task so the executor's worker thread has actually been started.
        metricsExecutorService.submit(() -> {
        }).get();

        final EvaluationContext context = EvaluationContext.create(
                MockClock.fromInstant(Instant.now()),
                CloningBundleFactory.create(),
                graph,
                Collections.emptySet(),
                metricsExecutorService);

        ExecutorServiceParallelExecutor.create(
                        2,
                        TransformEvaluatorRegistry.javaSdkNativeRegistry(
                                context, PipelineOptionsFactory.create().as(DirectOptions.class)),
                        Collections.emptyMap(),
                        context,
                        metricsExecutorService)
                .stop();

        final boolean terminated;
        try {
            terminated = metricsExecutorService.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }

        // The result of awaitTermination used to be discarded, so a timeout went unnoticed here.
        if (!terminated) {
            // Do not let the non-daemon worker outlive the failed test.
            metricsExecutorService.shutdownNow();
            throw new AssertionError(
                    "Metrics executor service did not terminate within "
                            + TERMINATION_TIMEOUT_SECONDS
                            + " seconds after the executor was stopped");
        }
    }

    /**
     * Runs a small bounded pipeline end to end; leak detection is left to the rule chain.
     */
    @Test
    @Ignore("https://github.com/apache/beam/issues/18950 Test reliably fails.")
    public void testNoThreadsLeakInPipelineExecution() {
        this.pipeline
                .apply(GenerateSequence.from(0L).to(NUM_ELEMENTS))
                .apply(ParDo.of(new CountingDoFn()));
        this.pipeline.run();
    }
}
