package org.apache.beam.runners.spark.structuredstreaming.translation;

import java.util.Collection;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.ExplainMode;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext.class */
public final class EvaluationContext {
    private static final Logger LOG = LoggerFactory.getLogger(EvaluationContext.class);
    private final Collection<? extends NamedDataset<?>> leaves;
    private final SparkSession session;

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/EvaluationContext$NamedDataset.class */
    interface NamedDataset<T> {
        String name();

        @Nullable
        Dataset<WindowedValue<T>> dataset();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EvaluationContext(Collection<? extends NamedDataset<?>> collection, SparkSession sparkSession) {
        this.leaves = collection;
        this.session = sparkSession;
    }

    public void evaluate() {
        for (NamedDataset<?> namedDataset : this.leaves) {
            Dataset<WindowedValue<?>> dataset = namedDataset.dataset();
            if (dataset != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Evaluating dataset {}:\n{}", namedDataset.name(), dataset.queryExecution().explainString(ExplainMode.fromString("simple")));
                }
                evaluate(namedDataset.name(), dataset);
            }
        }
    }

    public static <T> void evaluate(String str, Dataset<T> dataset) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            dataset.write().mode("overwrite").format("noop").save();
            LOG.info("Evaluated dataset {} in {}", str, durationSince(currentTimeMillis));
        } catch (RuntimeException e) {
            LOG.error("Failed to evaluate dataset {}: {}", str, Throwables.getRootCause(e).getMessage());
            throw new RuntimeException(e);
        }
    }

    public static <T> T[] collect(String str, Dataset<T> dataset) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            T[] tArr = (T[]) ((Object[]) dataset.collect());
            LOG.info("Collected dataset {} in {} [size: {}]", new Object[]{str, durationSince(currentTimeMillis), Integer.valueOf(tArr.length)});
            return tArr;
        } catch (Exception e) {
            LOG.error("Failed to collect dataset {}: {}", str, Throwables.getRootCause(e).getMessage());
            throw new RuntimeException(e);
        }
    }

    public SparkSession getSparkSession() {
        return this.session;
    }

    private static String durationSince(long j) {
        return Utils.msDurationToString(System.currentTimeMillis() - j);
    }
}
