package org.apache.beam.runners.spark;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.stateful.SparkTimerInternals;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.io.FileUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/TestSparkRunner.class */
/**
 * A {@link PipelineRunner} for tests that wraps a {@link SparkRunner} and asserts on the
 * pipeline's terminal state.
 *
 * <p>Before each run the static {@link AggregatorsAccumulator}, {@link MetricsAccumulator} and
 * {@link GlobalWatermarkHolder} are cleared. Depending on
 * {@code TestSparkPipelineOptions#isForceStreaming()} the pipeline is then either:
 *
 * <ul>
 *   <li>run in "forced streaming" mode: polled until the global watermark reaches the configured
 *       stop watermark or the test timeout elapses, stopped, asserted to be {@code STOPPED} or
 *       {@code DONE}, and finally its checkpoint directory is deleted; or
 *   <li>run to completion, stopped, asserted to be {@code DONE}, and checked against the
 *       on-create and on-success matchers from the options.
 * </ul>
 */
public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(TestSparkRunner.class);

    /** Options this runner was created from; validated as TestSparkPipelineOptions on each run. */
    private final PipelineOptions options;

    /** The real runner that executes the pipeline. */
    private final SparkRunner delegate;

    private TestSparkRunner(PipelineOptions pipelineOptions) {
        this.delegate = SparkRunner.fromOptions(pipelineOptions);
        this.options = pipelineOptions;
    }

    /**
     * Creates a test runner (and its delegate {@link SparkRunner}) from the given options.
     *
     * @param pipelineOptions options used to build the delegate runner and, later, to run
     * @return a new {@code TestSparkRunner}
     */
    public static TestSparkRunner fromOptions(PipelineOptions pipelineOptions) {
        return new TestSparkRunner(pipelineOptions);
    }

    /**
     * Runs the pipeline through the delegate runner and asserts on its finish state.
     *
     * <p>Note: the method name is a decompiler rename of {@code run} (merged with its bridge
     * method); it is kept as-is so that existing references keep resolving.
     *
     * @param pipeline the pipeline to execute
     * @return the result returned by the delegate runner, after it has been stopped
     * @throws AssertionError if the finish state (or, in batch mode, a configured matcher) does
     *     not match
     * @throws RuntimeException if the streaming run succeeded but the checkpoint directory could
     *     not be deleted
     */
    /* renamed from: run, reason: merged with bridge method [inline-methods] */
    public SparkPipelineResult m255run(Pipeline pipeline) {
        TestSparkPipelineOptions testOptions =
                PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options);
        boolean forceStreaming = testOptions.isForceStreaming();

        // Reset the static accumulators/watermarks so state from a previous run cannot leak in.
        AggregatorsAccumulator.clear();
        MetricsAccumulator.clear();
        GlobalWatermarkHolder.clear();

        LOG.info("About to run test pipeline {}", options.getJobName());

        return forceStreaming ? runStreaming(pipeline, testOptions) : runBatch(pipeline, testOptions);
    }

    /** Runs in forced-streaming mode and always attempts to delete the checkpoint directory. */
    private SparkPipelineResult runStreaming(Pipeline pipeline, TestSparkPipelineOptions testOptions) {
        SparkPipelineResult result;
        try {
            result = delegate.m247run(pipeline);
            awaitWatermarksOrTimeout(testOptions, result);
            result.stop();
            PipelineResult.State finishState = result.getState();
            MatcherAssert.assertThat(
                    String.format("Finish state %s is not allowed.", finishState),
                    finishState,
                    Matchers.isOneOf(PipelineResult.State.STOPPED, PipelineResult.State.DONE));
        } catch (Throwable failure) {
            // Still clean up, but never let a cleanup problem hide the real failure: attach it
            // as a suppressed exception and rethrow the original.
            try {
                deleteCheckpointDir(testOptions);
            } catch (RuntimeException cleanupFailure) {
                failure.addSuppressed(cleanupFailure);
            }
            throw failure;
        }
        deleteCheckpointDir(testOptions);
        return result;
    }

    /** Runs to completion, then asserts the DONE state and the configured matchers. */
    private SparkPipelineResult runBatch(Pipeline pipeline, TestSparkPipelineOptions testOptions) {
        SparkPipelineResult result = delegate.m247run(pipeline);
        result.waitUntilFinish();
        result.stop();
        PipelineResult.State finishState = result.getState();
        MatcherAssert.assertThat(
                String.format("Finish state %s is not allowed.", finishState),
                finishState,
                Matchers.is(PipelineResult.State.DONE));
        MatcherAssert.assertThat(result, testOptions.getOnCreateMatcher());
        MatcherAssert.assertThat(result, testOptions.getOnSuccessMatcher());
        return result;
    }

    /**
     * Deletes the checkpoint directory named by the options.
     *
     * @throws RuntimeException wrapping the {@link IOException} if deletion fails
     */
    private static void deleteCheckpointDir(TestSparkPipelineOptions testOptions) {
        try {
            FileUtils.deleteDirectory(new File(testOptions.getCheckpointDir()));
        } catch (IOException e) {
            throw new RuntimeException("Failed to clear checkpoint tmp dir.", e);
        }
    }

    /**
     * Blocks until the global input watermark reaches the stop watermark from the options, or
     * until the test timeout has been used up, whichever comes first.
     *
     * <p>Polling happens once per batch interval; the remaining timeout is decremented by one
     * batch interval per iteration rather than measured against a clock.
     *
     * @param testSparkPipelineOptions source of the timeout, batch interval and stop watermark
     * @param sparkPipelineResult the running pipeline's result handle
     * @throws NullPointerException if the timeout or batch interval option is {@code null}
     */
    private static void awaitWatermarksOrTimeout(TestSparkPipelineOptions testSparkPipelineOptions, SparkPipelineResult sparkPipelineResult) {
        long remainingMillis =
                Duration.standardSeconds(
                                Preconditions.checkNotNull(
                                        testSparkPipelineOptions.getTestTimeoutSeconds()))
                        .getMillis();
        final long batchIntervalMillis =
                Preconditions.checkNotNull(testSparkPipelineOptions.getBatchIntervalMillis());
        final Instant stopWatermark =
                new Instant(testSparkPipelineOptions.getStopPipelineWatermark());

        // Give the pipeline one batch interval up front before the first watermark poll.
        sparkPipelineResult.waitUntilFinish(Duration.millis(batchIntervalMillis));

        Instant globalWatermark;
        do {
            SparkTimerInternals timerInternals =
                    SparkTimerInternals.global(GlobalWatermarkHolder.get(batchIntervalMillis));
            timerInternals.advanceWatermark();
            globalWatermark = timerInternals.currentInputWatermarkTime();
            // Sleep one more batch interval before re-checking; this is also the unit by which
            // the remaining timeout is reduced.
            Uninterruptibles.sleepUninterruptibly(batchIntervalMillis, TimeUnit.MILLISECONDS);
            remainingMillis -= batchIntervalMillis;
        } while (remainingMillis > 0 && globalWatermark.isBefore(stopWatermark));
    }
}
