package com.google.cloud.dataflow.sdk.testing;

import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Optional;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.class */
public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
    private static final String TENTATIVE_COUNTER = "tentative";
    private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
    private final TestDataflowPipelineOptions options;
    private final DataflowPipelineRunner runner;
    private int expectedNumberOfAssertions = 0;

    TestDataflowPipelineRunner(TestDataflowPipelineOptions testDataflowPipelineOptions) {
        this.options = testDataflowPipelineOptions;
        this.runner = DataflowPipelineRunner.fromOptions((PipelineOptions) testDataflowPipelineOptions);
    }

    public static TestDataflowPipelineRunner fromOptions(PipelineOptions pipelineOptions) {
        return new TestDataflowPipelineRunner((TestDataflowPipelineOptions) pipelineOptions.as(TestDataflowPipelineOptions.class));
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.google.cloud.dataflow.sdk.runners.PipelineRunner
    public DataflowPipelineJob run(Pipeline pipeline) {
        return run(pipeline, this.runner);
    }

    DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner dataflowPipelineRunner) {
        Optional<Boolean> checkForSuccess;
        final MonitoringUtil.PrintHandler printHandler = new MonitoringUtil.PrintHandler(this.options.getJobMessageOutput());
        try {
            final DataflowPipelineJob run = dataflowPipelineRunner.run(pipeline);
            LOG.info("Running Dataflow job {} with {} expected assertions.", run.getJobId(), Integer.valueOf(this.expectedNumberOfAssertions));
            try {
                if (this.options.isStreaming()) {
                    Future submit = this.options.getExecutorService().submit(new Callable<Optional<Boolean>>() { // from class: com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineRunner.1
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // java.util.concurrent.Callable
                        public Optional<Boolean> call() throws Exception {
                            while (true) {
                                try {
                                    Optional<Boolean> checkForSuccess2 = TestDataflowPipelineRunner.this.checkForSuccess(run);
                                    if (checkForSuccess2.isPresent()) {
                                        TestDataflowPipelineRunner.LOG.info("Cancelling Dataflow job {}", run.getJobId());
                                        run.cancel();
                                        return checkForSuccess2;
                                    }
                                    Thread.sleep(10000L);
                                } catch (Throwable th) {
                                    TestDataflowPipelineRunner.LOG.info("Cancelling Dataflow job {}", run.getJobId());
                                    run.cancel();
                                    throw th;
                                }
                            }
                        }
                    });
                    PipelineResult.State waitToFinish = run.waitToFinish(10L, TimeUnit.MINUTES, new MonitoringUtil.JobMessagesHandler() { // from class: com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineRunner.2
                        @Override // com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler
                        public void process(List<JobMessage> list) {
                            printHandler.process(list);
                            for (JobMessage jobMessage : list) {
                                if (jobMessage.getMessageImportance() != null && jobMessage.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
                                    TestDataflowPipelineRunner.LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}", run.getJobId(), jobMessage.getMessageText());
                                    try {
                                        run.cancel();
                                    } catch (Exception e) {
                                        throw Throwables.propagate(e);
                                    }
                                }
                            }
                        }
                    });
                    if (waitToFinish == null || waitToFinish == PipelineResult.State.RUNNING) {
                        LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.", run.getJobId());
                        run.cancel();
                    }
                    checkForSuccess = (Optional) submit.get();
                } else {
                    run.waitToFinish(-1L, TimeUnit.SECONDS, printHandler);
                    checkForSuccess = checkForSuccess(run);
                }
                if (!checkForSuccess.isPresent()) {
                    throw new IllegalStateException("The dataflow did not output a success or failure metric.");
                }
                if (checkForSuccess.get().booleanValue()) {
                    return run;
                }
                throw new IllegalStateException("The dataflow failed.");
            } catch (Exception e) {
                Throwables.propagateIfPossible(e);
                throw Throwables.propagate(e);
            }
        } catch (DataflowJobExecutionException e2) {
            throw new IllegalStateException("The dataflow failed.");
        }
    }

    @Override // com.google.cloud.dataflow.sdk.runners.PipelineRunner
    public <OutputT extends POutput, InputT extends PInput> OutputT apply(PTransform<InputT, OutputT> pTransform, InputT inputt) {
        if ((pTransform instanceof DataflowAssert.OneSideInputAssert) || (pTransform instanceof DataflowAssert.TwoSideInputAssert)) {
            this.expectedNumberOfAssertions++;
        }
        return (OutputT) this.runner.apply(pTransform, inputt);
    }

    Optional<Boolean> checkForSuccess(DataflowPipelineJob dataflowPipelineJob) throws IOException {
        PipelineResult.State state = dataflowPipelineJob.getState();
        if (state == PipelineResult.State.FAILED || state == PipelineResult.State.CANCELLED) {
            LOG.info("The pipeline failed");
            return Optional.of(false);
        }
        JobMetrics jobMetrics = (JobMetrics) dataflowPipelineJob.getDataflowClient().projects().jobs().getMetrics(dataflowPipelineJob.getProjectId(), dataflowPipelineJob.getJobId()).execute();
        if (jobMetrics == null || jobMetrics.getMetrics() == null) {
            LOG.warn("Metrics not present for Dataflow job {}.", dataflowPipelineJob.getJobId());
        } else {
            int i = 0;
            int i2 = 0;
            for (MetricUpdate metricUpdate : jobMetrics.getMetrics()) {
                if (metricUpdate.getName() != null && metricUpdate.getName().getContext() != null && metricUpdate.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
                    if ("DataflowAssertSuccess".equals(metricUpdate.getName().getName())) {
                        i += ((BigDecimal) metricUpdate.getScalar()).intValue();
                    } else if ("DataflowAssertFailure".equals(metricUpdate.getName().getName())) {
                        i2 += ((BigDecimal) metricUpdate.getScalar()).intValue();
                    }
                }
            }
            if (i2 > 0) {
                LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", new Object[]{dataflowPipelineJob.getJobId(), Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(this.expectedNumberOfAssertions)});
                return Optional.of(false);
            }
            if (i >= this.expectedNumberOfAssertions) {
                LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", new Object[]{dataflowPipelineJob.getJobId(), Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(this.expectedNumberOfAssertions)});
                return Optional.of(true);
            }
            LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", new Object[]{dataflowPipelineJob.getJobId(), Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(this.expectedNumberOfAssertions)});
        }
        return Optional.absent();
    }

    public String toString() {
        String valueOf = String.valueOf(this.options.getAppName());
        return valueOf.length() != 0 ? "TestDataflowPipelineRunner#".concat(valueOf) : new String("TestDataflowPipelineRunner#");
    }
}
