/*
 * Decompiled with CFR 0.152.
 */
package de.julielab.jcore.reader.db;

import de.julielab.jcore.reader.db.DBSubsetReader;
import de.julielab.jcore.types.casmultiplier.RowBatch;
import de.julielab.xmlData.dataBase.CoStoSysConnection;
import de.julielab.xmlData.dataBase.DBCIterator;
import de.julielab.xmlData.dataBase.util.TableSchemaMismatchException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.uima.UimaContext;
import org.apache.uima.cas.FeatureStructure;
import org.apache.uima.collection.CollectionException;
import org.apache.uima.ducc.Workitem;
import org.apache.uima.fit.descriptor.ConfigurationParameter;
import org.apache.uima.fit.descriptor.ResourceMetaData;
import org.apache.uima.jcas.JCas;
import org.apache.uima.jcas.cas.FSArray;
import org.apache.uima.jcas.cas.StringArray;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Progress;
import org.apache.uima.util.ProgressImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ResourceMetaData(name="JCoRe Database Multiplier Reader", description="A collection reader that receives the IDs of documents from a database table. Additional tables may be specified which will, together with the IDs, be sent to a CAS multiplier extending the DBMultiplierReader. The multiplier will read documents and the joined additional tables according to the list of document IDs sent by this reader. The component leverages the corpus storage system (CoStoSys) for this purpose and is part of the Jena Document Information System, JeDIS.", vendor="JULIE Lab Jena, Germany", copyright="JULIE Lab Jena, Germany")
public class DBMultiplierReader
extends DBSubsetReader {
    public static final String PARAM_RESET_TABLE = "ResetTable";
    public static final String PARAM_TABLE = "Table";
    public static final String PARAM_COSTOSYS_CONFIG_NAME = "CostosysConfigFile";
    public static final String PARAM_DATA_TIMESTAMP = "Timestamp";
    public static final String PARAM_ADDITIONAL_TABLES = "AdditionalTables";
    public static final String PARAM_ADDITIONAL_TABLE_SCHEMAS = "AdditionalTableSchemas";
    public static final String PARAM_FETCH_IDS_PROACTIVELY = "FetchIdsProactively";
    public static final String PARAM_SEND_CAS_TO_LAST = "SendCasToLast";
    @ConfigurationParameter(name="SendCasToLast", mandatory=false, defaultValue={"false"}, description="UIMA DUCC relevant parameter when using a CAS multiplier. When set to true, the worker CAS from the collection reader is forwarded to the last component in the pipeline. This can be used to send information about the progress to the CAS consumer in order to have it perform batch operations. For this purpose, a feature structure of type WorkItem from the DUCC library is added to the worker CAS. This feature structure has information about the current progress.")
    private boolean sendCasToLast;
    private static final Logger log = LoggerFactory.getLogger(DBMultiplierReader.class);
    private RetrievingThread retriever;
    private DBCIterator<Object[]> dataTableDocumentIds;

    @Override
    public void initialize(UimaContext context) throws ResourceInitializationException {
        super.initialize(context);
        if (this.readDataTable.booleanValue()) {
            log.debug("Reading from data table {}", (Object)this.tableName);
            this.dataTableDocumentIds = this.dbc.query(this.tableName, Arrays.asList(this.dbc.getFieldConfiguration(this.dbc.getActiveTableSchema()).getPrimaryKey()));
            this.hasNext = this.dataTableDocumentIds.hasNext();
        } else {
            log.debug("Reading from subset table {}", (Object)this.tableName);
            this.hasNext = this.dbc.withConnectionQueryBoolean(c -> c.hasUnfetchedRows(this.tableName));
        }
    }

    public void getNext(JCas jCas) throws CollectionException {
        log.trace("Requesting next batch of document IDs from the database.");
        List<Object[]> idList = this.getNextDocumentIdBatch();
        log.trace("Received a list of {} ID from the database.", (Object)idList.size());
        RowBatch rowbatch = new RowBatch(jCas);
        FSArray ids = new FSArray(jCas, idList.size());
        for (int i = 0; i < idList.size(); ++i) {
            Object[] objects = idList.get(i);
            StringArray keys = new StringArray(jCas, objects.length);
            for (int j = 0; j < objects.length; ++j) {
                Object object = objects[j];
                keys.set(j, String.valueOf(object));
            }
            ids.set(i, (FeatureStructure)keys);
        }
        StringArray tableArray = new StringArray(jCas, this.tables.length);
        StringArray schemaArray = new StringArray(jCas, this.schemas.length);
        for (int i = 0; i < this.tables.length; ++i) {
            String table = this.tables[i];
            String schema = this.schemas[i];
            tableArray.set(i, table);
            schemaArray.set(i, schema);
        }
        rowbatch.setIdentifiers(ids);
        rowbatch.setTables(tableArray);
        rowbatch.setTableSchemas(schemaArray);
        rowbatch.setCostosysConfiguration(this.costosysConfig);
        rowbatch.addToIndexes();
        if (this.sendCasToLast) {
            try {
                Workitem workitem = new Workitem(jCas);
                workitem.setSendToLast(true);
                workitem.setBlockindex(this.processedDocuments / this.batchSize);
                if (!this.hasNext()) {
                    workitem.setLastBlock(true);
                }
                workitem.addToIndexes();
            }
            catch (IOException e) {
                log.error("Error occurred while creating Workitem feature structure", (Throwable)e);
                throw new CollectionException((Throwable)e);
            }
        }
    }

    public boolean hasNext() throws IOException, CollectionException {
        boolean hasNext = this.hasNext;
        if (this.retriever != null) {
            boolean bl = hasNext = !this.retriever.getDocumentIds().isEmpty();
        }
        if (!hasNext) {
            this.close();
        }
        return hasNext;
    }

    public List<Object[]> getNextDocumentIdBatch() {
        List<Object[]> next = this.readDataTable != false ? this.getNextFromDataTable() : this.getNextFromSubset();
        if (next != null) {
            this.processedDocuments += next.size();
        }
        return next;
    }

    private List<Object[]> getNextFromDataTable() {
        ArrayList<Object[]> next = new ArrayList<Object[]>(this.batchSize);
        this.hasNext = false;
        log.trace("Filling document ID list with the next batch of documents.");
        while (this.dataTableDocumentIds.hasNext() && next.size() < this.batchSize) {
            next.add((Object[])this.dataTableDocumentIds.next());
        }
        if (this.processedDocuments < this.totalDocumentCount - 1) {
            log.trace("Checking if there are more documents to read from the data table.");
            this.hasNext = this.dataTableDocumentIds.hasNext();
        }
        return next;
    }

    private List<Object[]> getNextFromSubset() {
        if (this.retriever == null) {
            this.retriever = new RetrievingThread();
        }
        List<Object[]> idList = this.retriever.getDocumentIds();
        if (this.fetchIdsProactively.booleanValue()) {
            this.retriever = new RetrievingThread();
        }
        return idList;
    }

    public Progress[] getProgress() {
        return new Progress[]{new ProgressImpl(this.processedDocuments, this.totalDocumentCount, "entities", true)};
    }

    public void close() {
        if (this.dbc != null) {
            this.dbc.close();
        }
        this.dbc = null;
    }

    protected class RetrievingThread
    extends Thread {
        private List<Object[]> ids;

        public RetrievingThread() {
            if (DBMultiplierReader.this.fetchIdsProactively.booleanValue()) {
                log.debug("Fetching ID batches in a background thread.");
                this.setName(DBMultiplierReader.class.getSimpleName() + " RetrievingThread (" + this.getName() + ")");
                this.start();
            }
        }

        @Override
        public void run() {
            int limit = Math.min(DBMultiplierReader.this.batchSize, DBMultiplierReader.this.totalDocumentCount - DBMultiplierReader.this.numberFetchedDocIDs);
            try {
                CoStoSysConnection ignored = DBMultiplierReader.this.dbc.obtainOrReserveConnection();
                Object object = null;
                try {
                    log.trace("Using connection {} to retrieveAndMark", (Object)ignored.getConnection());
                    this.ids = DBMultiplierReader.this.dbc.retrieveAndMark(DBMultiplierReader.this.tableName, this.getClass().getSimpleName(), DBMultiplierReader.this.hostName, DBMultiplierReader.this.pid, limit, DBMultiplierReader.this.selectionOrder);
                }
                catch (Throwable throwable) {
                    object = throwable;
                    throw throwable;
                }
                finally {
                    if (ignored != null) {
                        if (object != null) {
                            try {
                                ignored.close();
                            }
                            catch (Throwable throwable) {
                                ((Throwable)object).addSuppressed(throwable);
                            }
                        } else {
                            ignored.close();
                        }
                    }
                }
                if (log.isTraceEnabled()) {
                    ArrayList<String> idStrings = new ArrayList<String>();
                    for (Object[] o : this.ids) {
                        ArrayList<String> pkElements = new ArrayList<String>();
                        for (Object object2 : o) {
                            pkElements.add(String.valueOf(object2));
                        }
                        idStrings.add(StringUtils.join(pkElements, (String)"-"));
                    }
                    log.trace("Reserved the following document IDs for processing: " + idStrings);
                }
                DBMultiplierReader.this.numberFetchedDocIDs += this.ids.size();
                log.debug("Retrieved {} document IDs to fetch from the database.", (Object)this.ids.size());
            }
            catch (TableSchemaMismatchException e) {
                log.error("Table schema mismatch: The active table schema {} specified in the CoStoSys configuration file {} does not match the columns in the subset table {}: {}", new Object[]{DBMultiplierReader.this.dbc.getActiveTableSchema(), DBMultiplierReader.this.costosysConfig, DBMultiplierReader.this.tableName, e.getMessage()});
                throw new IllegalArgumentException(e);
            }
        }

        public List<Object[]> getDocumentIds() {
            if (!DBMultiplierReader.this.fetchIdsProactively.booleanValue()) {
                log.debug("Fetching new documents (without employing a background thread).");
                this.run();
            }
            try {
                log.debug("Waiting for the background thread to finish fetching documents to return them.");
                this.join();
                return this.ids;
            }
            catch (InterruptedException e) {
                log.error("Background ID fetching thread was interrupted", (Throwable)e);
                return null;
            }
        }
    }
}

