package org.apache.flink.streaming.connectors.influxdb;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSink;
import org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions;
import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBContainer;
import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestSerializer;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* loaded from: input_file:org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.class */
class InfluxDBSinkIntegrationTestCase extends TestLogger {

    @Container
    public static final InfluxDBContainer<?> influxDBContainer = InfluxDBContainer.createWithDefaultTag();
    private static final List<Long> SOURCE_DATA = Arrays.asList(1L, 2L, 3L);
    private static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE = (List) SOURCE_DATA.stream().map(l -> {
        return new InfluxDBTestSerializer().serialize(l, (SinkWriter.Context) null).toLineProtocol();
    }).collect(Collectors.toList());

    InfluxDBSinkIntegrationTestCase() {
    }

    @Test
    void testSinkDataToInfluxDB() throws Exception {
        StreamExecutionEnvironment buildStreamEnv = buildStreamEnv();
        InfluxDBSink build = InfluxDBSink.builder().setInfluxDBSchemaSerializer(new InfluxDBTestSerializer()).setInfluxDBUrl(influxDBContainer.getUrl()).setInfluxDBUsername(InfluxDBContainer.username).setInfluxDBPassword(InfluxDBContainer.password).setInfluxDBBucket(InfluxDBContainer.bucket).setInfluxDBOrganization(InfluxDBContainer.organization).addCheckpointDataPoint(true).build();
        buildStreamEnv.addSource(new FiniteTestSource(SOURCE_DATA), BasicTypeInfo.LONG_TYPE_INFO).sinkTo(build);
        buildStreamEnv.execute();
        InfluxDBClient influxDBClient = InfluxDBSinkOptions.getInfluxDBClient(build.getConfiguration());
        Assertions.assertEquals(queryWrittenData(influxDBClient).size(), EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.size());
        Assertions.assertTrue(queryCheckpoints(influxDBClient).size() >= 4);
    }

    private static StreamExecutionEnvironment buildStreamEnv() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.setParallelism(1);
        executionEnvironment.enableCheckpointing(100L);
        return executionEnvironment;
    }

    private static List<String> queryWrittenData(InfluxDBClient influxDBClient) {
        ArrayList arrayList = new ArrayList();
        Iterator it = influxDBClient.getQueryApi().query(String.format("from(bucket: \"%s\") |> range(start: -1h) |> filter(fn:(r) => r._measurement == \"test\")", InfluxDBContainer.bucket)).iterator();
        while (it.hasNext()) {
            Iterator it2 = ((FluxTable) it.next()).getRecords().iterator();
            while (it2.hasNext()) {
                arrayList.add(recordToDataPoint((FluxRecord) it2.next()).toLineProtocol());
            }
        }
        return arrayList;
    }

    private static List<String> queryCheckpoints(InfluxDBClient influxDBClient) {
        ArrayList arrayList = new ArrayList();
        Iterator it = influxDBClient.getQueryApi().query(String.format("from(bucket: \"%s\") |> range(start: -1h) |> filter(fn:(r) => r._measurement == \"checkpoint\")", InfluxDBContainer.bucket)).iterator();
        while (it.hasNext()) {
            Iterator it2 = ((FluxTable) it.next()).getRecords().iterator();
            while (it2.hasNext()) {
                arrayList.add(recordToCheckpointDataPoint((FluxRecord) it2.next()).toLineProtocol());
            }
        }
        return arrayList;
    }

    private static Point recordToDataPoint(FluxRecord fluxRecord) {
        Point point = new Point(fluxRecord.getMeasurement());
        point.addTag("longValue", String.valueOf(fluxRecord.getValueByKey("longValue")));
        point.addField((String) Objects.requireNonNull(fluxRecord.getField()), String.valueOf(fluxRecord.getValue()));
        point.time(fluxRecord.getTime(), WritePrecision.NS);
        return point;
    }

    private static Point recordToCheckpointDataPoint(FluxRecord fluxRecord) {
        Point point = new Point(fluxRecord.getMeasurement());
        point.addField((String) Objects.requireNonNull(fluxRecord.getField()), String.valueOf(fluxRecord.getValue()));
        point.time(fluxRecord.getTime(), WritePrecision.NS);
        return point;
    }
}
