package org.apache.streams.elasticsearch.processor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.elasticsearch.ElasticsearchClientManager;
import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.extensions.ExtensionUtil;
import org.apache.streams.pojo.json.Activity;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.percolate.PercolateRequestBuilder;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/elasticsearch/processor/PercolateTagProcessor.class */
/**
 * Stream processor that percolates each incoming document against the queries registered
 * under the {@code .percolator} type of the configured index and attaches the ids of the
 * matching queries to the document as the {@value #TAGS_EXTENSION} extension.
 *
 * <p>{@link #prepare(Object)} must be called before {@link #process(StreamsDatum)}: it
 * obtains the JSON mapper and the Elasticsearch client manager and, when tags are configured,
 * writes them to the index as percolator queries.
 */
public class PercolateTagProcessor implements StreamsProcessor {
    public static final String STREAMS_ID = "PercolateTagProcessor";
    private static final Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class);
    private static final String DEFAULT_PERCOLATE_FIELD = "_all";
    /** Document type under which percolator queries are stored inside an index. */
    private static final String PERCOLATOR_TYPE = ".percolator";
    /** Upper bound on the number of stored queries fetched by {@link #getActivePercolateTags}. */
    private static final int MAX_ACTIVE_TAGS = 1000;
    private ObjectMapper mapper;
    protected Queue<StreamsDatum> inQueue;
    protected Queue<StreamsDatum> outQueue;
    public static final String TAGS_EXTENSION = "tags";
    private ElasticsearchWriterConfiguration config;
    private ElasticsearchClientManager manager;
    private BulkRequestBuilder bulkBuilder;
    protected String usePercolateField;

    /** Filter levels; not referenced anywhere else in this class. */
    public enum FilterLevel {
        MUST,
        SHOULD,
        MUST_NOT
    }

    /** Pairs a tag id with the query-string query that is stored as its percolator rule. */
    public static class PercolateQueryBuilder {
        private final QueryStringQueryBuilder queryBuilder;
        private final String id;

        /**
         * @param str  id of the rule (used as the document id of the stored query)
         * @param str2 query-string expression
         * @param str3 default field the query string is evaluated against
         */
        public PercolateQueryBuilder(String str, String str2, String str3) {
            this.id = str;
            this.queryBuilder = new QueryStringQueryBuilder(str2);
            this.queryBuilder.defaultField(str3);
        }

        /** @return the rule id given at construction time */
        public String getId() {
            return this.id;
        }

        /** @return the JSON source of the rule: the query wrapped in a {@code "query"} object */
        public String getSource() {
            return "{ \n\"query\" : " + this.queryBuilder.toString() + "\n}";
        }
    }

    /**
     * Creates a processor whose rules are evaluated against the {@code _all} field.
     *
     * @param elasticsearchWriterConfiguration index/type/tag configuration
     */
    public PercolateTagProcessor(ElasticsearchWriterConfiguration elasticsearchWriterConfiguration) {
        this(elasticsearchWriterConfiguration, DEFAULT_PERCOLATE_FIELD);
    }

    /**
     * @param elasticsearchWriterConfiguration index/type/tag configuration
     * @param str default field used by the rules built in {@link #prepare(Object)}
     */
    public PercolateTagProcessor(ElasticsearchWriterConfiguration elasticsearchWriterConfiguration, String str) {
        this.config = elasticsearchWriterConfiguration;
        this.usePercolateField = str;
    }

    public ElasticsearchClientManager getManager() {
        return this.manager;
    }

    public void setManager(ElasticsearchClientManager elasticsearchClientManager) {
        this.manager = elasticsearchClientManager;
    }

    public ElasticsearchConfiguration getConfig() {
        return this.config;
    }

    public void setConfig(ElasticsearchWriterConfiguration elasticsearchWriterConfiguration) {
        this.config = elasticsearchWriterConfiguration;
    }

    /** @return the output queue field; nothing in this class assigns it */
    public Queue<StreamsDatum> getProcessorOutputQueue() {
        return this.outQueue;
    }

    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Percolates the datum's document and replaces it with an {@link Activity} carrying the
     * ids of all matching percolator queries.
     *
     * @param streamsDatum datum whose document is either a JSON string or an {@link ObjectNode}
     * @return a single-element list holding the updated datum, or {@code null} when the
     *         document has an unsupported type, is not a JSON object, cannot be
     *         (de)serialized, or the percolate request fails
     */
    public List<StreamsDatum> process(StreamsDatum streamsDatum) {
        Object document = streamsDatum.getDocument();
        ObjectNode node;
        String json;
        if (document instanceof String) {
            json = (String) document;
            try {
                JsonNode parsed = this.mapper.readTree(json);
                // Only JSON objects can be converted to an Activity; arrays/scalars are rejected
                // here instead of failing with a ClassCastException.
                if (!(parsed instanceof ObjectNode)) {
                    LOGGER.warn("Incompatible document type: {}", json);
                    return null;
                }
                node = (ObjectNode) parsed;
            } catch (IOException e) {
                LOGGER.warn("Invalid datum: {}", json, e);
                return null;
            }
        } else if (document instanceof ObjectNode) {
            node = (ObjectNode) document;
            try {
                json = this.mapper.writeValueAsString(node);
            } catch (JsonProcessingException e) {
                LOGGER.warn("Invalid datum: {}", node, e);
                return null;
            }
        } else {
            String typeName = document == null ? "null" : document.getClass().getName();
            LOGGER.warn("Incompatible document type: {}", typeName);
            return null;
        }

        String requestJson = "{ \"doc\": " + json + "}";
        try {
            LOGGER.trace("Percolate request json: {}", requestJson);
            PercolateRequestBuilder request = this.manager.client().preparePercolate()
                    .setIndices(this.config.getIndex())
                    .setDocumentType(this.config.getType())
                    .setSource(requestJson);
            // Serializing the request is only useful for the trace line; skip the work (and the
            // chance of a serialization failure dropping the datum) when trace is disabled.
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Percolate request: {}", this.mapper.writeValueAsString(request.request()));
            }
            PercolateResponse response = request.execute().actionGet();
            LOGGER.trace("Percolate response: {} matches", response.getMatches().length);

            ArrayNode tags = JsonNodeFactory.instance.arrayNode();
            for (PercolateResponse.Match match : response) {
                tags.add(match.getId().string());
            }
            LOGGER.trace("Percolate matches: {}", tags);

            Activity activity = this.mapper.convertValue(node, Activity.class);
            appendMatches(tags, activity);
            streamsDatum.setDocument(activity);

            List<StreamsDatum> result = new ArrayList<>();
            result.add(streamsDatum);
            return result;
        } catch (Exception e) {
            // Boundary catch: any failure drops the datum, but keep the stack trace.
            LOGGER.warn("Percolate exception: {}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * Stores the matched rule ids on the activity under the {@value #TAGS_EXTENSION} extension.
     *
     * @param arrayNode ids of the matching percolator queries
     * @param activity  activity to annotate
     */
    protected void appendMatches(ArrayNode arrayNode, Activity activity) {
        ExtensionUtil.getInstance().addExtension(activity, TAGS_EXTENSION, arrayNode);
    }

    /**
     * Initializes the mapper and client manager and, when the configuration carries tags,
     * writes them to the configured index as percolator queries (creating the index if it is
     * missing and optionally deleting previously stored queries first).
     *
     * @param obj unused
     * @throws NullPointerException if no configuration has been set
     */
    public void prepare(Object obj) {
        this.mapper = StreamsJacksonMapper.getInstance();
        Objects.requireNonNull(this.config);
        this.manager = ElasticsearchClientManager.getInstance(this.config);
        if (this.config.getTags() == null || this.config.getTags().getAdditionalProperties().isEmpty()) {
            return;
        }
        String index = this.config.getIndex();
        createIndexIfMissing(index);
        if (Boolean.TRUE.equals(this.config.getReplaceTags())) {
            deleteOldQueries(index);
        }
        // The bulk builder must exist BEFORE rules are queued; start from a fresh one so a
        // repeated prepare() does not resend previously written rules.
        this.bulkBuilder = this.manager.client().prepareBulk();
        for (String tag : this.config.getTags().getAdditionalProperties().keySet()) {
            String query = (String) this.config.getTags().getAdditionalProperties().get(tag);
            addPercolateRule(new PercolateQueryBuilder(tag, query, this.usePercolateField), index);
        }
        if (writePercolateRules()) {
            LOGGER.info("wrote {} tags to {} _percolator", numOfPercolateRules(), index);
        } else {
            LOGGER.error("FAILED writing {} tags to {} _percolator", numOfPercolateRules(), index);
        }
    }

    /**
     * Optionally deletes the stored percolator queries of the configured index and closes the
     * client. The client is closed even if the deletion fails.
     */
    public void cleanUp() {
        try {
            if (Boolean.TRUE.equals(this.config.getCleanupTags())) {
                deleteOldQueries(this.config.getIndex());
            }
        } finally {
            this.manager.client().close();
        }
    }

    /** @return number of rules queued in the current bulk request, 0 if none was started */
    public int numOfPercolateRules() {
        return this.bulkBuilder == null ? 0 : this.bulkBuilder.numberOfActions();
    }

    /**
     * Creates the index when it does not exist yet.
     *
     * @param str index name
     * @throws RuntimeException if the create request is not acknowledged
     */
    public void createIndexIfMissing(String str) {
        IndicesExistsResponse existsResponse =
                this.manager.client().admin().indices().exists(new IndicesExistsRequest(str)).actionGet();
        if (existsResponse.isExists()) {
            return;
        }
        CreateIndexResponse createResponse =
                this.manager.client().admin().indices().create(new CreateIndexRequest(str)).actionGet();
        if (!createResponse.isAcknowledged()) {
            LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", str);
            LOGGER.error("Error Message: {}", createResponse.toString());
            throw new IllegalStateException("Unable to create index " + str);
        }
        LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", str);
    }

    /**
     * Queues a rule for the next {@link #writePercolateRules()} call.
     *
     * @param percolateQueryBuilder rule to store
     * @param str index the rule is stored in
     */
    public void addPercolateRule(PercolateQueryBuilder percolateQueryBuilder, String str) {
        if (this.bulkBuilder == null) {
            // Allow callers to queue rules without going through prepare()'s tag loop.
            this.bulkBuilder = this.manager.client().prepareBulk();
        }
        this.bulkBuilder.add(this.manager.client()
                .prepareIndex(str, PERCOLATOR_TYPE, percolateQueryBuilder.getId())
                .setSource(percolateQueryBuilder.getSource()));
    }

    /**
     * Executes the queued bulk request and logs every failed item.
     *
     * @return {@code true} when no item failed
     * @throws RuntimeException if no rule has been queued
     */
    public boolean writePercolateRules() {
        if (numOfPercolateRules() <= 0) {
            throw new IllegalStateException("No Rules Have been added!");
        }
        BulkResponse response = this.bulkBuilder.execute().actionGet();
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed()) {
                LOGGER.error("{}\t{}", item.getId(), item.getFailureMessage());
            }
        }
        return !response.hasFailures();
    }

    /**
     * Deletes the given percolator queries from an index.
     *
     * @param set ids of the queries to delete
     * @param str index the queries are stored in
     * @return {@code false} when {@code set} is empty or any delete failed, {@code true} otherwise
     */
    public boolean removeOldTags(Set<String> set, String str) {
        if (set.isEmpty()) {
            return false;
        }
        BulkRequestBuilder deletes = this.manager.client().prepareBulk();
        for (String tagId : set) {
            // Same addressing as addPercolateRule: (index, ".percolator", id).
            deletes.add(this.manager.client().prepareDelete(str, PERCOLATOR_TYPE, tagId));
        }
        return !deletes.execute().actionGet().hasFailures();
    }

    /**
     * Looks up the ids of the percolator queries currently stored in an index.
     *
     * @param str index name
     * @return ids of at most {@value #MAX_ACTIVE_TAGS} stored queries
     */
    public Set<String> getActivePercolateTags(String str) {
        SearchResponse response = this.manager.client().prepareSearch(str)
                .setTypes(PERCOLATOR_TYPE)
                .setSize(MAX_ACTIVE_TAGS)
                .setQuery(QueryBuilders.matchAllQuery())
                .execute()
                .actionGet();
        Set<String> tagIds = new HashSet<>();
        for (SearchHit hit : response.getHits().getHits()) {
            tagIds.add(hit.id());
        }
        return tagIds;
    }

    /**
     * Deletes every percolator query found by {@link #getActivePercolateTags(String)}.
     *
     * @param str index name
     * @return {@code false} when no query was found or any delete failed, {@code true} otherwise
     */
    public boolean deleteOldQueries(String str) {
        Set<String> activeTags = getActivePercolateTags(str);
        if (activeTags.isEmpty()) {
            LOGGER.warn("No active tags were found in _percolator for index : {}", str);
            return false;
        }
        LOGGER.info("Deleting {} tags.", activeTags.size());
        return removeOldTags(activeTags, str);
    }
}
