/*
 * Decompiled with CFR 0.152.
 */
package de.julielab.jcore.reader.db;

import de.julielab.costosys.cli.TableNotFoundException;
import de.julielab.costosys.dbconnection.CoStoSysConnection;
import de.julielab.costosys.dbconnection.DBCIterator;
import de.julielab.costosys.dbconnection.DataBaseConnector;
import de.julielab.costosys.dbconnection.util.TableSchemaMismatchException;
import de.julielab.jcore.reader.db.DBSubsetReader;
import de.julielab.jcore.types.ext.DBProcessingMetaData;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.uima.UimaContext;
import org.apache.uima.collection.CollectionException;
import org.apache.uima.fit.descriptor.ConfigurationParameter;
import org.apache.uima.fit.descriptor.ResourceMetaData;
import org.apache.uima.fit.util.JCasUtil;
import org.apache.uima.jcas.JCas;
import org.apache.uima.jcas.cas.StringArray;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Progress;
import org.apache.uima.util.ProgressImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ResourceMetaData(name="JCoRe Database Reader", description="A collection reader that fetches documents from a PostgreSQL database. It is an abstract class and must be extended to actually populate CAS instances. It works with the corpus storage system (CoStoSys), thus subset tables may be specified to read from in the 'Table' parameter. Then, the reader will mark batches of IDs read from the subset as being 'in process', allowing multiple DBReaders in different pipelines to be synchronized and not read documents multiple times. Additional tables can be specified that will be joined to the main document database. This is used to load annotations that have been stored in separate tables. The jcore-xmi-db-writer is able to write such annotation tables and the jcore-xmi-db-reader implements the assembly of such distributed annotation data. All mentioned components are part of the Jena Document Information System, JeDIS, for document annotation and management.", vendor="JULIE Lab Jena, Germany", copyright="JULIE Lab Jena, Germany")
public abstract class DBReader extends DBSubsetReader {
    private static final Logger log = LoggerFactory.getLogger(DBReader.class);
    @ConfigurationParameter(name="Timestamp", mandatory=false, description="PostgreSQL timestamp expression that is evaluated against the data table. The data table schema, which must be the active data table schema in the CoStoSys configuration as always, must specify a single timestamp field for this parameter to work. Only data rows with a timestamp value larger than the given timestamp expression will be processed. Note that when reading from a subset table, there may be subset rows indicated to be in process which are finally not read from the data table. This is an implementational shortcoming and might be addressed if respective feature requests are given through the JULIE Lab GitHub page or JCoRe issues.")
    protected String dataTimestamp;
    /** Fetches the next batch of document IDs and documents; only used in subset table mode. */
    private RetrievingThread retriever;
    /**
     * Rows still to be returned: the current batch in subset table mode, or the result of the
     * single data table query in data table mode.
     */
    private DBCIterator<byte[][]> xmlBytes;

    /**
     * Replaces any existing {@link DBProcessingMetaData} in the CAS with a new annotation that
     * holds the primary key of the given row and, unless the data table is read directly, the
     * schema-qualified name of the subset table.
     *
     * @param dbc           connector used to look up the primary key column indices and the active Postgres schema
     * @param readDataTable if {@code true}, no subset table name is recorded
     * @param tableName     the table being read; prefixed with the active Postgres schema if it contains no dot
     * @param data          the row's column values; primary key columns are decoded as UTF-8
     * @param cas           the CAS to annotate
     * @return the primary key element values joined by commas
     */
    public static String setDBProcessingMetaData(DataBaseConnector dbc, boolean readDataTable, String tableName, byte[][] data, JCas cas) {
        // Copy before removing so the index is not modified while it is being iterated.
        new ArrayList<>(JCasUtil.select(cas, DBProcessingMetaData.class)).forEach(DBProcessingMetaData::removeFromIndexes);
        DBProcessingMetaData dbMetaData = new DBProcessingMetaData(cas);
        List<Integer> pkIndices = dbc.getPrimaryKeyIndices();
        StringArray pkArray = new StringArray(cas, pkIndices.size());
        StringBuilder pkBuilder = new StringBuilder();
        for (int i = 0; i < pkIndices.size(); ++i) {
            String pkElementValue = new String(data[pkIndices.get(i)], StandardCharsets.UTF_8);
            pkArray.set(i, pkElementValue);
            if (i > 0) {
                pkBuilder.append(',');
            }
            pkBuilder.append(pkElementValue);
        }
        // Guard matches the log level actually used, so the array is only rendered when tracing.
        if (log.isTraceEnabled()) {
            log.trace("Setting primary key for DBProcessingMetaData to {}", Arrays.toString(pkArray.toArray()));
        }
        dbMetaData.setPrimaryKey(pkArray);
        if (!readDataTable) {
            String name = tableName.contains(".") ? tableName : dbc.getActivePGSchema() + "." + tableName;
            log.trace("Setting subset table name for DBProcessingMetaData to {}", name);
            dbMetaData.setSubsetTable(name);
        } else {
            log.trace("Not setting the subset to DBProcessingMetaData because reading the data table is set to {}", readDataTable);
        }
        dbMetaData.addToIndexes();
        return pkBuilder.toString();
    }

