package org.apache.beam.runners.spark.structuredstreaming.metrics;

import org.apache.beam.runners.core.metrics.TestMetricsSink;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.UsesMetricsPusher;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsPusherTest.class */
public class MetricsPusherTest {
    private static final Logger LOG = LoggerFactory.getLogger(MetricsPusherTest.class);
    private static final String COUNTER_NAME = "counter";
    private static Pipeline pipeline;

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsPusherTest$CountingDoFn.class */
    private static class CountingDoFn extends DoFn<Integer, Integer> {
        private final Counter counter;

        private CountingDoFn() {
            this.counter = Metrics.counter(MetricsPusherTest.class, MetricsPusherTest.COUNTER_NAME);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) {
            try {
                this.counter.inc();
                processContext.output((Integer) processContext.element());
            } catch (Exception e) {
                MetricsPusherTest.LOG.warn("Exception caught" + e);
            }
        }
    }

    @BeforeClass
    public static void beforeClass() {
        SparkStructuredStreamingPipelineOptions as = PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
        as.setRunner(SparkStructuredStreamingRunner.class);
        as.setTestMode(true);
        MetricsOptions as2 = as.as(MetricsOptions.class);
        as2.setMetricsSink(TestMetricsSink.class);
        pipeline = Pipeline.create(as2);
    }

    @Test
    @Category({UsesMetricsPusher.class})
    public void testInSBatchMode() throws Exception {
        pipeline.apply(Create.of(1, new Integer[]{2, 3, 4, 5, 6})).apply(ParDo.of(new CountingDoFn()));
        pipeline.run();
        Thread.sleep((pipeline.getOptions().as(MetricsOptions.class).getMetricsPushPeriod().longValue() + 1) * 1000);
        Assert.assertThat(Long.valueOf(TestMetricsSink.getCounterValue(COUNTER_NAME)), Matchers.is(6L));
    }
}
