package org.apache.flink.streaming.connectors.influxdb.sink.writer;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.write.Point;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink writer that buffers serialized {@link Point}s in memory and writes them to InfluxDB in
 * batches.
 *
 * <p>Incoming elements are converted to {@link Point}s by the supplied
 * {@link InfluxDBSchemaSerializer}. Once the buffer holds {@code bufferSize} points it is written
 * out through a {@link WriteApi} obtained from the {@link InfluxDBClient} and then cleared. The
 * highest element timestamp seen so far is handed out as the single committable in
 * {@link #prepareCommit(boolean)}.
 *
 * @param <IN> type of the elements accepted by this writer
 */
@Internal
public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
    private static final Logger LOG = LoggerFactory.getLogger(InfluxDBWriter.class);

    /** Number of buffered points that triggers a write to InfluxDB. */
    private final int bufferSize;

    /** Value of {@code WRITE_DATA_POINT_CHECKPOINT}; read in the constructor, not used by this class otherwise. */
    private final boolean writeCheckpoint;

    /** Largest element timestamp observed so far; {@code 0} means that none has been recorded. */
    private long lastTimestamp = 0;

    /** Points serialized but not yet written to InfluxDB. */
    private final List<Point> elements;

    private Sink.ProcessingTimeService processingTimerService;
    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
    private final InfluxDBClient influxDBClient;

    /**
     * Creates a writer configured from the given sink configuration.
     *
     * @param influxDBSchemaSerializer converts incoming elements into {@link Point}s
     * @param configuration supplies the buffer size, the checkpoint flag and the InfluxDB client
     *     settings
     */
    public InfluxDBWriter(InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer, Configuration configuration) {
        this.schemaSerializer = influxDBSchemaSerializer;
        this.bufferSize = configuration.getInteger(InfluxDBSinkOptions.WRITE_BUFFER_SIZE);
        this.elements = new ArrayList<>(this.bufferSize);
        this.writeCheckpoint = configuration.getBoolean(InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT);
        this.influxDBClient = InfluxDBSinkOptions.getInfluxDBClient(configuration);
    }

    /**
     * Serializes the element and adds it to the buffer, writing the buffer out first when it is
     * already full.
     *
     * @param in element to buffer
     * @param context context passed to the serializer and queried for the element timestamp
     * @throws IOException if serializing the element fails
     */
    public void write(IN in, SinkWriter.Context context) throws IOException {
        if (this.elements.size() >= this.bufferSize) {
            LOG.debug("Buffer size reached preparing to write the elements.");
            writeCurrentElements();
            this.elements.clear();
        }
        // The incoming element is buffered unconditionally: skipping it whenever a flush was
        // triggered would silently lose one record per full buffer.
        LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
        this.elements.add(this.schemaSerializer.serialize(in, context));
        Long timestamp = context.timestamp();
        if (timestamp != null) {
            this.lastTimestamp = Math.max(this.lastTimestamp, timestamp);
        }
    }

    /**
     * Returns the committable for the current state of the writer.
     *
     * @param z not used by this implementation
     * @return an empty list while no timestamp has been recorded, otherwise a one-element list
     *     holding the largest timestamp seen so far
     */
    public List<Long> prepareCommit(boolean z) {
        if (this.lastTimestamp == 0) {
            return Collections.emptyList();
        }
        List<Long> committables = new ArrayList<>(1);
        committables.add(this.lastTimestamp);
        return committables;
    }

    /**
     * Returns the points that are buffered but not yet written.
     *
     * @return a copy of the buffer, so that later calls to {@code write} or {@code close}, which
     *     clear the buffer, do not alter the returned state
     */
    public List<Point> snapshotState() {
        return new ArrayList<>(this.elements);
    }

    /**
     * Writes any points still buffered, clears the buffer and closes the InfluxDB client.
     *
     * @throws Exception if writing the remaining points or closing the client fails
     */
    public void close() throws Exception {
        LOG.debug("Preparing to write the elements in InfluxDB.");
        try {
            writeCurrentElements();
            LOG.debug("Closing the writer.");
            this.elements.clear();
        } finally {
            // NOTE(review): assumes InfluxDBSinkOptions.getInfluxDBClient returned a client
            // dedicated to this writer; confirm it is not shared before relying on this close.
            this.influxDBClient.close();
        }
    }

    /**
     * Stores the processing time service for this writer.
     *
     * @param processingTimeService service to keep a reference to
     */
    public void setProcessingTimerService(Sink.ProcessingTimeService processingTimeService) {
        this.processingTimerService = processingTimeService;
    }

    /**
     * Writes every buffered point through a freshly obtained {@link WriteApi}, which is closed
     * again before returning. The buffer itself is left untouched; callers clear it.
     */
    private void writeCurrentElements() {
        if (this.elements.isEmpty()) {
            // Nothing to send, so do not open a WriteApi at all.
            return;
        }
        try (WriteApi writeApi = this.influxDBClient.getWriteApi()) {
            writeApi.writePoints(this.elements);
            LOG.debug("Wrote {} data points", this.elements.size());
        }
    }
}