    /**
     * Reads the "Timestamp" parameter. In data table mode, also issues the data table query up
     * front, provided there is anything to read.
     */
    @Override
    public void initialize(UimaContext context) throws ResourceInitializationException {
        super.initialize(context);
        this.dataTimestamp = (String) this.getConfigParameterValue("Timestamp");
        if (this.readDataTable && this.hasNext) {
            log.info("Querying data table {} with schema {} and where condition {}", this.tableName, this.dbc.getActiveTableSchema(), this.whereCondition);
            this.xmlBytes = this.dbc.queryDataTable(this.tableName, this.whereCondition, this.additionalTableNames, this.schemas);
        }
    }

    @Override
    public boolean hasNext() throws IOException, CollectionException {
        return this.hasNext;
    }

    /**
     * Returns the column values of the next row, read from the data table or the subset table
     * depending on {@code readDataTable}. {@code processedDocuments} is only incremented for
     * non-null results.
     *
     * @return the next row, or {@code null} in subset mode if the current batch yielded no row
     */
    public byte[][] getNextArtifactData() throws CollectionException {
        log.trace("Fetching next document from the current database batch");
        byte[][] next = this.readDataTable ? this.getNextFromDataTable() : this.getNextFromSubset();
        if (next != null) {
            ++this.processedDocuments;
        }
        return next;
    }

    /**
     * Returns the next row of the data table query. Sets {@code hasNext} to {@code false}, and
     * releases resources via {@link #close()}, once the rows are exhausted or
     * {@code totalDocumentCount} documents have been returned.
     */
    private byte[][] getNextFromDataTable() {
        this.hasNext = false;
        byte[][] next = this.xmlBytes.next();
        // processedDocuments does not yet include 'next'; the caller increments it afterwards.
        if (this.processedDocuments < this.totalDocumentCount - 1) {
            this.hasNext = this.xmlBytes.hasNext();
        }
        if (!this.hasNext) {
            // Close on both end conditions (exhausted rows and reached document limit).
            this.close();
        }
        return next;
    }

    /**
     * Returns the next row of the current subset batch and, when the batch is used up, switches
     * to the next batch. If that batch is empty, sets {@code hasNext} to {@code false} and closes
     * the reader.
     * NOTE(review): returns {@code null} if the current batch yields no row at all (e.g. all rows
     * were filtered by the timestamp condition).
     */
    private byte[][] getNextFromSubset() {
        log.trace("Reading in subset table mode.");
        byte[][] next = null;
        if (this.retriever == null) {
            // First call: obtain the first batch; getDocuments() blocks until it is available.
            log.trace("Creating new RetrievingThread for fetching the first document batch");
            this.retriever = new RetrievingThread();
            this.xmlBytes = this.retriever.getDocuments();
            if (this.fetchIdsProactively) {
                // The constructor starts the thread, so the next batch is fetched in the background.
                log.trace("Creating background RetrievingThread to immediately fetch the next document batch");
                this.retriever = new RetrievingThread();
            }
        }
        if (this.xmlBytes.hasNext()) {
            log.debug("Returning next document.");
            next = this.xmlBytes.next();
        }
        if (!this.xmlBytes.hasNext()) {
            this.xmlBytes = this.retriever.getDocuments();
            if (!this.xmlBytes.hasNext()) {
                log.debug("No more documents, settings 'hasNext' to false.");
                this.hasNext = false;
                this.close();
            } else if (this.fetchIdsProactively) {
                log.trace("Creating background RetrievingThread to immediately fetch the next document batch");
                this.retriever = new RetrievingThread();
            }
        }
        return next;
    }

    @Override
    public Progress[] getProgress() {
        return new Progress[]{new ProgressImpl(this.processedDocuments, this.totalDocumentCount, "entities", true)};
    }

    /** Closes the current row iterator, if any, and the database connector. */
    @Override
    public void close() {
        log.debug("Closing {}", this.getClass().getCanonicalName());
        if (this.xmlBytes != null) {
            this.xmlBytes.close();
        }
        this.dbc.close();
    }

    /**
     * Returns the component name passed to {@code retrieveAndMark} when reserving subset rows.
     * Presumably this is recorded in the subset table as the marking reader; verify against
     * CoStoSys.
     */
    protected abstract String getReaderComponentName();

