package org.apache.flink.streaming.connectors.influxdb;

import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpBackOffIOExceptionHandler;
import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.util.ExponentialBackOff;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSource;
import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestDeserializer;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase.class */
class InfluxDBSourceIntegrationTestCase extends TestLogger {
    private static final String HTTP_ADDRESS = "http://localhost";
    private static final HttpRequestFactory HTTP_REQUEST_FACTORY = new NetHttpTransport().createRequestFactory();
    private static final ExponentialBackOff HTTP_BACKOFF = new ExponentialBackOff.Builder().setInitialIntervalMillis(250).setMaxElapsedTimeMillis(10000).setMaxIntervalMillis(1000).setMultiplier(1.3d).setRandomizationFactor(0.5d).build();
    private int port = 0;
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

    /* loaded from: input_file:org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase$CollectSink.class */
    private static class CollectSink implements SinkFunction<Long> {
        public static final List<Long> VALUES = Collections.synchronizedList(new ArrayList());

        private CollectSink() {
        }

        public void invoke(Long l) {
            VALUES.add(l);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase$IncrementMapFunction.class */
    private static class IncrementMapFunction implements MapFunction<Long, Long> {
        private IncrementMapFunction() {
        }

        public Long map(Long l) {
            return Long.valueOf(l.longValue() + 1);
        }
    }

    InfluxDBSourceIntegrationTestCase() {
    }

    @BeforeEach
    void init() {
        CollectSink.VALUES.clear();
        try {
            ServerSocket serverSocket = new ServerSocket(0);
            try {
                this.port = serverSocket.getLocalPort();
                this.log.info("Using port {} for the HTTP server", Integer.valueOf(this.port));
                serverSocket.close();
            } finally {
            }
        } catch (IOException e) {
            this.log.error("Could not open open port {}", e.getMessage());
        }
    }

    @Disabled
    @Test
    void testIncrementPipeline() throws Exception {
        this.env.fromSource(InfluxDBSource.builder().setPort(this.port).setDeserializer(new InfluxDBTestDeserializer()).build(), WatermarkStrategy.noWatermarks(), "InfluxDBSource").map(new IncrementMapFunction()).addSink(new CollectSink());
        JobClient executeAsync = this.env.executeAsync();
        Assertions.assertTrue(checkHealthCheckAvailable());
        Assertions.assertEquals(writeToInfluxDB("test longValue=1i 1\ntest longValue=2i 2"), 204);
        executeAsync.cancel();
        ArrayList arrayList = new ArrayList();
        arrayList.add(2L);
        arrayList.add(3L);
        Assertions.assertTrue(CollectSink.VALUES.containsAll(arrayList));
    }

    @Test
    void testBadRequestException() throws Exception {
        this.env.fromSource(InfluxDBSource.builder().setPort(this.port).setDeserializer(new InfluxDBTestDeserializer()).build(), WatermarkStrategy.noWatermarks(), "InfluxDBSource").map(new IncrementMapFunction()).addSink(new CollectSink());
        JobClient executeAsync = this.env.executeAsync();
        Assertions.assertTrue(checkHealthCheckAvailable());
        Assertions.assertTrue(Assertions.assertThrows(HttpResponseException.class, () -> {
            writeToInfluxDB("malformedLineProtocol_test");
        }).getMessage().contains("Unable to parse line."));
        executeAsync.cancel();
    }

    @Test
    void testRequestTooLargeException() throws Exception {
        this.env.fromSource(InfluxDBSource.builder().setPort(this.port).setDeserializer(new InfluxDBTestDeserializer()).setMaximumLinesPerRequest(2).build(), WatermarkStrategy.noWatermarks(), "InfluxDBSource").map(new IncrementMapFunction()).addSink(new CollectSink());
        JobClient executeAsync = this.env.executeAsync();
        Assertions.assertTrue(checkHealthCheckAvailable());
        Assertions.assertTrue(Assertions.assertThrows(HttpResponseException.class, () -> {
            writeToInfluxDB("test longValue=1i 1\ntest longValue=1i 1\ntest longValue=1i 1");
        }).getMessage().contains("Payload too large. Maximum number of lines per request is 2."));
        executeAsync.cancel();
    }

    private int writeToInfluxDB(String str) throws IOException {
        return HTTP_REQUEST_FACTORY.buildPostRequest(new GenericUrl(String.format("%s:%s/api/v2/write", HTTP_ADDRESS, Integer.valueOf(this.port))), ByteArrayContent.fromString("text/plain; charset=utf-8", str)).execute().getStatusCode();
    }

    private boolean checkHealthCheckAvailable() throws IOException {
        HttpRequest buildGetRequest = HTTP_REQUEST_FACTORY.buildGetRequest(new GenericUrl(String.format("%s:%s/health", HTTP_ADDRESS, Integer.valueOf(this.port))));
        buildGetRequest.setUnsuccessfulResponseHandler(new HttpBackOffUnsuccessfulResponseHandler(HTTP_BACKOFF));
        buildGetRequest.setIOExceptionHandler(new HttpBackOffIOExceptionHandler(HTTP_BACKOFF));
        return buildGetRequest.execute().getStatusCode() == 200;
    }
}
