package org.apache.asterix.external.input.record.reader.http;

import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.HttpServerConfig;
import org.apache.hyracks.http.server.WebManager;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.class */
public class HttpServerRecordReader implements IRecordReader<char[]> {
    public static final Logger LOGGER = LogManager.getLogger();
    private static final String DEFAULT_ENTRY_POINT = "/";
    private static final int DEFAULT_QUEUE_SIZE = 128;
    private LinkedBlockingQueue<String> inputQ;
    private CharArrayRecord record;
    private boolean closed = false;
    private WebManager webManager;
    private HttpServer webServer;

    /* loaded from: input_file:org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader$HttpFeedServlet.class */
    private class HttpFeedServlet extends AbstractServlet {
        private LinkedBlockingQueue<String> inputQ;

        private int splitIntoRecords(String str) throws InterruptedException {
            int i = 0;
            int i2 = 0;
            int i3 = 0;
            boolean z = false;
            char[] charArray = str.toCharArray();
            for (int i4 = 0; i4 < charArray.length; i4++) {
                if (charArray[i4] == '{') {
                    if (!z) {
                        i = i4;
                        z = true;
                    }
                    i2++;
                } else if (charArray[i4] == '}') {
                    i2--;
                }
                if (i2 == 0) {
                    if (z) {
                        this.inputQ.put(str.substring(i, i4 + 1) + "\n");
                        i3++;
                        z = false;
                    }
                    i = i4;
                }
            }
            return i3;
        }

        public HttpFeedServlet(ConcurrentMap<String, Object> concurrentMap, String[] strArr, LinkedBlockingQueue<String> linkedBlockingQueue) {
            super(concurrentMap, strArr);
            this.inputQ = linkedBlockingQueue;
        }

        private int doPost(IServletRequest iServletRequest) throws InterruptedException {
            return splitIntoRecords(iServletRequest.getHttpRequest().content().toString(StandardCharsets.UTF_8));
        }

        public void handle(IServletRequest iServletRequest, IServletResponse iServletResponse) {
            PrintWriter writer = iServletResponse.writer();
            if (iServletRequest.getHttpRequest().method() == HttpMethod.POST) {
                try {
                    writer.write(String.valueOf(doPost(iServletRequest)));
                    iServletResponse.setStatus(HttpResponseStatus.OK);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    HttpServerRecordReader.LOGGER.log(Level.INFO, "exception thrown for {}", iServletRequest, e);
                    iServletResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
                    writer.write(e.toString());
                }
            } else {
                iServletResponse.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
            }
            writer.flush();
        }
    }

    public HttpServerRecordReader(int i, String str, int i2, HttpServerConfig httpServerConfig) throws Exception {
        this.inputQ = new LinkedBlockingQueue<>(i2 > 0 ? i2 : DEFAULT_QUEUE_SIZE);
        this.record = new CharArrayRecord();
        this.webManager = new WebManager();
        this.webServer = new HttpServer(this.webManager.getBosses(), this.webManager.getWorkers(), i, httpServerConfig);
        HttpServer httpServer = this.webServer;
        ConcurrentMap ctx = this.webServer.ctx();
        String[] strArr = new String[1];
        strArr[0] = str == null ? DEFAULT_ENTRY_POINT : str;
        httpServer.addServlet(new HttpFeedServlet(ctx, strArr, this.inputQ));
        this.webManager.add(this.webServer);
        this.webManager.start();
    }

    @Override // org.apache.asterix.external.api.IRecordReader
    public boolean hasNext() {
        return !this.closed;
    }

    @Override // org.apache.asterix.external.api.IRecordReader
    public IRawRecord<char[]> next() throws IOException, InterruptedException {
        String poll = this.inputQ.poll();
        if (poll == null) {
            return null;
        }
        this.record.set(poll);
        return this.record;
    }

    @Override // org.apache.asterix.external.api.IRecordReader
    public boolean stop() {
        try {
            close();
            return true;
        } catch (Exception e) {
            LOGGER.error(e);
            return false;
        }
    }

    @Override // org.apache.asterix.external.api.IRecordReader
    public void setController(AbstractFeedDataFlowController abstractFeedDataFlowController) {
    }

    @Override // org.apache.asterix.external.api.IRecordReader
    public void setFeedLogManager(FeedLogManager feedLogManager) {
    }

    @Override // org.apache.asterix.external.api.IRecordReader
    public boolean handleException(Throwable th) {
        return false;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (!this.closed) {
                this.webManager.stop();
                this.closed = true;
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
