/*
 * Decompiled with CFR 0.152.
 */
package de.julielab.genemapper.resources;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.inject.Guice;
import com.google.inject.Injector;
import de.julielab.geneexpbase.configuration.Parameters;
import de.julielab.geneexpbase.data.DocumentLoader;
import de.julielab.geneexpbase.genemodel.GeneDocument;
import de.julielab.geneexpbase.genemodel.GeneOrthologs;
import de.julielab.geneexpbase.ioc.ServicesShutdownHub;
import de.julielab.genemapper.Configuration;
import de.julielab.genemapper.GeneMapper;
import de.julielab.genemapper.classification.TransformerDisambiguationDataUtils;
import de.julielab.genemapper.genemodel.GeneDocumentFactory;
import de.julielab.genemapper.ioc.GeneMappingModule;
import de.julielab.java.utilities.FileUtilities;
import de.julielab.java.utilities.ProgressBar;
import de.julielab.jcore.ae.checkpoint.DBCheckpointAE;
import de.julielab.jcore.types.EntityMention;
import de.julielab.jcore.types.Gene;
import de.julielab.jcore.utility.JCoReTools;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.lucene.search.BooleanQuery;
import org.apache.uima.UIMAException;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.analysis_engine.CasIterator;
import org.apache.uima.cas.CAS;
import org.apache.uima.collection.CollectionReader;
import org.apache.uima.fit.factory.AnalysisEngineFactory;
import org.apache.uima.fit.factory.CollectionReaderFactory;
import org.apache.uima.fit.factory.TypeSystemDescriptionFactory;
import org.apache.uima.jcas.JCas;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.impl.ResourceManager_impl;
import org.apache.uima.resource.metadata.TypeSystemDescription;
import org.apache.uima.resource.metadata.impl.ProcessingResourceMetaData_impl;
import org.apache.uima.util.CasPool;
import org.apache.uima.util.InvalidXMLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransformerDisambiguationGene2PubmedDataWriter {
    private static final Logger log = LoggerFactory.getLogger(TransformerDisambiguationGene2PubmedDataWriter.class);
    /** Bounded hand-over buffer (capacity 512) between the processing tasks and the {@code WritingThread}s; also used as their wait/notify monitor. */
    private static final BlockingDeque<GeneDocument> documentBuffer = new LinkedBlockingDeque<GeneDocument>(512);
    /** All writer threads started by {@code createWritingThreads}; consulted again when finishing and merging. */
    private final List<WritingThread> writingThreads = new ArrayList<WritingThread>();
    /** Guice injector, assigned in {@code main} and read by the processing code. */
    private static Injector injector;
    /** Gene mapper configuration, assigned in {@code main}. */
    private static Configuration configuration;
    /**
     * Set to {@code true} by a processing task that failed unexpectedly and polled by the reading loop
     * to stop early. Declared {@code volatile} because it is written on executor threads and read on the
     * thread running {@code createDisambiguationData}.
     */
    private volatile boolean errorOccurred;

    /**
     * Command line entry point. Loads the configuration from
     * {@code configurations/genemapper_gene2pubmed.properties}, builds the Guice injector and runs
     * {@link #createDisambiguationData} with hard-coded resource locations.
     *
     * @param args ignored
     * @throws Exception if configuration loading or data creation fails
     */
    public static void main(String[] args) throws Exception {
        configuration = new Configuration(new File("configurations/genemapper_gene2pubmed.properties"));
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        log.info("Detected {} CPUs. Using this number minus 2.", availableProcessors);
        // Always keep at least one processing thread, even on machines with one or two cores.
        int numThreads = Math.max(1, availableProcessors - 2);
        String costosysConfigurationFile = "../jcore-gene-mapper-resources/src/main/resources/costosys.xml";
        String gene2pubmedDocumentTableName = "geno.gene2pubmed";
        injector = Guice.createInjector(new GeneMappingModule(configuration));
        try {
            String goldTaxMode = "goldTax";
            File gene2pubmed = new File("../jcore-gene-mapper-resources/gene2pubmed.gz");
            // The %s placeholder is filled with the writer index and finally with "all" for the merged file.
            String outputFile = "transformerDisambiguationData-gene2pubmed-v23-" + goldTaxMode + "-%s.tsv.gz";
            TransformerDisambiguationGene2PubmedDataWriter dataWriter = new TransformerDisambiguationGene2PubmedDataWriter();
            dataWriter.createDisambiguationData(gene2pubmed, costosysConfigurationFile, gene2pubmedDocumentTableName, injector.getInstance(GeneMapper.class), outputFile, numThreads);
        } finally {
            // Shut the services down even when data creation failed so their resources are not left open.
            log.info("Shutting down gene mapper services.");
            injector.getInstance(ServicesShutdownHub.class).shutdown();
        }
        log.info("Application finished.");
    }

    /**
     * Creates the disambiguation data files.
     * <p>
     * The method loads the PubMed-to-gene mapping, reads document batches from the given database table,
     * processes each batch on a fixed thread pool (every pool thread works with its own set of analysis
     * engines), hands the resulting {@link GeneDocument}s to the writing threads through the shared
     * document buffer and finally merges the per-thread output files into one file.
     *
     * @param gene2pubmed               the gene2pubmed file to read the document level gene IDs from
     * @param costosysConfigurationFile path to the CoStoSys configuration passed to the reader and the checkpoint engines
     * @param tableName                 name of the database table to read documents from
     * @param geneMapper                the gene mapper handed to the training data creation and the writers
     * @param outputFile                format pattern with one {@code %s} placeholder that is replaced by the
     *                                  writer index and by {@code "all"} for the merged file
     * @param numThreads                number of processing threads, engine sets and writing threads
     * @throws IOException          if reading the input or writing/merging the output fails
     * @throws UIMAException        if a UIMA component cannot be created or the reader fails
     * @throws InterruptedException if the calling thread is interrupted while waiting for workers or writers
     */
    public void createDisambiguationData(File gene2pubmed, String costosysConfigurationFile, String tableName, GeneMapper geneMapper, String outputFile, int numThreads) throws IOException, UIMAException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        try {
            Multimap<String, String> pubmed2gene = this.readGene2pubmed(gene2pubmed);
            int retrievalBatchSize = 50;
            CollectionReader dbReader = CollectionReaderFactory.createReader("de.julielab.jcore.reader.xmi.desc.jcore-xmi-db-multiplier-reader", "CostosysConfigFile", costosysConfigurationFile, "AnnotationsToLoad", new String[]{"de.julielab.jcore.types.Sentence", "de.julielab.jcore.types.Token", "de.julielab.jcore.types.PennBioIEPOSTag", "de.julielab.jcore.types.Organism", "de.julielab.jcore.types.Abbreviation", "flair:de.julielab.jcore.types.Gene"}, "ReadsBaseDocument", true, "Table", tableName, "BatchSize", retrievalBatchSize, "ResetTable", true);
            TypeSystemDescription tsd = TypeSystemDescriptionFactory.createTypeSystemDescription("de.julielab.jcore.types.jcore-morpho-syntax-types", "de.julielab.jcore.types.jcore-document-structure-pubmed-types", "de.julielab.jcore.types.jcore-document-meta-pubmed-types", "de.julielab.jcore.ae.genemapper.desc.ProteinOffsetExpansionTypeSystem", "de.julielab.jcore.types.extensions.jcore-document-meta-extension-types", "de.julielab.jcore.types.casmultiplier.jcore-dbtable-multiplier-types");
            // One engine of each kind per processing thread; index i belongs to the thread with engine index i.
            List<AnalysisEngine> multipliers = new ArrayList<>(numThreads);
            List<AnalysisEngine> offsetExpanders = new ArrayList<>(numThreads);
            List<AnalysisEngine> proteinMergers = new ArrayList<>(numThreads);
            List<AnalysisEngine> consistencyAes = new ArrayList<>(numThreads);
            List<AnalysisEngine> checkpointConsumers = new ArrayList<>(numThreads);
            for (int i = 0; i < numThreads; ++i) {
                AnalysisEngine multiplier = this.createEngineWithTs("de.julielab.jcore.reader.xmi.desc.jcore-xmi-db-multiplier", tsd);
                AnalysisEngine proteinOffsetExpansion = this.createEngineWithTs("de.julielab.jcore.ae.genemapper.desc.ProteinOffsetExpansionEngine", tsd);
                AnalysisEngine extendedProteinsMerger = this.createEngineWithTs("de.julielab.jcore.ae.genemapper.desc.jcore-extended-proteins-merger", tsd);
                AnalysisEngine consistencyAe = this.createEngineWithTs("de.julielab.jcore.ae.genemapper.desc.jcore-protein-consistency-tagger", tsd);
                AnalysisEngine checkpointAe = AnalysisEngineFactory.createEngine(DBCheckpointAE.class, tsd, "CheckpointName", "end", "CostosysConfigFile", costosysConfigurationFile, "IndicateFinished", true);
                multipliers.add(multiplier);
                offsetExpanders.add(proteinOffsetExpansion);
                proteinMergers.add(extendedProteinsMerger);
                consistencyAes.add(consistencyAe);
                checkpointConsumers.add(checkpointAe);
            }
            ProcessingResourceMetaData_impl metaData = new ProcessingResourceMetaData_impl();
            metaData.setTypeSystem(tsd);
            CasPool casPool = new CasPool(numThreads + 4, metaData, new ResourceManager_impl());
            ConcurrentHashMap<Thread, Integer> threadIds = new ConcurrentHashMap<>(numThreads);
            this.createWritingThreads(geneMapper, outputFile, numThreads);
            ProgressBar progressBar = new ProgressBar(dbReader.getProgress()[0].getTotal() / (long)retrievalBatchSize, 80, true);
            while (dbReader.hasNext() && !this.errorOccurred) {
                log.debug("Getting CAS. Available CASes: {}", casPool.getNumAvailable());
                CAS cas = casPool.getCas(1200000L);
                if (cas == null) {
                    // No CAS became available within the timeout; check the loop condition and try again.
                    continue;
                }
                dbReader.getNext(cas);
                executorService.submit(() -> {
                    String lastDocId = null;
                    int id = this.engineIndexForCurrentThread(threadIds);
                    try {
                        CasIterator casIterator = multipliers.get(id).processAndOutputNewCASes(cas);
                        while (casIterator.hasNext()) {
                            CAS innerCas = casIterator.next();
                            try {
                                lastDocId = JCoReTools.getDocId(innerCas.getJCas());
                                offsetExpanders.get(id).process(innerCas);
                                proteinMergers.get(id).process(innerCas);
                                consistencyAes.get(id).process(innerCas);
                                this.writeTrainingData(pubmed2gene, innerCas.getJCas(), geneMapper, 256);
                                checkpointConsumers.get(id).process(innerCas);
                            }
                            finally {
                                // Hand the document CAS back even when processing failed.
                                innerCas.release();
                            }
                        }
                        // NOTE(review): this notifies the engines of ALL threads after every batch, as the
                        // original code did - confirm that this is intended.
                        this.callBatchProcessingComplete(offsetExpanders);
                        this.callBatchProcessingComplete(proteinMergers);
                        this.callBatchProcessingComplete(consistencyAes);
                    }
                    catch (ClassCastException | AnalysisEngineProcessException e) {
                        // The remaining documents of this batch are not processed either.
                        log.warn("Got {} exception for document '{}': {}. Assuming that this is a JeDIS (de-)serialization issue, skipping the document.", e.getClass().getCanonicalName(), lastDocId, e.getMessage());
                    }
                    catch (Throwable t) {
                        log.error("Could not process batch of CASes with Thread ID {} (name: {}) due to exception. The last seen document ID was '{}'.", id, Thread.currentThread().getName(), lastDocId, t);
                        this.errorOccurred = true;
                    }
                    finally {
                        // Return the batch CAS on every path; otherwise failed batches drain the pool
                        // and the reading loop blocks in getCas().
                        casPool.releaseCas(cas);
                    }
                });
                progressBar.incrementDone();
                progressBar.printProgressBar();
            }
            log.info("Shutting down ExecutorService.");
            executorService.shutdown();
            log.info("Waiting 15 minutes for all processing threads to finish.");
            if (!executorService.awaitTermination(15L, TimeUnit.MINUTES)) {
                log.warn("Processing threads did not finish within 15 minutes; continuing anyway.");
            }
            // Signal the end of the collection only after the workers are done with the checkpoint engines.
            for (AnalysisEngine checkpointConsumer : checkpointConsumers) {
                checkpointConsumer.collectionProcessComplete();
            }
            synchronized (documentBuffer) {
                log.info("Processing threads have finished, signaling the to-disc-writing threads to finish.");
                this.writingThreads.forEach(WritingThread::finish);
                log.info("Notifying the to-disc-writing thread to continue a last writing iteration.");
                documentBuffer.notifyAll();
            }
            log.info("Waiting for last data to be written to disc...");
            for (WritingThread writingThread : this.writingThreads) {
                writingThread.join();
            }
            log.info("WritingThreads have terminated.");
            log.info("Merging written files into {}", String.format(outputFile, "all"));
            this.mergeFiles(outputFile);
            log.info("Merging done, application finished.");
        }
        finally {
            executorService.shutdown();
        }
        if (this.errorOccurred) {
            log.error("Early termination due to error. Check the log messages above.");
        }
    }

    /**
     * Returns the engine index reserved for the calling thread, assigning the next free index on first use.
     * The assignment is serialized on the map so that two threads registering at the same time cannot
     * both observe the same map size and end up sharing one engine set.
     *
     * @param threadIds registry of already assigned indices, keyed by thread
     * @return the index of the engine set the current thread has to use
     */
    private int engineIndexForCurrentThread(ConcurrentHashMap<Thread, Integer> threadIds) {
        Thread current = Thread.currentThread();
        Integer index = threadIds.get(current);
        if (index != null) {
            return index;
        }
        synchronized (threadIds) {
            index = threadIds.get(current);
            if (index == null) {
                index = threadIds.size();
                threadIds.put(current, index);
            }
            return index;
        }
    }

    /**
     * Invokes {@code collectionProcessComplete()} on every given engine, in list order.
     * <p>
     * NOTE(review): the method name refers to batch completion while the call made is
     * {@code collectionProcessComplete()}; confirm which of the two is intended.
     *
     * @param aes the engines to notify
     * @throws AnalysisEngineProcessException if one of the engines fails; remaining engines are not notified
     */
    private void callBatchProcessingComplete(List<AnalysisEngine> aes) throws AnalysisEngineProcessException {
        Iterator<AnalysisEngine> engines = aes.iterator();
        while (engines.hasNext()) {
            AnalysisEngine engine = engines.next();
            engine.collectionProcessComplete();
        }
    }

    /**
     * Concatenates the output files of all writing threads, in thread order, into the file obtained by
     * formatting {@code outputFile} with {@code "all"}, and deletes each partial file after it was copied.
     *
     * @param outputFile format pattern with one {@code %s} placeholder
     * @throws IOException if a partial file cannot be read or the merged file cannot be written
     */
    private void mergeFiles(String outputFile) throws IOException {
        File mergedFile = new File(String.format(outputFile, "all"));
        try (BufferedOutputStream bos = FileUtilities.getOutputStreamToFile(mergedFile)) {
            for (WritingThread wt : this.writingThreads) {
                File partFile = wt.getOutputFile();
                // Close each partial reader before deleting its file; the original left the readers open.
                try (BufferedReader partReader = FileUtilities.getReaderFromFile(partFile)) {
                    IOUtils.copy(partReader, bos, StandardCharsets.UTF_8);
                }
                if (!partFile.delete()) {
                    log.warn("Could not delete partial output file {}", partFile);
                }
            }
        }
    }

    /**
     * Creates and starts {@code numThreads} writing threads and registers them in {@code writingThreads}.
     * Thread number {@code i} is named {@code WritingThread-i} and writes to the file obtained by
     * formatting {@code outputFile} with {@code i}.
     *
     * @param geneMapper the gene mapper passed on to every writing thread
     * @param outputFile format pattern with one {@code %s} placeholder for the thread index
     * @param numThreads number of writing threads to start
     * @throws IOException if the output writer of a thread cannot be opened
     */
    public void createWritingThreads(GeneMapper geneMapper, String outputFile, int numThreads) throws IOException {
        int threadIndex = 0;
        while (threadIndex < numThreads) {
            File threadOutputFile = new File(String.format(outputFile, threadIndex));
            WritingThread writer = new WritingThread(geneMapper, threadOutputFile);
            writer.setName("WritingThread-" + threadIndex);
            writer.start();
            this.writingThreads.add(writer);
            ++threadIndex;
        }
    }

    /**
     * Reads the tab-separated gene2pubmed file into a multimap whose keys are the values of the third
     * column and whose values are the values of the second column (presumably PubMed ID and gene ID;
     * verify against the file format). Lines starting with {@code #} and blank lines are ignored; lines
     * with fewer than three columns are skipped with a warning instead of aborting the whole run.
     *
     * @param gene2pubmed the file to read
     * @return the mapping from third-column value to all second-column values seen with it
     * @throws IOException if the file cannot be read
     */
    private Multimap<String, String> readGene2pubmed(File gene2pubmed) throws IOException {
        HashMultimap<String, String> pubmed2gene = HashMultimap.create();
        try (BufferedReader br = FileUtilities.getReaderFromFile(gene2pubmed)) {
            String line;
            while ((line = br.readLine()) != null) {
                if (line.isBlank() || line.startsWith("#")) {
                    continue;
                }
                String[] columns = line.split("\t");
                if (columns.length < 3) {
                    log.warn("Skipping malformed gene2pubmed line with fewer than 3 columns: '{}'", line);
                    continue;
                }
                pubmed2gene.put(columns[2], columns[1]);
            }
        }
        return pubmed2gene;
    }

    /**
     * Builds a {@link GeneDocument} from the given CAS, adds the document level gene annotations from
     * {@code pubmed2gene}, lets the {@link DocumentLoader} infer mention labels from the document's gold
     * IDs and puts the document into the shared buffer for the writing threads. Blocks until the
     * buffer accepts the document.
     *
     * @param pubmed2gene       document level gene annotations
     * @param jCas              the CAS of the document to convert
     * @param geneMapper        supplies the candidate retrieval used for label inference
     * @param contextWindowSize not used by this method
     * @throws AnalysisEngineProcessException declared for callers; see the called components
     * @throws InterruptedException           if interrupted while waiting for free buffer capacity
     */
    private void writeTrainingData(Multimap<String, String> pubmed2gene, JCas jCas, GeneMapper geneMapper, int contextWindowSize) throws AnalysisEngineProcessException, InterruptedException {
        Pattern specificTypes = Pattern.compile("Gene|protein|protein_complex|protein_enum|protein_familiy_or_group");
        Map<String, Matcher> entityMappingTypes = Map.of(Gene.class.getCanonicalName(), specificTypes.matcher(""));
        // No context is computed here: every mention gets the null pair.
        Function<EntityMention, Pair<String, BooleanQuery>> contextFun = em -> ImmutablePair.nullPair();
        GeneDocumentFactory documentFactory = injector.getInstance(GeneDocumentFactory.class);
        GeneDocument document = documentFactory.createGeneDocument(jCas, entityMappingTypes, contextFun, new Parameters(configuration));
        TransformerDisambiguationDataUtils.addDocumentLevelGeneAnnotations(document, pubmed2gene);
        DocumentLoader documentLoader = injector.getInstance(DocumentLoader.class);
        documentLoader.inferDocumentLevelLabelsToMentions(document, document.getGoldIds(), geneMapper.getMappingCore().getCandidateRetrieval(), injector.getInstance(GeneOrthologs.class), false);
        document.setCompletelyAnnotated(false);
        boolean enqueued = false;
        while (!enqueued) {
            synchronized (documentBuffer) {
                // Wake a waiting writer when the buffer is full so that it gets drained.
                if (documentBuffer.remainingCapacity() == 0) {
                    log.trace("Notifying writing thread that buffer is full.");
                    documentBuffer.notify();
                }
            }
            // Retry after each one-minute timeout until the buffer accepts the document.
            enqueued = documentBuffer.offer(document, 1L, TimeUnit.MINUTES);
        }
    }

    /**
     * Creates an analysis engine from the given descriptor after replacing the type system in the
     * descriptor's metadata with {@code tsd}.
     *
     * @param descriptorPath the descriptor handed to {@code AnalysisEngineFactory.createEngineDescription}
     * @param tsd            the type system to set on the engine description
     * @return the instantiated engine
     * @throws IOException                     if the descriptor cannot be read
     * @throws InvalidXMLException             if the descriptor is not valid
     * @throws ResourceInitializationException if the engine cannot be created
     */
    private AnalysisEngine createEngineWithTs(String descriptorPath, TypeSystemDescription tsd) throws IOException, InvalidXMLException, ResourceInitializationException {
        // No configuration parameter overrides are passed to either factory call.
        final Object[] noConfigurationData = new Object[0];
        AnalysisEngineDescription engineDescription = AnalysisEngineFactory.createEngineDescription(descriptorPath, noConfigurationData);
        engineDescription.getAnalysisEngineMetaData().setTypeSystem(tsd);
        AnalysisEngine engine = AnalysisEngineFactory.createEngine(engineDescription, noConfigurationData);
        return engine;
    }

    private class WritingThread
    extends Thread {
        private final GeneMapper geneMapper;
        private final File outputFile;
        private boolean finish = false;
        private final BufferedWriter bw;

        public WritingThread(GeneMapper geneMapper, File outputFile) throws IOException {
            this.geneMapper = geneMapper;
            this.outputFile = outputFile;
            if (outputFile.exists()) {
                outputFile.delete();
            }
            this.bw = FileUtilities.getWriterToFile(outputFile);
        }

        public File getOutputFile() {
            return this.outputFile;
        }

        public void finish() {
            this.finish = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                while (!this.finish || !documentBuffer.isEmpty()) {
                    BlockingDeque<GeneDocument> blockingDeque = documentBuffer;
                    synchronized (blockingDeque) {
                        if (documentBuffer.remainingCapacity() > 500 && !this.finish) {
                            log.trace("Waiting for notification.");
                            documentBuffer.wait();
                        }
                    }
                    log.debug("Draining document buffer of size {} to outbound list.", (Object)documentBuffer.size());
                    ArrayList documents = new ArrayList(documentBuffer.size());
                    documentBuffer.drainTo(documents);
                    log.debug("Writing document buffer to file {}", (Object)this.outputFile);
                    for (GeneDocument document : documents) {
                        TransformerDisambiguationDataUtils.writeData(this.bw, this.geneMapper, document);
                    }
                    log.debug("Writing finished.");
                }
            }
            catch (Throwable e) {
                log.error("Error in the data writing thread.", e);
                throw new RuntimeException(e);
            }
            finally {
                if (this.bw != null) {
                    try {
                        this.bw.close();
                    }
                    catch (IOException e) {
                        log.error("Could not close writer to {}", (Object)this.outputFile);
                    }
                }
            }
        }
    }
}