    /**
     * Reserves the next batch of document IDs in the subset table and fetches the matching rows.
     * If {@code fetchIdsProactively} is set, the constructor starts the thread immediately.
     * Otherwise the work runs synchronously inside {@link #getDocuments()}.
     */
    protected class RetrievingThread
    extends Thread {
        private List<Object[]> ids;
        private DBCIterator<byte[][]> documents;

        public RetrievingThread() {
            if (DBReader.this.fetchIdsProactively) {
                log.debug("Fetching new documents in a background thread.");
                this.setName("DBReader RetrievingThread");
                this.start();
            }
        }

        /**
         * Reserves up to {@code batchSize} IDs (never exceeding {@code totalDocumentCount}) and
         * prepares an iterator over their rows. The rows are joined with additional tables or
         * restricted by {@code dataTimestamp} if configured.
         *
         * @throws IllegalArgumentException if the subset table is missing or does not match the
         *                                  active table schema
         */
        @Override
        public void run() {
            int limit = Math.min(DBReader.this.batchSize, DBReader.this.totalDocumentCount - DBReader.this.numberFetchedDocIDs);
            try {
                try (CoStoSysConnection ignored = DBReader.this.dbc.obtainOrReserveConnection()) {
                    this.ids = DBReader.this.dbc.retrieveAndMark(DBReader.this.tableName, DBReader.this.getReaderComponentName(), DBReader.this.hostName, DBReader.this.pid, limit, DBReader.this.selectionOrder);
                }
                if (log.isTraceEnabled()) {
                    List<String> idStrings = new ArrayList<>(this.ids.size());
                    for (Object[] primaryKey : this.ids) {
                        List<String> pkElements = new ArrayList<>(primaryKey.length);
                        for (Object element : primaryKey) {
                            pkElements.add(String.valueOf(element));
                        }
                        idStrings.add(StringUtils.join(pkElements, "-"));
                    }
                    log.trace("Reserved the following document IDs for processing: {}", idStrings);
                }
            }
            catch (TableSchemaMismatchException e) {
                log.error("Table schema mismatch: The active table schema {} specified in the CoStoSys configuration file {} does not match the columns in the subset table {}: {}", DBReader.this.dbc.getActiveTableSchema(), DBReader.this.costosysConfig, DBReader.this.tableName, e.getMessage());
                throw new IllegalArgumentException(e);
            }
            catch (TableNotFoundException e) {
                log.error("The subset table {} could not be found in the database", (Object) DBReader.this.tableName, e);
                throw new IllegalArgumentException(e);
            }
            // May run on the background thread. The reader thread joins this thread (getDocuments)
            // before it creates the next RetrievingThread.
            DBReader.this.numberFetchedDocIDs += this.ids.size();
            log.debug("Retrieved {} document IDs to fetch from the database.", this.ids.size());
            if (this.ids.size() > 0) {
                log.debug("Fetching {} documents from the database.", this.ids.size());
                if (DBReader.this.dataTimestamp == null) {
                    if (!DBReader.this.joinTables) {
                        log.trace("Fetching data from the data table {} without additional tables.", (Object) DBReader.this.dataTable);
                        this.documents = DBReader.this.dbc.retrieveColumnsByTableSchema(this.ids, DBReader.this.dataTable);
                    } else {
                        log.trace("Fetching data by joining tables {}. The used table schemas are {}.", (Object) DBReader.this.tables, (Object) DBReader.this.schemas);
                        this.documents = DBReader.this.dbc.retrieveColumnsByTableSchema(this.ids, DBReader.this.tables, DBReader.this.schemas);
                    }
                } else {
                    log.trace("Fetching data from data table {} that is newer than timestamp {}", (Object) DBReader.this.dataTable, DBReader.this.dataTimestamp);
                    this.documents = DBReader.this.dbc.queryWithTime(this.ids, DBReader.this.dataTable, DBReader.this.dataTimestamp);
                }
            } else {
                log.debug("No unfetched documents left.");
                // Empty iterator: signals the caller that no further batches exist.
                this.documents = new DBCIterator<byte[][]>() {

                    @Override
                    public boolean hasNext() {
                        return false;
                    }

                    @Override
                    public byte[][] next() {
                        return null;
                    }

                    @Override
                    public void remove() {
                    }

                    @Override
                    public void close() {
                    }
                };
            }
        }

        /**
         * Returns the fetched batch. Runs the fetch synchronously if proactive fetching is off;
         * otherwise waits for the background thread to finish.
         *
         * @return the row iterator of this batch, or {@code null} if interrupted while waiting
         *         (the interrupt status is restored in that case)
         */
        public DBCIterator<byte[][]> getDocuments() {
            if (!DBReader.this.fetchIdsProactively) {
                log.debug("Fetching new documents (without employing a background thread).");
                this.run();
            }
            try {
                // Returns immediately if the thread was never started (synchronous mode).
                log.debug("Waiting for the background thread to finish fetching documents to return them.");
                this.join();
                return this.documents;
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for code further up the call stack.
                Thread.currentThread().interrupt();
                log.error("Interrupted while waiting for the background thread to fetch documents", e);
                return null;
            }
        }
    }
}

