/*
 * Decompiled with CFR 0.152.
 */
package de.julielab.neo4j.plugins;

import de.julielab.neo4j.plugins.ConceptManager;
import de.julielab.neo4j.plugins.FacetManager;
import de.julielab.neo4j.plugins.auxiliaries.JulieNeo4jUtilities;
import de.julielab.neo4j.plugins.auxiliaries.NodeUtilities;
import de.julielab.neo4j.plugins.auxiliaries.PropertyUtilities;
import de.julielab.neo4j.plugins.auxiliaries.semedico.PredefinedTraversals;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Path;
import org.neo4j.graphdb.PropertyContainer;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResourceIterable;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.traversal.TraversalDescription;
import org.neo4j.graphdb.traversal.Traverser;
import org.neo4j.server.plugins.Description;
import org.neo4j.server.plugins.Name;
import org.neo4j.server.plugins.Parameter;
import org.neo4j.server.plugins.PluginTarget;
import org.neo4j.server.plugins.ServerPlugin;
import org.neo4j.server.plugins.Source;
import org.neo4j.server.rest.repr.RecursiveMappingRepresentation;
import org.neo4j.server.rest.repr.Representation;
import org.neo4j.shell.util.json.JSONArray;
import org.neo4j.shell.util.json.JSONException;

