package com.google.cloud.dataflow.sdk.testing;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.io.IOException;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/testing/VerifyDynamicWorkRebalancing.class */
/**
 * Test utility that runs a pipeline in which designated "sentinel" elements block their
 * processing thread forever, and then waits until every sentinel has been reached.
 *
 * <p>The driver polls the {@code "sentinels"} aggregator of the running job until its value
 * reaches the number of sentinels supplied by the caller, logs how long that took, and then
 * cancels the job. Presumably this is used to check that the service splits work away from
 * stuck threads (dynamic work rebalancing), as the class name suggests.
 *
 * <p>This class is not instantiable; use {@link #run} or {@link #runWithPipeline}.
 */
public class VerifyDynamicWorkRebalancing {
    private static final Logger LOG = LoggerFactory.getLogger(VerifyDynamicWorkRebalancing.class);

    /** How long a thread stuck on a sentinel sleeps between "Waiting at sentinel" log lines. */
    private static final long SENTINEL_WAIT_MSEC = 10000L;

    /** How long the driver sleeps between two polls of the aggregators. */
    private static final long POLL_INTERVAL_MSEC = 1000L;

    /**
     * {@link DoFn} that never returns once it is handed a sentinel element, and that sleeps for a
     * fixed time on every other element. Both kinds of elements are counted in aggregators.
     *
     * <p>NOTE(review): the decompiler reports that this class was originally {@code private}; the
     * widened visibility is kept here so that nothing depending on it breaks.
     *
     * @param <T> the element type of the input collection
     */
    public static class HangOnSentinels<T> extends DoFn<T, Void> {
        private final Collection<T> sentinels;
        private final long nonSentinelSleepMsec;

        /** Number of sentinel elements that have started processing. */
        public final Aggregator<Integer, Integer> sentinelCounter = createAggregator("sentinels", new Sum.SumIntegerFn());

        /** Number of non-sentinel elements that have started processing. */
        public final Aggregator<Integer, Integer> nonSentinelCounter = createAggregator("nonSentinels", new Sum.SumIntegerFn());

        /**
         * @param collection the elements to treat as sentinels (matched with
         *     {@link Collection#contains})
         * @param j how long, in milliseconds, to sleep on each non-sentinel element
         */
        public HangOnSentinels(Collection<T> collection, long j) {
            this.sentinels = collection;
            this.nonSentinelSleepMsec = j;
        }

        /**
         * Counts the element and then either sleeps briefly (non-sentinel) or loops forever
         * (sentinel). For a sentinel this method only terminates by throwing, e.g. when the
         * sleeping thread is interrupted.
         */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn
        public void processElement(DoFn<T, Void>.ProcessContext processContext) {
            T element = processContext.element();
            if (!this.sentinels.contains(element)) {
                this.nonSentinelCounter.addValue(1);
                VerifyDynamicWorkRebalancing.sleep(this.nonSentinelSleepMsec);
                return;
            }
            this.sentinelCounter.addValue(1);
            // Deliberately never finish this element: the driver cancels the job once all
            // sentinels have been counted.
            while (true) {
                VerifyDynamicWorkRebalancing.LOG.info("Waiting at sentinel {}.", element);
                VerifyDynamicWorkRebalancing.sleep(SENTINEL_WAIT_MSEC);
            }
        }
    }

    private VerifyDynamicWorkRebalancing() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * Creates a test pipeline configured with one worker harness thread per sentinel and runs
     * the verification on it.
     *
     * @param pTransform the root transform producing the elements to process
     * @param collection the sentinel elements; its size is the number of sentinels to wait for
     * @param j how long, in milliseconds, to sleep on each non-sentinel element; must be
     *     greater than 100
     * @throws IllegalArgumentException if the test pipeline is not an integration test, or if
     *     {@code j} is not greater than 100
     */
    public static <T> void run(PTransform<PBegin, PCollection<T>> pTransform, Collection<T> collection, long j) {
        runWithPipeline(configurePipeline(collection.size(), j), pTransform, collection, j);
    }

