package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;

import com.google.cloud.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOPushDownRule;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.class */
public class BigQueryIOPushDownIT {
    private static final String READ_FROM_TABLE = "apache-beam-testing:beam_performance.hacker_news_full";
    private static final String NAMESPACE = BigQueryIOPushDownIT.class.getName();
    private static final String FIELDS_READ_METRIC = "fields_read";
    private static final String READ_TIME_METRIC = "read_time";
    private static final String CREATE_TABLE_STATEMENT = "CREATE EXTERNAL TABLE HACKER_NEWS( \n   title VARCHAR, \n   url VARCHAR, \n   text VARCHAR, \n   dead BOOLEAN, \n   `by` VARCHAR, \n   score INTEGER, \n   `time` INTEGER, \n   `timestamp` TIMESTAMP, \n   type VARCHAR, \n   id INTEGER, \n   parent INTEGER, \n   descendants INTEGER, \n   ranking INTEGER, \n   deleted BOOLEAN \n) \nTYPE 'bigquery' \nLOCATION 'apache-beam-testing:beam_performance.hacker_news_full' \nTBLPROPERTIES '{ method: \"%s\" }'";
    private static final String SELECT_STATEMENT = "SELECT `by` as author, type, title, score from HACKER_NEWS where (type='story' or type='job') and score>2";
    private static SQLBigQueryPerfTestOptions options;
    private static String metricsBigQueryDataset;
    private static String metricsBigQueryTable;
    private static InfluxDBSettings settings;
    private Pipeline pipeline = Pipeline.create(options);
    private BeamSqlEnv sqlEnv;

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT$SQLBigQueryPerfTestOptions.class */
    public interface SQLBigQueryPerfTestOptions extends IOTestPipelineOptions {
        @Description("BQ dataset for the metrics data")
        String getMetricsBigQueryDataset();

        void setMetricsBigQueryDataset(String str);

        @Description("BQ table for metrics data")
        String getMetricsBigQueryTable();

        void setMetricsBigQueryTable(String str);
    }

    @BeforeClass
    public static void setUp() {
        options = IOITHelper.readIOTestPipelineOptions(SQLBigQueryPerfTestOptions.class);
        metricsBigQueryDataset = options.getMetricsBigQueryDataset();
        metricsBigQueryTable = options.getMetricsBigQueryTable();
        settings = InfluxDBSettings.builder().withHost(options.getInfluxHost()).withDatabase(options.getInfluxDatabase()).withMeasurement(options.getInfluxMeasurement()).get();
    }

    @Before
    public void before() {
        this.sqlEnv = BeamSqlEnv.inMemory(new TableProvider[]{new BigQueryPerfTableProvider(NAMESPACE, FIELDS_READ_METRIC)});
    }

    @Test
    public void readUsingDirectReadMethodPushDown() {
        this.sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, BigQueryIO.TypedRead.Method.DIRECT_READ.toString()));
        BeamSqlRelUtils.toPCollection(this.pipeline, this.sqlEnv.parseQuery(SELECT_STATEMENT)).apply(ParDo.of(new TimeMonitor(NAMESPACE, READ_TIME_METRIC)));
        PipelineResult run = this.pipeline.run();
        run.waitUntilFinish();
        collectAndPublishMetrics(run, "_directread_pushdown");
    }

    @Test
    public void readUsingDirectReadMethod() {
        ArrayList arrayList = new ArrayList();
        Iterator it = BeamRuleSets.getRuleSets().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((RuleSet) it.next()).iterator();
            Objects.requireNonNull(arrayList);
            it2.forEachRemaining((v1) -> {
                r1.add(v1);
            });
        }
        arrayList.remove(BeamIOPushDownRule.INSTANCE);
        InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
        inMemoryMetaStore.registerProvider(new BigQueryPerfTableProvider(NAMESPACE, FIELDS_READ_METRIC));
        this.sqlEnv = BeamSqlEnv.builder(inMemoryMetaStore).setPipelineOptions(PipelineOptionsFactory.create()).setRuleSets(ImmutableList.of(RuleSets.ofList(arrayList))).build();
        this.sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, BigQueryIO.TypedRead.Method.DIRECT_READ.toString()));
        BeamSqlRelUtils.toPCollection(this.pipeline, this.sqlEnv.parseQuery(SELECT_STATEMENT)).apply(ParDo.of(new TimeMonitor(NAMESPACE, READ_TIME_METRIC)));
        PipelineResult run = this.pipeline.run();
        run.waitUntilFinish();
        collectAndPublishMetrics(run, "_directread");
    }

    @Test
    public void readUsingDefaultMethod() {
        this.sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, BigQueryIO.TypedRead.Method.DEFAULT.toString()));
        BeamSqlRelUtils.toPCollection(this.pipeline, this.sqlEnv.parseQuery(SELECT_STATEMENT)).apply(ParDo.of(new TimeMonitor(NAMESPACE, READ_TIME_METRIC)));
        PipelineResult run = this.pipeline.run();
        run.waitUntilFinish();
        collectAndPublishMetrics(run, "_default");
    }

    private void collectAndPublishMetrics(PipelineResult pipelineResult, String str) {
        String uuid = UUID.randomUUID().toString();
        String timestamp = Timestamp.now().toString();
        IOITMetrics iOITMetrics = new IOITMetrics(getReadSuppliers(uuid, timestamp), pipelineResult, NAMESPACE, uuid, timestamp);
        iOITMetrics.publish(metricsBigQueryDataset, metricsBigQueryTable + str);
        iOITMetrics.publishToInflux(settings.copyWithMeasurement(settings.measurement + str));
    }

    private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(String str, String str2) {
        HashSet hashSet = new HashSet();
        hashSet.add(metricsReader -> {
            return NamedTestResult.create(str, str2, READ_TIME_METRIC, (metricsReader.getEndTimeMetric(READ_TIME_METRIC) - metricsReader.getStartTimeMetric(READ_TIME_METRIC)) / 1000.0d);
        });
        hashSet.add(metricsReader2 -> {
            return NamedTestResult.create(str, str2, FIELDS_READ_METRIC, metricsReader2.getCounterMetric(FIELDS_READ_METRIC));
        });
        return hashSet;
    }
}
