package org.apache.beam.runners.portability;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeData;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
import org.apache.beam.runners.portability.testing.TestJobService;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/portability/PortableRunnerTest.class */
public class PortableRunnerTest implements Serializable {
    private static final String COUNTER_TYPE = "beam:metrics:sum_int64:v1";
    private static final String DIST_TYPE = "beam:metrics:distribution_int64:v1";
    private static final String GAUGE_TYPE = "beam:metrics:latest_int64:v1";
    private static final String STRING_SET_TYPE = "beam:metrics:set_string:v1";
    private static final String NAMESPACE_LABEL = "NAMESPACE";
    private static final String METRIC_NAME_LABEL = "NAME";
    private static final String STEP_NAME_LABEL = "PTRANSFORM";
    private static final String NAMESPACE = "test";
    private static final String METRIC_NAME = "testMetric";
    private static final String STEP_NAME = "testStep";
    private final PipelineOptions options = createPipelineOptions();

    @Rule
    public transient GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule().setTimeout(10, TimeUnit.SECONDS);

    @Rule
    public transient TestPipeline p = TestPipeline.fromOptions(this.options);
    private static final String ENDPOINT_URL = "foo:3000";
    private static final Endpoints.ApiServiceDescriptor ENDPOINT_DESCRIPTOR = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(ENDPOINT_URL).build();
    private static final Long COUNTER_VALUE = 42L;
    private static final Long GAUGE_VALUE = 64L;
    private static final Set<String> STRING_SET_VALUE = ImmutableSet.of("ab", "cd");
    private static final Instant GAUGE_TIME = GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardSeconds(1));
    private static final Long DIST_SUM = 1000L;
    private static final Long DIST_MIN = 0L;
    private static final Long DIST_MAX = 1000L;
    private static final Long DIST_COUNT = 2L;

    @Test
    public void stagesAndRunsJob() throws Exception {
        createJobServer(JobApi.JobState.Enum.DONE, JobApi.MetricResults.getDefaultInstance());
        MatcherAssert.assertThat(PortableRunner.create(this.options, ManagedChannelFactory.createInProcess()).run(this.p).waitUntilFinish(), Matchers.is(PipelineResult.State.DONE));
    }

    @Test
    public void extractsMetrics() throws Exception {
        createJobServer(JobApi.JobState.Enum.DONE, generateMetricResults());
        PipelineResult run = PortableRunner.create(this.options, ManagedChannelFactory.createInProcess()).run(this.p);
        run.waitUntilFinish();
        MetricQueryResults allMetrics = run.metrics().allMetrics();
        MatcherAssert.assertThat((Long) ((MetricResult) allMetrics.getCounters().iterator().next()).getAttempted(), Matchers.is(COUNTER_VALUE));
        MatcherAssert.assertThat(Long.valueOf(((DistributionResult) ((MetricResult) allMetrics.getDistributions().iterator().next()).getAttempted()).getCount()), Matchers.is(DIST_COUNT));
        MatcherAssert.assertThat(Long.valueOf(((DistributionResult) ((MetricResult) allMetrics.getDistributions().iterator().next()).getAttempted()).getMax()), Matchers.is(DIST_MAX));
        MatcherAssert.assertThat(Long.valueOf(((DistributionResult) ((MetricResult) allMetrics.getDistributions().iterator().next()).getAttempted()).getMin()), Matchers.is(DIST_MIN));
        MatcherAssert.assertThat(Long.valueOf(((DistributionResult) ((MetricResult) allMetrics.getDistributions().iterator().next()).getAttempted()).getSum()), Matchers.is(DIST_SUM));
        MatcherAssert.assertThat(Long.valueOf(((GaugeResult) ((MetricResult) allMetrics.getGauges().iterator().next()).getAttempted()).getValue()), Matchers.is(GAUGE_VALUE));
        MatcherAssert.assertThat(((StringSetResult) ((MetricResult) allMetrics.getStringSets().iterator().next()).getAttempted()).getStringSet(), Matchers.is(STRING_SET_VALUE));
    }

    private JobApi.MetricResults generateMetricResults() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(NAMESPACE_LABEL, NAMESPACE);
        hashMap.put(METRIC_NAME_LABEL, METRIC_NAME);
        hashMap.put(STEP_NAME_LABEL, STEP_NAME);
        MetricsApi.MonitoringInfo build = MetricsApi.MonitoringInfo.newBuilder().setType(COUNTER_TYPE).putAllLabels(hashMap).setPayload(MonitoringInfoEncodings.encodeInt64Counter(COUNTER_VALUE.longValue())).build();
        MetricsApi.MonitoringInfo build2 = MetricsApi.MonitoringInfo.newBuilder().setType(DIST_TYPE).putAllLabels(hashMap).setPayload(MonitoringInfoEncodings.encodeInt64Distribution(DistributionData.create(DIST_SUM.longValue(), DIST_COUNT.longValue(), DIST_MIN.longValue(), DIST_MAX.longValue()))).build();
        MetricsApi.MonitoringInfo build3 = MetricsApi.MonitoringInfo.newBuilder().setType(GAUGE_TYPE).putAllLabels(hashMap).setPayload(MonitoringInfoEncodings.encodeInt64Gauge(GaugeData.create(GAUGE_VALUE.longValue(), GAUGE_TIME))).build();
        return JobApi.MetricResults.newBuilder().addAttempted(build).addAttempted(build2).addAttempted(build3).addAttempted(MetricsApi.MonitoringInfo.newBuilder().setType(STRING_SET_TYPE).putAllLabels(hashMap).setPayload(MonitoringInfoEncodings.encodeStringSet(StringSetData.create(STRING_SET_VALUE))).build()).build();
    }

    private void createJobServer(JobApi.JobState.Enum r11, JobApi.MetricResults metricResults) throws IOException {
        ArtifactStagingService artifactStagingService = new ArtifactStagingService(new ArtifactStagingService.ArtifactDestinationProvider() { // from class: org.apache.beam.runners.portability.PortableRunnerTest.1
            public ArtifactStagingService.ArtifactDestination getDestination(String str, String str2) throws IOException {
                return ArtifactStagingService.ArtifactDestination.create("/dev/null", ByteString.EMPTY, ByteStreams.nullOutputStream());
            }

            public void removeStagedArtifacts(String str) {
            }
        });
        artifactStagingService.registerJob("TestStagingToken", ImmutableMap.of());
        this.grpcCleanupRule.register(InProcessServerBuilder.forName(ENDPOINT_URL).addService(new TestJobService(ENDPOINT_DESCRIPTOR, "prepId", "jobId", r11, metricResults)).addService(artifactStagingService).build()).start();
    }

    private static PipelineOptions createPipelineOptions() {
        PortablePipelineOptions as = PipelineOptionsFactory.create().as(PortablePipelineOptions.class);
        as.setJobEndpoint(ENDPOINT_URL);
        as.setRunner(PortableRunner.class);
        return as;
    }
}
