/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tika.server.core.resource;

import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.UriInfo;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.tika.exception.TikaException;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.async.AsyncProcessor;
import org.apache.tika.pipes.async.OfferLargerThanQueueSize;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.serialization.pipes.JsonFetchEmitTupleList;
import org.apache.tika.server.core.resource.AsyncRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

/**
 * JAX-RS resource mounted at {@code /async}.
 *
 * <p>A client POSTs a JSON list of {@link FetchEmitTuple}s. Each tuple is checked against the
 * fetcher names handed to the constructor and the emitter names reported by the
 * {@link EmitterManager}; if every tuple passes, the whole batch is offered to the
 * {@link AsyncProcessor}. The JSON response reports whether the batch was accepted
 * ({@code "status": "ok"}) or not ({@code "status": "throttled"}).
 */
@Path(value="/async")
public class AsyncResource {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncResource.class);
    private final AsyncProcessor asyncProcessor;
    private final Set<String> supportedFetchers;
    private final EmitterManager emitterManager;
    /**
     * Passed as the second argument to {@link AsyncProcessor#offer}. Presumably the maximum
     * time, in milliseconds, to wait for room in the processor's queue -- confirm against
     * AsyncProcessor. Left package-private and mutable, as in the original.
     */
    long maxQueuePauseMs = 60000L;
    // Most recent queue handed out by getFetchEmitQueue; never read inside this class.
    private ArrayBlockingQueue<FetchEmitTuple> queue;

    /**
     * Builds the processor and the emitter manager from the same Tika config file.
     *
     * @param tikaConfigPath path to the Tika configuration file
     * @param supportedFetchers fetcher names that requests are allowed to reference
     * @throws TikaException declared by the underlying loaders
     * @throws IOException declared by the underlying loaders
     * @throws SAXException declared by the underlying loaders
     */
    public AsyncResource(java.nio.file.Path tikaConfigPath, Set<String> supportedFetchers) throws TikaException, IOException, SAXException {
        this.asyncProcessor = new AsyncProcessor(tikaConfigPath);
        this.supportedFetchers = supportedFetchers;
        this.emitterManager = EmitterManager.load(tikaConfigPath);
    }

    /**
     * Creates a new bounded queue for fetch/emit tuples, remembers it in this resource and
     * returns it. Every call replaces the previously remembered queue.
     *
     * @param queueSize capacity of the new queue
     * @return the newly created queue
     */
    public ArrayBlockingQueue<FetchEmitTuple> getFetchEmitQueue(int queueSize) {
        // Diamond instead of the raw type, so no unchecked conversion is needed.
        this.queue = new ArrayBlockingQueue<>(queueSize);
        return this.queue;
    }

    /**
     * Creates a new bounded queue for emit data. The queue is not retained by this resource.
     *
     * @param size capacity of the new queue
     * @return the newly created queue
     */
    public ArrayBlockingQueue<EmitData> getEmitDataQueue(int size) {
        return new ArrayBlockingQueue<>(size);
    }

    /**
     * Accepts a JSON batch of fetch/emit tuples and offers it to the async processor.
     *
     * <p>All tuples are validated before anything is offered, so a single bad tuple rejects
     * the entire batch.
     *
     * @param is request body; read as UTF-8 JSON and closed by this method
     * @param httpHeaders injected request headers (not used here)
     * @param info injected request URI information (not used here)
     * @return a map with {@code status=ok} and {@code added}, or {@code status=throttled} with
     *     {@code msg} and {@code capacity}
     * @throws BadRequestException if a tuple names an unsupported fetcher or emitter
     * @throws Exception anything else raised while reading the request or offering the batch
     */
    @POST
    @Produces(value={"application/json"})
    public Map<String, Object> post(InputStream is, @Context HttpHeaders httpHeaders, @Context UriInfo info) throws Exception {
        AsyncRequest request = this.deserializeASyncRequest(is);

        // Fail early: reject the whole request before any tuple reaches the processor.
        for (FetchEmitTuple t : request.getTuples()) {
            if (!this.supportedFetchers.contains(t.getFetchKey().getFetcherName())) {
                return this.badFetcher(t.getFetchKey());
            }
            if (!this.emitterManager.getSupported().contains(t.getEmitKey().getEmitterName())) {
                return this.badEmitter(t.getEmitKey().getEmitterName());
            }
            // An emitter for embedded document bytes only has to exist when byte extraction
            // is switched on and an emitter name was actually supplied.
            ParseContext parseContext = t.getParseContext();
            EmbeddedDocumentBytesConfig bytesConfig = parseContext.get(EmbeddedDocumentBytesConfig.class);
            if (bytesConfig != null
                    && bytesConfig.isExtractEmbeddedDocumentBytes()
                    && !StringUtils.isAllBlank(bytesConfig.getEmitter())) {
                String bytesEmitter = bytesConfig.getEmitter();
                if (!this.emitterManager.getSupported().contains(bytesEmitter)) {
                    return this.badEmitter(bytesEmitter);
                }
            }
        }

        boolean offered;
        try {
            offered = this.asyncProcessor.offer(request.getTuples(), this.maxQueuePauseMs);
        }
        catch (OfferLargerThanQueueSize e) {
            // Deliberately reported to the client exactly like an offer that was not accepted.
            offered = false;
        }

        int requestSize = request.getTuples().size();
        if (offered) {
            LOG.info("accepted {} tuples, capacity={}", requestSize, this.asyncProcessor.getCapacity());
            return this.ok(requestSize);
        }
        LOG.info("throttling {} tuples, capacity={}", requestSize, this.asyncProcessor.getCapacity());
        return this.throttle(requestSize);
    }

    /** Builds the response body for an accepted batch of {@code size} tuples. */
    private Map<String, Object> ok(int size) {
        Map<String, Object> map = new HashMap<>();
        map.put("status", "ok");
        map.put("added", size);
        return map;
    }

    /**
     * Builds the response body for a batch that was not accepted, including the capacity the
     * processor currently reports.
     */
    private Map<String, Object> throttle(int requestSize) {
        Map<String, Object> map = new HashMap<>();
        map.put("status", "throttled");
        map.put("msg", "not able to receive request of size " + requestSize + " at this time");
        map.put("capacity", this.asyncProcessor.getCapacity());
        return map;
    }

    /**
     * Always throws; the return type only exists so callers can write {@code return badEmitter(...)}.
     *
     * @throws BadRequestException always
     */
    private Map<String, Object> badEmitter(String emitterName) {
        throw new BadRequestException("can't find emitter for " + emitterName);
    }

    /**
     * Always throws; the return type only exists so callers can write {@code return badFetcher(...)}.
     *
     * @throws BadRequestException always
     */
    private Map<String, Object> badFetcher(FetchKey fetchKey) {
        throw new BadRequestException("can't find fetcher for " + fetchKey.getFetcherName());
    }

    /**
     * Reads the request body as UTF-8 JSON and wraps the parsed tuple list in an
     * {@link AsyncRequest}. The reader (and with it the supplied stream) is closed on exit.
     */
    private AsyncRequest deserializeASyncRequest(InputStream is) throws IOException {
        try (InputStreamReader reader = new InputStreamReader(is, StandardCharsets.UTF_8)) {
            return new AsyncRequest(JsonFetchEmitTupleList.fromJson(reader));
        }
    }

    /**
     * Closes the underlying {@link AsyncProcessor}.
     *
     * @throws Exception if closing the processor fails
     */
    public void shutdownNow() throws Exception {
        this.asyncProcessor.close();
    }
}

