package org.apache.beam.runners.extensions.metrics;

import com.sun.net.httpserver.HttpServer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.class */
/**
 * Tests for {@link MetricsHttpSink}.
 *
 * <p>An in-process {@link HttpServer} is started once for the whole class. Its handler records
 * every line of each request body in {@link #messages} and then counts down
 * {@link #countDownLatch}, so a test can block until the sink's request has been fully received.
 */
public class MetricsHttpSinkTest {
    /** Upper bound on how long a test waits for the sink's request to reach the local server. */
    private static final long REQUEST_TIMEOUT_SECONDS = 30L;

    /** Port the local HTTP server is actually listening on; assigned in {@link #beforeClass()}. */
    private static int port;

    /**
     * Request-body lines captured by the server handler. The handler thread appends before it
     * counts the latch down, and tests read only after a successful await, so the latch orders
     * the accesses.
     */
    private static final List<String> messages = new ArrayList<>();

    private static HttpServer httpServer;

    /**
     * Latch for the request currently in flight. Replaced by each test on the test thread and
     * read on the server thread, hence {@code volatile}.
     */
    private static volatile CountDownLatch countDownLatch;

    /**
     * Starts the local HTTP server on an ephemeral port.
     *
     * <p>The server binds to port 0 itself and the chosen port is read back from the server,
     * instead of probing a free port with a throw-away socket and re-binding it afterwards (which
     * leaves a window in which another process can grab the port).
     *
     * @throws IOException if the server cannot be created or bound
     */
    @BeforeClass
    public static void beforeClass() throws IOException {
        httpServer = HttpServer.create(new InetSocketAddress(0), 0);
        port = httpServer.getAddress().getPort();
        httpServer.createContext("/").setHandler(httpExchange -> {
            try {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(httpExchange.getRequestBody(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        messages.add(line);
                    }
                }
                httpExchange.sendResponseHeaders(200, 0L);
            } finally {
                // Always release the exchange and the waiting test, even when reading or
                // responding failed; the test's assertions then report the problem instead of
                // the test blocking on the latch.
                httpExchange.close();
                CountDownLatch latch = countDownLatch;
                if (latch != null) {
                    latch.countDown();
                }
            }
        });
        httpServer.start();
    }

    /** Drops messages captured by a previous test. */
    @Before
    public void before() {
        messages.clear();
    }

    /** Checks the JSON payload when the query results are built with {@code true}: committed values are present. */
    @Test
    public void testWriteMetricsWithCommittedSupported() throws Exception {
        String payload = writeMetricsAndCapture(true);
        Assert.assertEquals("Wrong messages sent to HTTP server", "{\"counters\":[{\"attempted\":20,\"committed\":10,\"name\":{\"name\":\"n1\",\"namespace\":\"ns1\"},\"step\":\"s1\"}],\"distributions\":[{\"attempted\":{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"committed\":{\"count\":2,\"max\":8,\"mean\":5.0,\"min\":5,\"sum\":10},\"name\":{\"name\":\"n2\",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":\"1970-01-05T00:04:22.800Z\",\"value\":120},\"committed\":{\"timestamp\":\"1970-01-05T00:04:22.800Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}", payload);
    }

    /** Checks the JSON payload when the query results are built with {@code false}: only attempted values are present. */
    @Test
    public void testWriteMetricsWithCommittedUnSupported() throws Exception {
        String payload = writeMetricsAndCapture(false);
        Assert.assertEquals("Wrong messages sent to HTTP server", "{\"counters\":[{\"attempted\":20,\"name\":{\"name\":\"n1\",\"namespace\":\"ns1\"},\"step\":\"s1\"}],\"distributions\":[{\"attempted\":{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":\"1970-01-05T00:04:22.800Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}", payload);
    }

    /** Stops the local HTTP server, if it was started. */
    @AfterClass
    public static void after() {
        // @AfterClass still runs when @BeforeClass failed; avoid masking that failure with an NPE.
        if (httpServer != null) {
            httpServer.stop(0);
        }
    }

    /**
     * Writes a {@link CustomMetricQueryResults} through a {@link MetricsHttpSink} pointed at the
     * local server and returns the single body line the server received.
     *
     * @param committedSupported flag passed to the {@link CustomMetricQueryResults} constructor
     * @return the only line captured from the request body
     * @throws Exception if writing the metrics fails or the wait is interrupted
     */
    private static String writeMetricsAndCapture(boolean committedSupported) throws Exception {
        CustomMetricQueryResults queryResults = new CustomMetricQueryResults(committedSupported);
        MetricsOptions options = PipelineOptionsFactory.create().as(MetricsOptions.class);
        options.setMetricsHttpSinkUrl(String.format("http://localhost:%d", port));
        MetricsHttpSink sink = new MetricsHttpSink(options);

        // Install the latch before the sink can send anything, so the handler sees this one.
        CountDownLatch requestHandled = new CountDownLatch(1);
        countDownLatch = requestHandled;
        sink.writeMetrics(queryResults);

        // Bounded wait: a sink that never posts must fail the test rather than hang the build.
        Assert.assertTrue(
                "Timed out waiting for the HTTP server to receive the metrics",
                requestHandled.await(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        Assert.assertEquals("Wrong number of messages sent to HTTP server", 1L, messages.size());
        return messages.get(0);
    }
}
