package org.apache.flink.streaming.connectors.influxdb.source.reader;

import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
import org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions;
import org.apache.flink.streaming.connectors.influxdb.source.http.HealthCheckHandler;
import org.apache.flink.streaming.connectors.influxdb.source.http.WriteAPIHandler;
import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.class */
public final class InfluxDBSplitReader implements SplitReader<DataPoint, InfluxDBSplit> {
    private final long enqueueWaitTime;
    private final int maximumLinesPerRequest;
    private final int defaultPort;
    private HttpServer server = null;
    private final FutureCompletingBlockingQueue<List<DataPoint>> ingestionQueue;
    private InfluxDBSplit split;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader$InfluxDBSplitRecords.class */
    private static class InfluxDBSplitRecords implements RecordsWithSplitIds<DataPoint> {
        private final List<DataPoint> records;
        private Iterator<DataPoint> recordIterator;
        private final String splitId;

        private InfluxDBSplitRecords(String str) {
            this.splitId = str;
            this.records = new ArrayList();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean addAll(List<DataPoint> list) {
            return this.records.addAll(list);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void prepareForRead() {
            this.recordIterator = this.records.iterator();
        }

        @Nullable
        public String nextSplit() {
            if (this.recordIterator.hasNext()) {
                return this.splitId;
            }
            return null;
        }

        @Nullable
        /* renamed from: nextRecordFromSplit, reason: merged with bridge method [inline-methods] */
        public DataPoint m11nextRecordFromSplit() {
            if (this.recordIterator.hasNext()) {
                return this.recordIterator.next();
            }
            return null;
        }

        public Set<String> finishedSplits() {
            return Collections.emptySet();
        }
    }

    public InfluxDBSplitReader(Configuration configuration) {
        this.enqueueWaitTime = configuration.getLong(InfluxDBSourceOptions.ENQUEUE_WAIT_TIME);
        this.maximumLinesPerRequest = configuration.getInteger(InfluxDBSourceOptions.MAXIMUM_LINES_PER_REQUEST);
        this.defaultPort = configuration.getInteger(InfluxDBSourceOptions.PORT);
        this.ingestionQueue = new FutureCompletingBlockingQueue<>(configuration.getInteger(InfluxDBSourceOptions.INGEST_QUEUE_CAPACITY));
    }

    public RecordsWithSplitIds<DataPoint> fetch() throws IOException {
        if (this.split == null) {
            return null;
        }
        InfluxDBSplitRecords influxDBSplitRecords = new InfluxDBSplitRecords(this.split.splitId());
        try {
            this.ingestionQueue.getAvailabilityFuture().get();
            List list = (List) this.ingestionQueue.poll();
            if (list == null) {
                influxDBSplitRecords.prepareForRead();
                return influxDBSplitRecords;
            }
            influxDBSplitRecords.addAll(list);
            influxDBSplitRecords.prepareForRead();
            return influxDBSplitRecords;
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException("An exception occurred during fetch", e);
        }
    }

    public void handleSplitsChanges(SplitsChange<InfluxDBSplit> splitsChange) {
        if (splitsChange.splits().isEmpty()) {
            return;
        }
        this.split = (InfluxDBSplit) splitsChange.splits().get(0);
        if (this.server != null) {
            return;
        }
        try {
            this.server = HttpServer.create(new InetSocketAddress(this.defaultPort), 0);
            this.server.createContext("/api/v2/write", new WriteAPIHandler(this.maximumLinesPerRequest, this.ingestionQueue, this.split.splitId().hashCode(), this.enqueueWaitTime));
            this.server.createContext("/health", new HealthCheckHandler());
            this.server.setExecutor((Executor) null);
            this.server.start();
        } catch (IOException e) {
            throw new RuntimeException(String.format("Unable to start HTTP Server on Port %d: %s", Integer.valueOf(this.defaultPort), e.getMessage()));
        }
    }

    public void wakeUp() {
        this.ingestionQueue.notifyAvailable();
    }

    public void close() {
        if (this.server != null) {
            this.server.stop(1);
        }
    }
}