public class Export
extends ServerPlugin {
    /** Logger used by all export methods of this plugin. */
    private static final Logger log = Logger.getLogger(Export.class.getName());
    /**
     * Path assembled from "/db/data/ext/", this class' simple name and "/graphdb/";
     * presumably the REST base path under which the server exposes the plugin methods.
     */
    public static final String EXPORT_ENDPOINT = "/db/data/ext/" + Export.class.getSimpleName() + "/graphdb/";
    /** Plugin method name of the hypernym export. */
    public static final String HYPERNYMS = "hypernyms";
    /** Plugin method name of the Lingpipe dictionary export. */
    public static final String LINGPIPE_DICT = "lingpipe_dictionary";
    /** Plugin method name of the term-to-facet mapping export. */
    public static final String TERM_TO_FACET = "term_facet_map";
    /** Plugin method name of the ID mapping export. */
    public static final String TERM_ID_MAPPING = "term_id_mapping";
    /** Plugin method name of the element-to-aggregate ID mapping export. */
    public static final String ELEMENT_TO_AGGREGATE_ID_MAPPING = "element_aggregate_id_mapping";
    /** Request parameter naming the node property (or properties) to export. */
    public static final String PARAM_ID_PROPERTY = "id_property";
    /** Request parameter carrying a JSON array of label names. */
    public static final String PARAM_LABELS = "labels";
    /** Request parameter carrying a single label name. */
    public static final String PARAM_LABEL = "label";
    /** Request parameter carrying the label(s) that exclude nodes from the Lingpipe dictionary. */
    public static final String PARAM_EXCLUSION_LABEL = "exclusion_label";
    /** Deprecated parameter name; not referenced by any method of this class. */
    @Deprecated
    public static final String PARAM_FURTHER_PROPERTIES = "further_properties";
    /** Initial capacity in bytes (200,000,000) for the in-memory buffers that collect the GZIPed export data. */
    public static final int OUTPUTSTREAM_INIT_SIZE = 200000000;
    /** Initial capacity of the node-to-hypernyms cache used by the hypernym export. */
    public static final int HYPERNYMS_CACHE_SIZE = 100000;

    /**
     * Plugin endpoint returning the GZIP-compressed ID mapping data.
     *
     * @param graphDb the database to read from
     * @param idProperty name of the node property whose value(s) are mapped to the node's "id"
     * @param labelStrings JSON array (as string) of the labels whose nodes are exported; parsed
     *        only when not {@code null}
     * @return a representation wrapping the GZIPed bytes
     * @throws Exception if the label string is not valid JSON or the export fails
     */
    @Name(TERM_ID_MAPPING)
    @Description("Creates the ID mapping file data required for LuCas' replace filter. The returned data is a JSON array of bytes. Those bytes represent the GZIPed string data of id mapping data. That is, to read the actual file content, the JSON array is to be converted to a byte[] which then serves as input for a ByteArrayInputStream which in turn goes through a GZIPInputStream for decoding. The result is a stream from which the mapping data can be read.")
    @PluginTarget(GraphDatabaseService.class)
    public Representation exportIdMapping(@Source GraphDatabaseService graphDb, @Parameter(name=PARAM_ID_PROPERTY) @Description("TODO") String idProperty, @Parameter(name=PARAM_LABELS) @Description("TODO") String labelStrings) throws Exception {
        log.info("Exporting ID mapping data.");
        JSONArray parsedLabels = null;
        if (labelStrings != null) {
            parsedLabels = new JSONArray(labelStrings);
        }
        log.info("Creating mapping file content for property \"" + idProperty + "\" and facets " + parsedLabels);
        byte[] gzipped = this.createIdMapping(graphDb, idProperty, parsedLabels).toByteArray();
        log.info("Sending all " + gzipped.length + " bytes of GZIPed ID mapping file data.");
        log.info("Done exporting ID mapping data.");
        return RecursiveMappingRepresentation.getObjectRepresentation((Object)gzipped);
    }

    /**
     * Writes one line {@code <idProperty value> TAB <node id> NEWLINE} per node (and per array
     * element for array-valued properties) into a GZIP-compressed in-memory buffer.
     *
     * @param graphDb the database to read from
     * @param idProperty the node property whose value(s) form the first column
     * @param labelsArray the labels whose nodes are exported; must not be {@code null}
     * @return the buffer holding the finished GZIP stream
     * @throws IllegalArgumentException if {@code labelsArray} is {@code null}
     * @throws Exception on JSON or I/O errors
     */
    private ByteArrayOutputStream createIdMapping(GraphDatabaseService graphDb, String idProperty, JSONArray labelsArray) throws Exception {
        // Fail with a clear message (and before allocating the large buffer) instead of a bare NullPointerException.
        if (null == labelsArray) {
            throw new IllegalArgumentException("The parameter \"" + PARAM_LABELS + "\" is required to create the ID mapping but was not given.");
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream(OUTPUTSTREAM_INIT_SIZE);
        // The GZIP stream is closed (and thereby finished) before the buffer is returned.
        try (GZIPOutputStream os = new GZIPOutputStream(baos);
             Transaction tx = graphDb.beginTx()) {
            int numWritten = 0;
            for (int i = 0; i < labelsArray.length(); ++i) {
                Label label = Label.label(labelsArray.getString(i));
                // try-with-resources releases the iterator even if writing fails midway.
                try (ResourceIterator<Node> terms = graphDb.findNodes(label)) {
                    while (terms.hasNext()) {
                        Node term = terms.next();
                        String termId = (String)term.getProperty("id");
                        Object idObject = PropertyUtilities.getNonNullNodeProperty(term, idProperty);
                        if (null == idObject) {
                            continue;
                        }
                        if (idObject.getClass().isArray()) {
                            // Array-valued property: one output line per element.
                            for (Object id : JulieNeo4jUtilities.convertArray(idObject)) {
                                IOUtils.write(id + "\t" + termId + "\n", (OutputStream)os, "UTF-8");
                                ++numWritten;
                            }
                        } else {
                            IOUtils.write(idObject + "\t" + termId + "\n", (OutputStream)os, "UTF-8");
                            ++numWritten;
                        }
                    }
                }
            }
            log.info("Num written: " + numWritten);
            tx.success();
        }
        return baos;
    }

    /**
     * Plugin endpoint returning the GZIP-compressed hypernym data.
     *
     * @param graphDb the database to read from
     * @param facetLabelStrings optional JSON array (as string) of facet labels; {@code null}
     *        is passed on as "no labels given"
     * @param termLabel optional label passed on to restrict hypernym generation
     * @return a representation wrapping the GZIPed bytes
     * @throws Exception if the label string is not valid JSON or the export fails
     */
    @Name(HYPERNYMS)
    @Description("Creates the hypernym file data required for LuCas' hypernym filter. The returned data is a JSON array of bytes. Those bytes represent the GZIPed string data of hypernym data. That is, to read the actual file content, the JSON array is to be converted to a byte[] which then serves as input for a ByteArrayInputStream which in turn goes through a GZIPInputStream for decoding. The result is a stream from which the hypernym data can be read.")
    @PluginTarget(GraphDatabaseService.class)
    public Representation exportHypernyms(@Source GraphDatabaseService graphDb, @Description("The facet labels indicating for which facets to create the hypernyms file") @Parameter(name=PARAM_LABELS, optional=true) String facetLabelStrings, @Description("A label restricting hypernym generation to terms with this label.") @Parameter(name=PARAM_LABEL, optional=true) String termLabel) throws Exception {
        JSONArray facetLabels = null;
        if (facetLabelStrings != null) {
            facetLabels = new JSONArray(facetLabelStrings);
        }
        if (facetLabels != null) {
            log.info("Exporting hypernyms dictionary data for the facets with labels " + facetLabels.toString() + ".");
        } else {
            log.info("Exporting hypernyms dictionary data for all facets.");
        }
        byte[] gzipped = this.writeHypernymList(graphDb, facetLabels, termLabel, HYPERNYMS_CACHE_SIZE).toByteArray();
        log.info("Sending all " + gzipped.length + " bytes of GZIPed hypernym file data.");
        log.info("Done exporting hypernym data.");
        return RecursiveMappingRepresentation.getObjectRepresentation((Object)gzipped);
    }

    /**
     * Writes the hypernym lines for all concepts reachable from the root concepts of the
     * selected facets into a GZIP-compressed in-memory buffer.
     *
     * <p>If no labels are given ({@code null} or an empty array), the general FACET label is
     * used and hypernyms are collected along the general IS_BROADER_THAN relationship type.
     * Otherwise, one facet-specific relationship type {@code IS_BROADER_THAN_<facet id>} is
     * collected per facet node carrying one of the given labels.
     *
     * @param graphDb the database to read from
     * @param labelsArray facet labels, or {@code null}/empty for all facets
     * @param termLabelString optional label; root concepts without it are skipped
     * @param cacheSize initial capacity of the node-to-hypernyms cache
     * @return the buffer holding the finished GZIP stream
     * @throws IllegalArgumentException if a node with one of the given labels is not a facet
     * @throws IOException on write errors
     * @throws JSONException if a label cannot be read from the array
     */
    private ByteArrayOutputStream writeHypernymList(GraphDatabaseService graphDb, JSONArray labelsArray, String termLabelString, int cacheSize) throws IOException, JSONException {
        JSONArray labels = labelsArray;
        // An empty array is treated like a missing one; previously it failed at getString(0).
        if (null == labels || labels.length() == 0) {
            labels = new JSONArray();
            labels.put((Object)FacetManager.FacetLabel.FACET.name());
        }
        Label termLabel = StringUtils.isBlank(termLabelString) ? null : Label.label(termLabelString);
        Map<Node, Set<String>> cache = new HashMap<Node, Set<String>>(cacheSize);
        ByteArrayOutputStream baos = new ByteArrayOutputStream(OUTPUTSTREAM_INIT_SIZE);
        try (GZIPOutputStream os = new GZIPOutputStream(baos);
             Transaction tx = graphDb.beginTx()) {
            ArrayList<RelationshipType> relationshipTypeList = new ArrayList<RelationshipType>();
            boolean allFacets = labels.length() == 1 && labels.getString(0).equals(FacetManager.FacetLabel.FACET.name());
            if (allFacets) {
                relationshipTypeList.add(ConceptManager.EdgeTypes.IS_BROADER_THAN);
            } else {
                for (int i = 0; i < labels.length(); ++i) {
                    Label label = Label.label(labels.getString(i));
                    try (ResourceIterator<Node> facets = graphDb.findNodes(label)) {
                        while (facets.hasNext()) {
                            Node facet = facets.next();
                            if (!facet.hasLabel(FacetManager.FacetLabel.FACET)) {
                                throw new IllegalArgumentException("Label node " + facet + " with the label " + label + " is no facet since it does not have the " + FacetManager.FacetLabel.FACET + " label.");
                            }
                            String facetId = (String)facet.getProperty("id");
                            relationshipTypeList.add(RelationshipType.withName(ConceptManager.EdgeTypes.IS_BROADER_THAN + "_" + facetId));
                        }
                    }
                }
            }
            // The type list is complete at this point; build the array once instead of once per root concept.
            RelationshipType[] relationshipTypes = relationshipTypeList.toArray(new RelationshipType[relationshipTypeList.size()]);
            for (int i = 0; i < labels.length(); ++i) {
                Label label = Label.label(labels.getString(i));
                log.info("Now creating hypernyms for facets with label " + label);
                // Visited nodes are tracked per label; the hypernym cache is shared across labels.
                Set<Node> visitedNodes = new HashSet<Node>();
                try (ResourceIterator<Node> facets = graphDb.findNodes(label)) {
                    while (facets.hasNext()) {
                        Node facet = facets.next();
                        for (Relationship rel : facet.getRelationships(Direction.OUTGOING, new RelationshipType[]{ConceptManager.EdgeTypes.HAS_ROOT_CONCEPT})) {
                            Node rootTerm = rel.getEndNode();
                            if (null != termLabel && !rootTerm.hasLabel(termLabel)) {
                                continue;
                            }
                            this.writeHypernyms(rootTerm, visitedNodes, cache, os, relationshipTypes);
                        }
                    }
                }
            }
        }
        return baos;
    }

    /**
     * Returns the IDs of all transitive hypernyms of {@code n}, i.e. of all non-hollow nodes
     * reachable by following incoming relationships of the given types. Results are memoized
     * in {@code cache}.
     *
     * @param n the node to collect hypernyms for
     * @param cache memoization map from node to its hypernym ID set; updated by this method
     * @param relationshipTypes the relationship types to follow in incoming direction
     * @return the (cached, mutable) set of hypernym IDs of {@code n}
     */
    public Set<String> load(Node n, Map<Node, Set<String>> cache, RelationshipType[] relationshipTypes) {
        Set<String> cached = cache.get(n);
        if (null != cached) {
            return cached;
        }
        Set<String> hypernyms = new HashSet<String>();
        // Register the set before recursing: a cycle back to n then hits the cache and
        // returns this (still growing) set instead of recursing forever.
        cache.put(n, hypernyms);
        for (Relationship rel : n.getRelationships(Direction.INCOMING, relationshipTypes)) {
            Node directHypernym = rel.getStartNode();
            // Skip self-loops and hollow nodes. The node is asked directly via hasLabel, as done
            // elsewhere in this class, instead of comparing the Label objects returned by
            // getLabels() with equals(); whether such an object equals an enum-based label is
            // implementation-dependent (NOTE(review): confirm for the Neo4j version in use).
            if (directHypernym.equals(n) || directHypernym.hasLabel(ConceptManager.ConceptLabel.HOLLOW)) {
                continue;
            }
            String directHypernymId = ((String)directHypernym.getProperty("id")).intern();
            hypernyms.add(directHypernymId);
            hypernyms.addAll(this.load(directHypernym, cache, relationshipTypes));
        }
        return hypernyms;
    }

    /**
     * Writes the line {@code <node id> TAB <hypernym ids joined by '|'> NEWLINE} for {@code n}
     * (if it is not hollow and has at least one hypernym) and then recursively does the same
     * for the nodes reached via outgoing IS_BROADER_THAN relationships.
     *
     * @param n the node to start at
     * @param visitedNodes nodes already handled; updated by this method
     * @param cache memoization map shared with {@link #load}
     * @param os the stream to write to
     * @param relationshipTypes relationship types used to collect the hypernyms
     * @throws IOException on write errors
     */
    private void writeHypernyms(Node n, Set<Node> visitedNodes, Map<Node, Set<String>> cache, GZIPOutputStream os, RelationshipType[] relationshipTypes) throws IOException {
        if (visitedNodes.contains(n)) {
            return;
        }
        // load() returns the very set it stores in the cache, so no second lookup is needed.
        Set<String> hypernyms = this.load(n, cache, relationshipTypes);
        visitedNodes.add(n);
        // hasLabel is used as elsewhere in this class; comparing the Label objects returned by
        // getLabels() to an enum constant via equals() is implementation-dependent
        // (NOTE(review): confirm for the Neo4j version in use).
        if (n.hasLabel(ConceptManager.ConceptLabel.HOLLOW)) {
            // Hollow nodes are neither written nor descended from.
            return;
        }
        if (!hypernyms.isEmpty()) {
            IOUtils.write(n.getProperty("id") + "\t" + StringUtils.join(hypernyms, "|") + "\n", (OutputStream)os, "UTF-8");
        }
        // NOTE(review): the descent always follows the general IS_BROADER_THAN type, even when
        // relationshipTypes holds facet-specific types - confirm this is intended.
        for (Relationship rel : n.getRelationships(Direction.OUTGOING, new RelationshipType[]{ConceptManager.EdgeTypes.IS_BROADER_THAN})) {
            this.writeHypernyms(rel.getEndNode(), visitedNodes, cache, os, relationshipTypes);
        }
        if (visitedNodes.size() % 100000 == 0) {
            log.info("Finished " + visitedNodes.size() + ".");
        }
    }

    /**
     * Plugin endpoint creating the Lingpipe dictionary: for every selected node, its preferred
     * name, synonyms and acronyms are each written as {@code <name> TAB <category> NEWLINE},
     * GZIP-compressed and returned Base64-encoded.
     *
     * <p>A node is selected if it has the given label, none of the exclusion labels, and both
     * an "id" and a "preferredName" property. The category is built from the values of the
     * requested properties, joined by "||"; array-valued properties yield one category per
     * array index.
     *
     * @param graphDb the database to read from
     * @param labelString label selecting the nodes; the CONCEPT label is used when blank
     * @param exclusionLabelString JSON array of label names or, if not parseable as such, a
     *        single label name; may be blank
     * @param nodeCategories the properties forming the category; "id" is used when
     *        {@code null} or empty
     * @return the Base64-encoded GZIP data
     * @throws IllegalArgumentException if a selected node lacks a value for a requested
     *         property or the properties differ in their number of values
     * @throws IOException on write errors
     */
    @Name(value="lingpipe_dictionary")
    @Description(value="Creates a dictionary of all synonyms and writing variants for terms with label label and without label exclusion_label in the database. The dictionary has two columns, the synonym/writing variant and the term's ID. This dictionary is used with the Lingpipe chunker to recognize database terms in user queries. The returned data is a Base64 encoded string. This string represents the GZIPed string data of the dictionary. That is, to read the actual file content, the Base64 encoded string so to be decoded into a byte[] which then serves as input for a ByteArrayInputStream which in turn goes through a GZIPInputStream for decoding. The result is a stream from which the dictionary data can be read.")
    @PluginTarget(value=GraphDatabaseService.class)
    public String exportLingpipeDictionary(@Source GraphDatabaseService graphDb, @Description(value="The label to select the terms for which to create the dictionary.") @Parameter(name="label", optional=true) String labelString, @Description(value="A JSON list of labels that exclude terms from used for the dictionary.") @Parameter(name="exclusion_label", optional=true) String exclusionLabelString, @Description(value="The node properties providing the lingpipe chunker category. May refer to array-valued property. In case multiple properties are given, their values will be concatenated by two pipes (||). NOTE that it is expected that array-valued properties are or of the same size. The concatenation will be done for the same index in all value-arrays, i.e. not all combinations are built. For aggregates that not have a requested properties, their elements will be used instead.") @Parameter(name="id_property", optional=true) String[] nodeCategories) throws IOException {
        // Declared as the general Label type: the value is either the CONCEPT enum constant or a dynamic label.
        Label label = StringUtils.isBlank(labelString) ? ConceptManager.ConceptLabel.CONCEPT : Label.label(labelString);
        ArrayList<String> propertiesToWrite = new ArrayList<String>();
        if (nodeCategories == null || nodeCategories.length == 0) {
            propertiesToWrite.add("id");
        } else {
            for (String property : nodeCategories) {
                propertiesToWrite.add(property);
            }
        }
        Label[] exclusionLabels = parseExclusionLabels(exclusionLabelString);
        log.info("Exporting lingpipe dictionary data for nodes with label \"" + label.name() + "\", mapping their names to their properties " + propertiesToWrite + ".");
        ByteArrayOutputStream baos = new ByteArrayOutputStream(OUTPUTSTREAM_INIT_SIZE);
        // Resources are closed in reverse order: node iterator, transaction, GZIP stream.
        // Closing the GZIP stream finishes the compressed data before the buffer is read below.
        try (GZIPOutputStream os = new GZIPOutputStream(baos);
             Transaction tx = graphDb.beginTx();
             ResourceIterator<Node> terms = graphDb.findNodes(label)) {
            int count = 0;
            while (terms.hasNext()) {
                Node term = terms.next();
                ++count;
                if (!hasAnyLabel(term, exclusionLabels) && term.hasProperty("id") && term.hasProperty("preferredName")) {
                    ArrayList<String> categoryStrings = buildCategoryStrings(term, propertiesToWrite);
                    this.writeDictionaryEntries(graphDb, term, categoryStrings, os);
                }
                if (count % 100000 == 0) {
                    log.info(count + " terms processed.");
                }
            }
        }
        log.info("Done exporting Lingpipe term dictionary.");
        return DatatypeConverter.printBase64Binary(baos.toByteArray());
    }

    /** Parses the exclusion labels from a JSON array string, falling back to a single label name; empty if blank. */
    private static Label[] parseExclusionLabels(String exclusionLabelString) {
        if (StringUtils.isBlank(exclusionLabelString)) {
            return new Label[0];
        }
        try {
            JSONArray exclusionLabelsJson = new JSONArray(exclusionLabelString);
            Label[] exclusionLabels = new Label[exclusionLabelsJson.length()];
            for (int i = 0; i < exclusionLabelsJson.length(); ++i) {
                exclusionLabels[i] = Label.label(exclusionLabelsJson.getString(i));
            }
            return exclusionLabels;
        }
        catch (JSONException e) {
            // Deliberate fallback: the parameter was not a JSON array, so the whole string is taken as one label name.
            return new Label[]{Label.label(exclusionLabelString)};
        }
    }

    /** Returns whether the node carries at least one of the given labels. */
    private static boolean hasAnyLabel(Node node, Label[] labels) {
        for (Label candidate : labels) {
            if (node.hasLabel(candidate)) {
                return true;
            }
        }
        return false;
    }

    /** Reads a property as string array; aggregates lacking the property fall back to the values of their elements. */
    private static String[] resolvePropertyValues(Node term, String property) {
        String[] values = NodeUtilities.getNodePropertyAsStringArrayValue(term, property);
        if (null == values && term.hasLabel(ConceptManager.ConceptLabel.AGGREGATE)) {
            // Uses the requested property itself; the fallback previously always read the first property.
            values = ConceptManager.getPropertyValueOfElements(term, property);
        }
        return values;
    }

    /** Builds one category string per value index by joining the index-aligned values of all properties with "||". */
    private static ArrayList<String> buildCategoryStrings(Node term, ArrayList<String> propertiesToWrite) {
        String firstProperty = propertiesToWrite.get(0);
        String[] firstValues = resolvePropertyValues(term, firstProperty);
        if (null == firstValues) {
            throw new IllegalArgumentException("A concept occurred that does not have a value for the property \"" + firstProperty + "\": " + NodeUtilities.getNodePropertiesAsString((PropertyContainer)term));
        }
        // The first property determines how many categories the node gets.
        int arraySize = firstValues.length;
        ArrayList<String> categoryStrings = new ArrayList<String>(arraySize);
        if (arraySize == 0) {
            return categoryStrings;
        }
        // Read and validate every property once instead of once per value index.
        String[][] valuesPerProperty = new String[propertiesToWrite.size()][];
        for (int j = 0; j < propertiesToWrite.size(); ++j) {
            String property = propertiesToWrite.get(j);
            String[] values = resolvePropertyValues(term, property);
            if (null == values || values.length == 0) {
                throw new IllegalArgumentException("The property \"" + property + "\" does not contain a value for node " + term + " (properties: " + PropertyUtilities.getNodePropertiesAsString((PropertyContainer)term) + ")");
            }
            if (values.length != arraySize) {
                throw new IllegalArgumentException("The properties \"" + propertiesToWrite + "\" on term " + PropertyUtilities.getNodePropertiesAsString((PropertyContainer)term) + " do not have all the same number of value elements which is required for dictionary creation by this method.");
            }
            valuesPerProperty[j] = values;
        }
        for (int i = 0; i < arraySize; ++i) {
            StringBuilder sb = new StringBuilder();
            for (int j = 0; j < valuesPerProperty.length; ++j) {
                if (j > 0) {
                    sb.append("||");
                }
                sb.append(valuesPerProperty[j][i]);
            }
            categoryStrings.add(sb.toString());
        }
        return categoryStrings;
    }

    /** Writes the preferred name, synonyms and acronyms of the term once for each category string. */
    private void writeDictionaryEntries(GraphDatabaseService graphDb, Node term, ArrayList<String> categoryStrings, OutputStream os) throws IOException {
        if (categoryStrings.isEmpty()) {
            return;
        }
        String preferredName = (String)term.getProperty("preferredName");
        String[] synonyms = term.hasProperty("synonyms") ? (String[])term.getProperty("synonyms") : new String[0];
        for (String categoryString : categoryStrings) {
            this.writeNormalizedDictionaryEntry(preferredName, categoryString, os);
            for (String synonym : synonyms) {
                this.writeNormalizedDictionaryEntry(synonym, categoryString, os);
            }
            TraversalDescription acronymsTraversal = PredefinedTraversals.getAcronymsTraversal(graphDb);
            for (Node acronymNode : acronymsTraversal.traverse(term).nodes()) {
                String acronym = (String)acronymNode.getProperty("name");
                this.writeNormalizedDictionaryEntry(acronym, categoryString, os);
            }
        }
    }

    /**
     * Writes the dictionary line {@code <normalized name> TAB <termId> NEWLINE} as UTF-8,
     * unless the whitespace-normalized name has two characters or fewer.
     *
     * @param name the raw name, synonym or acronym
     * @param termId the value for the second column
     * @param os the stream to write to
     * @throws IOException on write errors
     */
    private void writeNormalizedDictionaryEntry(String name, String termId, OutputStream os) throws IOException {
        String cleaned = StringUtils.normalizeSpace(name);
        // Names of at most two characters are left out of the dictionary.
        if (cleaned.length() <= 2) {
            return;
        }
        String line = cleaned + "\t" + termId + "\n";
        IOUtils.write(line, os, "UTF-8");
    }

    /**
     * Plugin endpoint writing one line {@code <term id> TAB <facet ids joined by '|'> NEWLINE}
     * for every node with the given label that has both an "id" and a "facets" property. The
     * lines are GZIP-compressed and returned as bytes.
     *
     * @param graphDb the database to read from
     * @param labelString label selecting the nodes; the CONCEPT label is used when blank
     * @return a representation wrapping the GZIPed bytes
     * @throws IOException on write errors
     */
    @Name(value="term_facet_map")
    @Description(value="Creates a map <term id>=<facet id> for all terms in the database. For terms that belong to multiple facets, several lines are created. The returned data is a JSON array of bytes. Those bytes represent the GZIPed string data of the dictionary. That is, to read the actual file content, the JSON array is to be converted to a byte[] which then serves as input for a ByteArrayInputStream which in turn goes through a GZIPInputStream for decoding. The result is a stream from which the dictionary data can be read.")
    @PluginTarget(value=GraphDatabaseService.class)
    public static Representation exportTermFacetMapping(@Source GraphDatabaseService graphDb, @Description(value="The term label to create the ID map for. Defaults to TERM.") @Parameter(name="label", optional=true) String labelString) throws IOException {
        // The message previously announced the Lingpipe dictionary export (copy-and-paste slip).
        log.info("Exporting term-to-facet mapping data.");
        // Declared as the general Label type: the value is either the CONCEPT enum constant or a dynamic label.
        Label label = StringUtils.isBlank(labelString) ? ConceptManager.ConceptLabel.CONCEPT : Label.label(labelString);
        ByteArrayOutputStream baos = new ByteArrayOutputStream(OUTPUTSTREAM_INIT_SIZE);
        // All three resources are closed before the buffer is read; the iterator is released even on failure.
        try (GZIPOutputStream os = new GZIPOutputStream(baos);
             Transaction tx = graphDb.beginTx();
             ResourceIterator<Node> terms = graphDb.findNodes(label)) {
            int count = 0;
            while (terms.hasNext()) {
                Node term = terms.next();
                ++count;
                if (term.hasProperty("id") && term.hasProperty("facets")) {
                    String termId = (String)term.getProperty("id");
                    String[] facetIds = (String[])term.getProperty("facets");
                    IOUtils.write(termId + "\t" + StringUtils.join((Object[])facetIds, "|") + "\n", (OutputStream)os, "UTF-8");
                }
                if (count % 100000 == 0) {
                    log.info(count + " terms processed.");
                }
            }
            log.info("Done exporting mapping from term ID to corresponding facet IDs.");
        }
        return RecursiveMappingRepresentation.getObjectRepresentation((Object)baos.toByteArray());
    }

    /**
     * Plugin endpoint returning the element-to-aggregate ID mapping as Base64-encoded,
     * GZIP-compressed data.
     *
     * @param graphDb the database to read from
     * @param aggLabelStrings JSON array (as string) of the aggregate labels to export
     * @return the Base64-encoded GZIP data
     * @throws Exception if the label string is not valid JSON or the export fails
     */
    @Name(ELEMENT_TO_AGGREGATE_ID_MAPPING)
    @Description("Creates a mapping file from aggregate element IDs to their respective aggregate ID. Currently, only non-aggregate elements are eligible.")
    @PluginTarget(GraphDatabaseService.class)
    public String exportElementToAggregateIdMapping(@Source GraphDatabaseService graphDb, @Parameter(name=PARAM_LABELS) @Description("The aggregate labels for which to create the mapping") String aggLabelStrings) throws Exception {
        log.info("Exporting element-aggregate ID mapping data.");
        JSONArray aggregateLabels = new JSONArray(aggLabelStrings);
        log.info("Creating element-aggregate ID mapping file content for aggregate labels \"" + aggregateLabels + "\"");
        byte[] gzipped = this.createElementAggregateIdMapping(graphDb, aggregateLabels).toByteArray();
        log.info("Sending all " + gzipped.length + " bytes of GZIPed ID element-aggregate ID mapping file data.");
        String base64 = DatatypeConverter.printBase64Binary(gzipped);
        log.info("Done exporting element-aggregate ID mapping data.");
        return base64;
    }

    /**
     * Collects, for every aggregate node with one of the given labels, the elements found by
     * the predefined element traversal and writes one line
     * {@code <element id> TAB <aggregate id> NEWLINE} per element into a GZIP-compressed
     * in-memory buffer.
     *
     * @param graphDb the database to read from
     * @param aggLabelsArray the aggregate labels to export
     * @return the buffer holding the finished GZIP stream
     * @throws IOException on write errors
     * @throws JSONException if a label cannot be read from the array
     */
    private ByteArrayOutputStream createElementAggregateIdMapping(GraphDatabaseService graphDb, JSONArray aggLabelsArray) throws IOException, JSONException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(OUTPUTSTREAM_INIT_SIZE);
        try (GZIPOutputStream os = new GZIPOutputStream(baos);
             Transaction tx = graphDb.beginTx()) {
            Map<String, String> ele2Agg = new HashMap<String, String>();
            Set<String> visitedAggregates = new HashSet<String>();
            for (int i = 0; i < aggLabelsArray.length(); ++i) {
                Label label = Label.label(aggLabelsArray.getString(i));
                // try-with-resources releases the iterator even if the traversal fails midway.
                try (ResourceIterator<Node> aggregates = graphDb.findNodes(label)) {
                    TraversalDescription td = PredefinedTraversals.getNonAggregateAggregateElements(graphDb);
                    while (aggregates.hasNext()) {
                        Node aggregate = aggregates.next();
                        String aggregateId = (String)aggregate.getProperty("id");
                        // Skip aggregates already handled, either directly or as part of an earlier traversal path.
                        if (!visitedAggregates.add(aggregateId)) {
                            continue;
                        }
                        for (Path elementPath : td.traverse(aggregate)) {
                            // Aggregates lying on the path are marked visited so they are not traversed again on their own.
                            for (Node n : elementPath.nodes()) {
                                if (n.hasLabel(ConceptManager.ConceptLabel.AGGREGATE)) {
                                    visitedAggregates.add((String)n.getProperty("id"));
                                }
                            }
                            Node element = elementPath.endNode();
                            if (!element.hasProperty("id")) {
                                log.warning("Node " + element.getId() + " does not have the ID property " + "id" + " and is discarded for the creation of the element aggregate ID mapping.");
                                continue;
                            }
                            ele2Agg.put((String)element.getProperty("id"), aggregateId);
                        }
                    }
                }
            }
            int numWritten = 0;
            // entrySet avoids a second map lookup per element.
            for (Map.Entry<String, String> mapping : ele2Agg.entrySet()) {
                IOUtils.write(mapping.getKey() + "\t" + mapping.getValue() + "\n", (OutputStream)os, "UTF-8");
                ++numWritten;
            }
            log.info("Num written: " + numWritten);
            tx.success();
        }
        return baos;
    }
}