    /**
     * Applies {@code pTransform} followed by a {@link HangOnSentinels} step to {@code pipeline},
     * runs it, polls until all sentinels have been counted, and then cancels the job.
     *
     * <p>The result of {@code pipeline.run()} is cast to {@link DataflowPipelineJob} in order to
     * cancel it, so a pipeline whose runner returns another result type fails with a
     * {@link ClassCastException} at that point.
     *
     * @param pipeline the pipeline to build on and run
     * @param pTransform the root transform producing the elements to process
     * @param collection the sentinel elements; its size is the number of sentinels to wait for
     * @param j how long, in milliseconds, to sleep on each non-sentinel element
     * @throws RuntimeException wrapping an {@link AggregatorRetrievalException} or
     *     {@link IOException} raised while polling or cancelling the job
     */
    public static <T> void runWithPipeline(Pipeline pipeline, PTransform<PBegin, PCollection<T>> pTransform, Collection<T> collection, long j) {
        HangOnSentinels<T> hangOnSentinels = new HangOnSentinels<T>(collection, j);
        pipeline.apply(pTransform).apply(ParDo.of(hangOnSentinels));
        PipelineResult run = pipeline.run();
        long startMillis = System.currentTimeMillis();
        try {
            while (true) {
                // A missing aggregator value is reported as 0 (nothing counted yet).
                int sentinelsSeen = Iterables.getOnlyElement(run.getAggregatorValues(hangOnSentinels.sentinelCounter).getValues(), 0);
                int nonSentinelsSeen = Iterables.getOnlyElement(run.getAggregatorValues(hangOnSentinels.nonSentinelCounter).getValues(), 0);
                LOG.info("Seen {} sentinels, {} non-sentinels so far.", sentinelsSeen, nonSentinelsSeen);
                if (sentinelsSeen >= collection.size()) {
                    // Stop right away so the elapsed time below does not include a final sleep.
                    break;
                }
                sleep(POLL_INTERVAL_MSEC);
            }
            LOG.info("Took {} ms to separate all the sentinels.", System.currentTimeMillis() - startMillis);
            LOG.info("Canceling...");
            // Kept inside the try so that an IOException from the cancel request is wrapped too.
            ((DataflowPipelineJob) run).cancel();
            LOG.info("Done.");
        } catch (AggregatorRetrievalException e) {
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds a {@link TestPipeline} on the Dataflow runner with {@code numSentinels} worker
     * harness threads and throughput-based autoscaling.
     *
     * @param numSentinels the number of worker harness threads to request
     * @param nonSentinelSleepMsec the per-element sleep that will be used; must be greater
     *     than 100
     * @throws IllegalArgumentException if the test pipeline is not an integration test, or if
     *     {@code nonSentinelSleepMsec} is not greater than 100
     */
    private static Pipeline configurePipeline(int numSentinels, long nonSentinelSleepMsec) {
        if (!TestPipeline.isIntegrationTest()) {
            throw new IllegalArgumentException("Unsupported for this runner.");
        }
        Preconditions.checkArgument(nonSentinelSleepMsec > 100, "nonSentinelSleepMsec must be greater than 100, but was %s", nonSentinelSleepMsec);
        TestDataflowPipelineOptions pipelineOptions = TestPipeline.getPipelineOptions();
        pipelineOptions.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(numSentinels);
        pipelineOptions.as(DataflowPipelineWorkerPoolOptions.class).setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
        return new TestPipeline(DataflowPipelineRunner.fromOptions((PipelineOptions) pipelineOptions), pipelineOptions);
    }

    /**
     * Sleeps for {@code j} milliseconds, converting an interruption into an unchecked exception.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code private}.
     *
     * @param j the time to sleep, in milliseconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *     interrupted; the thread's interrupt flag is restored before throwing
     */
    public static void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
