package org.apache.flink.streaming.connectors.influxdb.source.http;

import com.sun.net.httpserver.HttpExchange;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.class */
public final class WriteAPIHandler extends Handler {
    private static final Logger LOG = LoggerFactory.getLogger(WriteAPIHandler.class);
    private final int maximumLinesPerRequest;
    private final FutureCompletingBlockingQueue ingestionQueue;
    private final int threadIndex;
    private final long enqueueWaitTime;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler$RequestTooLargeException.class */
    private static class RequestTooLargeException extends RuntimeException {
        RequestTooLargeException(String str) {
            super(str);
        }
    }

    public WriteAPIHandler(int i, FutureCompletingBlockingQueue futureCompletingBlockingQueue, int i2, long j) {
        this.maximumLinesPerRequest = i;
        this.ingestionQueue = futureCompletingBlockingQueue;
        this.threadIndex = i2;
        this.enqueueWaitTime = j;
    }

    public void handle(HttpExchange httpExchange) throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpExchange.getRequestBody(), StandardCharsets.UTF_8));
        try {
            ArrayList arrayList = new ArrayList();
            int i = 0;
            do {
                String readLine = bufferedReader.readLine();
                if (readLine == null) {
                    if (!((Boolean) CompletableFuture.supplyAsync(() -> {
                        try {
                            return Boolean.valueOf(this.ingestionQueue.put(this.threadIndex, arrayList));
                        } catch (InterruptedException e) {
                            return false;
                        }
                    }).get(this.enqueueWaitTime, TimeUnit.SECONDS)).booleanValue()) {
                        throw new TimeoutException("Failed to enqueue");
                    }
                    httpExchange.sendResponseHeaders(204, -1L);
                    this.ingestionQueue.notifyAvailable();
                    return;
                }
                arrayList.add(InfluxParser.parseToDataPoint(readLine));
                i++;
            } while (i <= this.maximumLinesPerRequest);
            throw new RequestTooLargeException(String.format("Payload too large. Maximum number of lines per request is %d.", Integer.valueOf(this.maximumLinesPerRequest)));
        } catch (InterruptedException | ExecutionException e) {
            Handler.sendResponse(httpExchange, 500, "Server Error");
            LOG.error(e.getMessage());
        } catch (ParseException e2) {
            Handler.sendResponse(httpExchange, 400, e2.getMessage());
        } catch (TimeoutException e3) {
            Handler.sendResponse(httpExchange, 415, "Server overloaded");
            LOG.error(e3.getMessage());
        } catch (RequestTooLargeException e4) {
            Handler.sendResponse(httpExchange, 413, e4.getMessage());
        }
    }
}
