/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.extensions.metrics;

import com.sun.net.httpserver.HttpServer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.extensions.metrics.CustomMetricQueryResults;
import org.apache.beam.runners.extensions.metrics.MetricsHttpSink;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for {@link MetricsHttpSink}.
 *
 * <p>An embedded {@link HttpServer} is started once for the class. Its handler records every line
 * of each request body it receives; each test then points a {@link MetricsHttpSink} at that
 * server, writes a fixed set of metrics and compares the captured body with the expected JSON.
 */
public class MetricsHttpSinkTest {
    /** Upper bound on how long a test waits for the embedded server to finish handling a request. */
    private static final long REQUEST_TIMEOUT_SECONDS = 30L;

    /** Port the embedded server listens on; assigned once in {@link #beforeClass()}. */
    private static int port;

    /**
     * Request-body lines captured by the handler. Written on the server's thread and read on the
     * test thread, hence a thread-safe list.
     */
    private static final List<String> messages = new CopyOnWriteArrayList<>();

    private static HttpServer httpServer;

    /**
     * Released by the handler once a request has been processed. Replaced before every test and
     * read from the server's thread, hence {@code volatile}.
     */
    private static volatile CountDownLatch countDownLatch;

    /**
     * Starts the embedded HTTP server that captures request bodies.
     *
     * @throws IOException if the server cannot be created or bound
     */
    @BeforeClass
    public static void beforeClass() throws IOException {
        // Bind straight to an ephemeral port (0) and ask the server which one it got. Probing with
        // a throwaway socket and re-binding afterwards leaves a window in which another process
        // can grab the port.
        httpServer = HttpServer.create(new InetSocketAddress(0), 0);
        port = httpServer.getAddress().getPort();
        httpServer.createContext("/", httpExchange -> {
            try {
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(httpExchange.getRequestBody(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        messages.add(line);
                    }
                }
                httpExchange.sendResponseHeaders(200, 0L);
            } finally {
                // Always release the waiting test, even when reading or responding failed;
                // the assertions on the captured messages then report the problem.
                httpExchange.close();
                countDownLatch.countDown();
            }
        });
        httpServer.start();
    }

    /** Resets the captured messages and arms a fresh latch for the single request of the next test. */
    @Before
    public void before() {
        messages.clear();
        countDownLatch = new CountDownLatch(1);
    }

    /**
     * Checks the payload sent when the metric results are built with the flag set to {@code true}:
     * it is expected to carry both attempted and committed values.
     */
    @Test
    public void testWriteMetricsWithCommittedSupported() throws Exception {
        String expected = "{\"counters\":[{\"attempted\":20,\"committed\":10,\"name\":{\"name\":\"n1\",\"namespace\":\"ns1\"},\"step\":\"s1\"}],\"distributions\":[{\"attempted\":{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"committed\":{\"count\":2,\"max\":8,\"mean\":5.0,\"min\":5,\"sum\":10},\"name\":{\"name\":\"n2\",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":\"1970-01-05T00:04:22.800Z\",\"value\":120},\"committed\":{\"timestamp\":\"1970-01-05T00:04:22.800Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}";
        assertSinkPosts(true, expected);
    }

    /**
     * Checks the payload sent when the metric results are built with the flag set to {@code false}:
     * it is expected to carry attempted values only.
     */
    @Test
    public void testWriteMetricsWithCommittedUnSupported() throws Exception {
        String expected = "{\"counters\":[{\"attempted\":20,\"name\":{\"name\":\"n1\",\"namespace\":\"ns1\"},\"step\":\"s1\"}],\"distributions\":[{\"attempted\":{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":\"1970-01-05T00:04:22.800Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}";
        assertSinkPosts(false, expected);
    }

    /** Stops the embedded server, if it was started. */
    @AfterClass
    public static void after() {
        // JUnit still runs @AfterClass when @BeforeClass failed, in which case there is no server.
        if (httpServer != null) {
            httpServer.stop(0);
        }
    }

    /**
     * Writes the fixture metrics through a {@link MetricsHttpSink} aimed at the embedded server
     * and asserts that exactly one body line, equal to {@code expected}, was captured.
     *
     * @param committedSupported flag passed to the {@link CustomMetricQueryResults} constructor
     * @param expected the exact JSON line the server is expected to receive
     * @throws Exception if writing the metrics fails or the wait is interrupted
     */
    private static void assertSinkPosts(boolean committedSupported, String expected) throws Exception {
        CustomMetricQueryResults metricQueryResults = new CustomMetricQueryResults(committedSupported);
        MetricsOptions pipelineOptions = PipelineOptionsFactory.create().as(MetricsOptions.class);
        pipelineOptions.setMetricsHttpSinkUrl(String.format("http://localhost:%s", port));
        MetricsHttpSink metricsHttpSink = new MetricsHttpSink(pipelineOptions);

        metricsHttpSink.writeMetrics(metricQueryResults);

        // Bounded wait: a request that never arrives fails the test instead of hanging the build.
        Assert.assertTrue(
                "Timed out waiting for the HTTP server to receive the metrics",
                countDownLatch.await(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        Assert.assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size());
        Assert.assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0));
    }
}

