/*
 * Decompiled with CFR 0.152.
 */
package de.julielab.jcore.ae.checkpoint;

import com.google.common.collect.Sets;
import de.julielab.costosys.configuration.FieldConfig;
import de.julielab.costosys.dbconnection.CoStoSysConnection;
import de.julielab.costosys.dbconnection.DataBaseConnector;
import de.julielab.jcore.ae.checkpoint.DocumentId;
import de.julielab.jcore.ae.checkpoint.DocumentReleaseCheckpoint;
import de.julielab.jcore.types.ext.DBProcessingMetaData;
import de.julielab.jcore.utility.JCoReTools;
import java.io.FileNotFoundException;
import java.sql.BatchUpdateException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.uima.UimaContext;
import org.apache.uima.analysis_component.JCasAnnotator_ImplBase;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.fit.descriptor.ConfigurationParameter;
import org.apache.uima.fit.descriptor.ResourceMetaData;
import org.apache.uima.fit.util.JCasUtil;
import org.apache.uima.jcas.JCas;
import org.apache.uima.resource.ResourceInitializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * UIMA annotator that writes processing checkpoints into a CoStoSys/JeDIS subset table.
 *
 * <p>For every CAS that carries a {@link DBProcessingMetaData} annotation (and is not flagged
 * "do not mark as processed"), the document ID is cached. Once {@code WriteBatchSize} IDs have
 * accumulated, or when UIMA signals the end of a batch or of the collection, the cached IDs are
 * flushed: the {@code last_component} column of the subset table is set to the configured
 * checkpoint name. If {@code IndicateFinished} is enabled, documents returned by
 * {@link DocumentReleaseCheckpoint#getReleasedDocumentIds()} are additionally updated with
 * {@code is_processed=TRUE} and {@code is_in_process=FALSE}.
 */
@ResourceMetaData(name="JCoRe Database Checkpoint AE", description="This component can be used when using a JCoRe database reader that reads from a CoStoSys/JeDIS subset. Enters the configured component name in the 'last component' column. Can also mark documents as being completely processed.")
public class DBCheckpointAE
extends JCasAnnotator_ImplBase {
    public static final String PARAM_CHECKPOINT_NAME = "CheckpointName";
    public static final String PARAM_INDICATE_FINISHED = "IndicateFinished";
    public static final String PARAM_COSTOSYS_CONFIG = "CostosysConfigFile";
    public static final String PARAM_WRITE_BATCH_SIZE = "WriteBatchSize";
    public static final String PARAM_JEDIS_SYNCHRONIZATION_KEY = "JedisSynchronizationKey";
    /** Fragment searched for in a batch failure message to recognise a database deadlock. */
    private static final String DEADLOCK_MESSAGE_FRAGMENT = "deadlock detected";
    private static final Logger log = LoggerFactory.getLogger(DBCheckpointAE.class);
    private DataBaseConnector dbc;
    @ConfigurationParameter(name=PARAM_CHECKPOINT_NAME, description="String parameter. A name that identifies this checkpoint in the database.")
    private String componentDbName;
    @ConfigurationParameter(name=PARAM_INDICATE_FINISHED, mandatory=false, description="Whether or not the checkpoint should mark the end of processing of the pipeline. If set to true, this component will not only set its name as checkpoint in the subset table but also set the 'is processed' flag to true and the 'is in process' flag to false.")
    private boolean indicateFinished;
    @ConfigurationParameter(name=PARAM_COSTOSYS_CONFIG, description="File path or classpath resource location of a Corpus Storage System (CoStoSys) configuration file. This file specifies the database to write the XMI data into and the data table schema. This schema must at least define the primary key columns that the storage tables should have for each document. The primary key is currently just the document ID. Thus, at the moment, primary keys can only consist of a single element when using this component. This is a shortcoming of this specific component and must be changed here, if necessary.")
    private String dbcConfigPath;
    @ConfigurationParameter(name=PARAM_WRITE_BATCH_SIZE, mandatory=false, defaultValue={"50"}, description="The number of processed CASes after which the checkpoint should be written into the database. Defaults to 50.")
    private int writeBatchSize;
    @ConfigurationParameter(name=PARAM_JEDIS_SYNCHRONIZATION_KEY, mandatory=false, description="If set, the value of this parameter is used to synchronize the 'processed' mark in the subset table documents processed by the pipeline. This is useful when document data is sent batchwise to the database by multiple components: In the case of a crash or manual cancellation of a pipeline run without synchronization is might happen that some components have sent their data and others haven't at the time of termination. To avoid an inconsistent database state,a document will only be marked as finished processed in the JeDIS subset table if all synchronized components in the pipeline have released the document. This is done by the DBCheckpointAE which must be at the end of the pipeline and have the 'IndicateFinished' parameter set to 'true'. Synchronized components are those that disclose this parameter and have a value set to it.")
    private String jedisSyncKey;
    /** Name of the subset table; taken from the first processed CAS that provides one. */
    private String subsetTable;
    /** IDs of the documents seen since the last flush to the database. */
    private Set<DocumentId> docIds;
    /** Only set when {@code indicateFinished} is true. */
    private DocumentReleaseCheckpoint docReleaseCheckpoint;

    /**
     * Reads the configuration parameters, opens the database connector and, when
     * {@code IndicateFinished} is set, registers this component with the
     * {@link DocumentReleaseCheckpoint}.
     *
     * @param aContext the UIMA context providing the configuration parameter values
     * @throws ResourceInitializationException if the CoStoSys configuration file cannot be found
     */
    @Override
    public void initialize(UimaContext aContext) throws ResourceInitializationException {
        super.initialize(aContext);
        this.componentDbName = (String) aContext.getConfigParameterValue(PARAM_CHECKPOINT_NAME);
        this.dbcConfigPath = (String) aContext.getConfigParameterValue(PARAM_COSTOSYS_CONFIG);
        this.indicateFinished = Optional.ofNullable((Boolean) aContext.getConfigParameterValue(PARAM_INDICATE_FINISHED)).orElse(false);
        this.writeBatchSize = Optional.ofNullable((Integer) aContext.getConfigParameterValue(PARAM_WRITE_BATCH_SIZE)).orElse(50);
        try {
            this.dbc = new DataBaseConnector(this.dbcConfigPath);
        }
        catch (FileNotFoundException e) {
            log.error("Could not initiate database connector", e);
            throw new ResourceInitializationException(e);
        }
        this.docIds = new HashSet<>();
        if (this.indicateFinished) {
            String configuredSyncKey = (String) aContext.getConfigParameterValue(PARAM_JEDIS_SYNCHRONIZATION_KEY);
            // Without an explicit key, fall back to a key derived from the class and checkpoint name.
            this.jedisSyncKey = configuredSyncKey != null
                    ? configuredSyncKey
                    : this.getClass().getCanonicalName() + this.componentDbName;
            this.docReleaseCheckpoint = DocumentReleaseCheckpoint.get();
            this.docReleaseCheckpoint.register(this.jedisSyncKey);
        }
        log.info("{}: {}", PARAM_CHECKPOINT_NAME, this.componentDbName);
        log.info("{}: {}", PARAM_INDICATE_FINISHED, this.indicateFinished);
        log.info("{}: {}", PARAM_COSTOSYS_CONFIG, this.dbcConfigPath);
        log.info("{}: {}", PARAM_WRITE_BATCH_SIZE, this.writeBatchSize);
    }

    /**
     * Flushes the cached document IDs to the database when UIMA reports a completed batch.
     *
     * @throws AnalysisEngineProcessException if the subset table could not be updated
     */
    @Override
    public void batchProcessComplete() throws AnalysisEngineProcessException {
        super.batchProcessComplete();
        log.debug("BatchProcessComplete called, stashing {} documents to be ready for marked as being finished", this.docIds.size());
        this.flushCheckpoint();
    }

    /**
     * Flushes the remaining cached document IDs and closes the database connector.
     * The connector is closed even when the final flush fails.
     *
     * @throws AnalysisEngineProcessException if the subset table could not be updated
     */
    @Override
    public void collectionProcessComplete() throws AnalysisEngineProcessException {
        super.collectionProcessComplete();
        log.debug("CollectionProcessComplete called, stashing {} documents to be ready for marked as being finished", this.docIds.size());
        try {
            this.flushCheckpoint();
        }
        finally {
            log.info("Closing database connector.");
            this.dbc.close();
        }
    }

    /**
     * Flushes the cached document IDs because the configured write batch size was reached.
     */
    private void customBatchProcessingComplete() throws AnalysisEngineProcessException {
        log.debug("CustomBatchProcessComplete called, stashing {} documents to be ready for marked as being finished", this.docIds.size());
        this.flushCheckpoint();
    }

    /**
     * Releases the cached document IDs at the release checkpoint (only when finishing is
     * indicated), writes the checkpoint into the subset table and clears the cache.
     * The cache is left untouched when the database update throws.
     */
    private void flushCheckpoint() throws AnalysisEngineProcessException {
        if (this.indicateFinished) {
            this.docReleaseCheckpoint.release(this.jedisSyncKey, this.docIds.stream());
        }
        try (CoStoSysConnection conn = this.dbc.obtainOrReserveConnection()) {
            this.setLastComponent(conn, this.subsetTable, this.indicateFinished, this.dbc.getActiveTableFieldConfiguration());
        }
        this.docIds.clear();
    }

    /**
     * Caches the document ID of the given CAS for checkpoint marking and triggers a database
     * flush once the number of cached IDs reaches the configured write batch size. CASes whose
     * {@link DBProcessingMetaData} has the "do not mark as processed" flag set are skipped.
     *
     * @param aJCas the CAS to register; must contain exactly one {@link DBProcessingMetaData}
     * @throws AnalysisEngineProcessException if the metadata annotation is missing, the subset
     *         table name is unknown, or the database update fails
     */
    @Override
    public void process(JCas aJCas) throws AnalysisEngineProcessException {
        // Parameterized so that aJCas.toString() is only evaluated when trace logging is enabled.
        log.trace("Processing jCas instance {}", aJCas);
        try {
            DBProcessingMetaData dbProcessingMetaData = JCasUtil.selectSingle(aJCas, DBProcessingMetaData.class);
            if (dbProcessingMetaData.getDoNotMarkAsProcessed()) {
                return;
            }
            DocumentId documentId = new DocumentId(dbProcessingMetaData);
            if (this.subsetTable == null) {
                this.subsetTable = dbProcessingMetaData.getSubsetTable();
                if (this.subsetTable == null) {
                    String message = "The subset table retrieved from the DBProcessingMetaData is null. Cannot continue without the table name.";
                    log.error(message);
                    throw new AnalysisEngineProcessException(new IllegalStateException(message));
                }
            }
            this.docIds.add(documentId);
            log.trace("Adding document ID {} for subset table {} for checkpoint marking", documentId, this.subsetTable);
            if (this.docIds.size() >= this.writeBatchSize) {
                log.debug("Cached documents have reached the configured batch size of {}, sending to database.", this.writeBatchSize);
                this.customBatchProcessingComplete();
            }
        }
        catch (IllegalArgumentException e) {
            String docId = JCoReTools.getDocId(aJCas);
            log.error("The document with document ID {} does not have an annotation of type {}. This annotation ought to contain the name of the subset table. It should be set by the DB reader. Cannot write the checkpoint to the database since the target subset table or its schema is unknown.", docId, DBProcessingMetaData.class.getCanonicalName());
            throw new AnalysisEngineProcessException(e);
        }
    }

    /**
     * Writes the checkpoint name into the {@code last_component} column of the subset table.
     *
     * <p>Cached documents that are not among the released documents only get their
     * {@code last_component} updated. When {@code markIsProcessed} is true, the released
     * documents additionally get {@code is_processed=TRUE} and {@code is_in_process=FALSE}.
     * Nothing is written when both ID sets are empty or the table name is blank.
     *
     * @param conn the connection to run the updates on; committed here unless in auto-commit mode
     * @param subsetTableName the subset table to update
     * @param markIsProcessed whether released documents should be flagged as processed
     * @param annotationFieldConfig the field configuration supplying the primary key definition
     * @throws AnalysisEngineProcessException if an update or the commit fails
     * @throws IllegalArgumentException if the primary key has more than one element
     */
    private void setLastComponent(CoStoSysConnection conn, String subsetTableName, boolean markIsProcessed, FieldConfig annotationFieldConfig) throws AnalysisEngineProcessException {
        Set<DocumentId> processedDocumentIds = Collections.emptySet();
        if (markIsProcessed) {
            processedDocumentIds = this.docReleaseCheckpoint.getReleasedDocumentIds();
        }
        // Live view: cached documents that have not been released for the "processed" mark.
        Set<DocumentId> documentIdsToSetLastComponent = Sets.difference(this.docIds, processedDocumentIds);
        boolean nothingToWrite = documentIdsToSetLastComponent.isEmpty() && processedDocumentIds.isEmpty();
        if (nothingToWrite || StringUtils.isBlank(subsetTableName)) {
            log.debug("Not setting the last component to {} because the processed document IDs list is empty (size: {}) or the subset table name could not be retrieved (is: {})", new Object[]{this.componentDbName, documentIdsToSetLastComponent.size(), subsetTableName});
            return;
        }
        String[] primaryKey = annotationFieldConfig.getPrimaryKey();
        if (primaryKey.length > 1) {
            throw new IllegalArgumentException("Currently, only one-element primary keys are supported.");
        }
        String primaryKeyPsString = StringUtils.join(annotationFieldConfig.expandPKNames("%s = ?"), " AND ");
        // The component name is bound as the first statement parameter (see updateSubsetTable)
        // rather than spliced into the SQL, so names containing quotes cannot break the statement.
        if (!documentIdsToSetLastComponent.isEmpty()) {
            String sqlSetLastComponent = String.format("UPDATE %s SET %s=? WHERE %s", subsetTableName, "last_component", primaryKeyPsString);
            log.debug("Setting the last component to '{}' for {} documents", this.componentDbName, documentIdsToSetLastComponent.size());
            this.updateSubsetTable(conn, documentIdsToSetLastComponent, sqlSetLastComponent);
        }
        if (markIsProcessed && !processedDocumentIds.isEmpty()) {
            String sqlMarkIsProcessed = String.format("UPDATE %s SET %s=?, %s=TRUE, %s=FALSE WHERE %s", subsetTableName, "last_component", "is_processed", "is_in_process", primaryKeyPsString);
            log.debug("Marking {} documents to having been processed by component \"{}\".", processedDocumentIds.size(), this.componentDbName);
            log.debug("SQL: {}", sqlMarkIsProcessed);
            log.trace("Marking the following document IDS as having been processed: {}", processedDocumentIds);
            this.updateSubsetTable(conn, processedDocumentIds, sqlMarkIsProcessed);
        }
        try {
            boolean autoCommit = conn.getAutoCommit();
            log.debug("Connection is auto commit: {}", autoCommit);
            if (!autoCommit) {
                log.debug("Committing changes");
                conn.commit();
            }
        }
        catch (SQLException e) {
            log.error("Could not commit the document processing status changes.", e);
            throw new AnalysisEngineProcessException(e);
        }
    }

    /**
     * Executes the given update as one JDBC batch with one entry per document ID.
     *
     * <p>The first placeholder of {@code sql} receives the checkpoint name; the following
     * placeholders receive the primary key elements of the document ID. A batch that fails with
     * a deadlock message is retried with a fresh statement; any other failure is propagated.
     *
     * @param conn the connection to prepare the statement on
     * @param documentIdsToMark the documents whose rows should be updated
     * @param sql the parameterized UPDATE statement
     * @throws AnalysisEngineProcessException wrapping the SQL error (or its chained "next"
     *         exception, when present) if the batch fails for a reason other than a deadlock
     */
    private void updateSubsetTable(CoStoSysConnection conn, Collection<DocumentId> documentIdsToMark, String sql) throws AnalysisEngineProcessException {
        try {
            boolean tryagain;
            do {
                tryagain = false;
                // try-with-resources: the statement is closed after every attempt, including retries.
                try (PreparedStatement ps = conn.prepareStatement(sql)) {
                    for (DocumentId docId : documentIdsToMark) {
                        ps.setString(1, this.componentDbName);
                        String[] pkElements = docId.getId();
                        for (int i = 0; i < pkElements.length; ++i) {
                            // Offset 2: JDBC indices are 1-based and index 1 is the component name.
                            ps.setString(i + 2, pkElements[i]);
                        }
                        ps.addBatch();
                    }
                    log.debug("Executing SQL command batch for being processed.");
                    ps.executeBatch();
                }
                catch (BatchUpdateException e) {
                    if (!isDeadlock(e)) {
                        // Not a deadlock: do not hide the failure, hand it to the outer handler.
                        throw e;
                    }
                    log.debug("Database transaction deadlock detected while trying to set the last component. Trying again.");
                    tryagain = true;
                }
            } while (tryagain);
        }
        catch (SQLException e) {
            log.error("Could not update the subset table with SQL: {}", sql, e);
            SQLException nextException = e.getNextException();
            if (nextException == null) {
                throw new AnalysisEngineProcessException(e);
            }
            log.error("Chained SQL exception:", nextException);
            throw new AnalysisEngineProcessException(nextException);
        }
    }

    /** Tells whether the batch failure message reports a database deadlock; null-safe. */
    private static boolean isDeadlock(BatchUpdateException e) {
        String message = e.getMessage();
        return message != null && message.contains(DEADLOCK_MESSAGE_FRAGMENT);
    }
}

