/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.ignite.cache;

import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.ignite.cache.IgniteCacheEndpoint;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Camel consumer that registers an Ignite {@link ContinuousQuery} on a cache and turns the
 * resulting cache entry events into Camel exchanges.
 *
 * <p>On start the continuous query is launched and, if the endpoint asks for it, the entries
 * already returned by the query cursor are fired as exchanges. While running, every batch of
 * update events is forwarded either as one exchange per event or as one exchange carrying the
 * whole batch, depending on {@code endpoint.isOneExchangePerUpdate()}. On stop the query cursor
 * is closed.
 */
public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(IgniteCacheContinuousQueryConsumer.class);

    /** Shared callback for exchanges whose asynchronous completion is intentionally ignored. */
    private static final AsyncCallback NO_OP_CALLBACK = new AsyncCallback(){

        public void done(boolean doneSync) {
            // Nothing to do: exchanges are dispatched fire-and-forget.
        }
    };

    private final IgniteCacheEndpoint endpoint;
    private final IgniteCache<Object, Object> cache;

    /** Cursor of the running continuous query; {@code null} until the query has been launched. */
    private QueryCursor<Cache.Entry<Object, Object>> cursor;

    /**
     * Creates the consumer.
     *
     * @param endpoint  the endpoint supplying the query configuration and creating exchanges
     * @param processor the processor that receives the produced exchanges
     * @param cache     the Ignite cache on which the continuous query is executed
     */
    public IgniteCacheContinuousQueryConsumer(IgniteCacheEndpoint endpoint, Processor processor, IgniteCache<Object, Object> cache) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.cache = cache;
    }

    /**
     * Starts the consumer: launches the continuous query, logs the start and then optionally
     * fires the entries already present in the query cursor.
     *
     * @throws Exception if the parent start logic or the query launch fails
     */
    protected void doStart() throws Exception {
        super.doStart();
        this.launchContinuousQuery();
        LOG.info("Started Ignite Cache Continuous Query consumer for cache {} with query:\u00a0{}.", this.cache.getName(), this.endpoint.getQuery());
        this.maybeFireExistingQueryResults();
    }

    /**
     * Fires one exchange per entry currently returned by the query cursor, when the endpoint has
     * {@code fireExistingQueryResults} enabled; otherwise only logs that they are skipped.
     *
     * <p>Each exchange carries the entry value as body and the entry key in the
     * {@code CamelIgniteCacheKey} header.
     */
    private void maybeFireExistingQueryResults() {
        if (!this.endpoint.isFireExistingQueryResults()) {
            LOG.info("Skipping existing cache results for cache name = {}.", this.endpoint.getCacheName());
            return;
        }
        LOG.info("Processing existing cache results for cache name = {}.", this.endpoint.getCacheName());
        for (Cache.Entry<Object, Object> entry : this.cursor) {
            Exchange exchange = this.createExchange(entry.getValue());
            exchange.getIn().setHeader("CamelIgniteCacheKey", entry.getKey());
            // Dispatch the exchange prepared above. Previously a second exchange was created
            // from the raw entry and sent instead, which silently dropped the key header.
            this.dispatch(exchange);
        }
    }

    /**
     * Builds the continuous query from the endpoint settings (initial query, remote filter,
     * auto-unsubscribe, page size, time interval), installs the local listener that converts
     * events to exchanges, executes the query and stores the resulting cursor.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void launchContinuousQuery() {
        // Kept as a raw type: the generic signatures of the endpoint getters are not visible here.
        ContinuousQuery continuousQuery = new ContinuousQuery();
        if (this.endpoint.getQuery() != null) {
            continuousQuery.setInitialQuery(this.endpoint.getQuery());
        }
        if (this.endpoint.getRemoteFilter() != null) {
            continuousQuery.setRemoteFilter(this.endpoint.getRemoteFilter());
        }
        continuousQuery.setLocalListener((CacheEntryUpdatedListener)new CacheEntryUpdatedListener<Object, Object>(){

            public void onUpdated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> events) throws CacheEntryListenerException {
                // Log at TRACE: the message used to be guarded by isTraceEnabled() but emitted at INFO.
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processing Continuous Query event(s):\u00a0{}.", events);
                }
                if (!IgniteCacheContinuousQueryConsumer.this.endpoint.isOneExchangePerUpdate()) {
                    // One exchange for the whole batch of events.
                    IgniteCacheContinuousQueryConsumer.this.fireGroupedExchange(events);
                    return;
                }
                // One exchange per individual event.
                for (CacheEntryEvent<? extends Object, ? extends Object> event : events) {
                    IgniteCacheContinuousQueryConsumer.this.fireSingleExchange(event);
                }
            }
        });
        continuousQuery.setAutoUnsubscribe(this.endpoint.isAutoUnsubscribe());
        continuousQuery.setPageSize(this.endpoint.getPageSize());
        continuousQuery.setTimeInterval(this.endpoint.getTimeInterval());
        this.cursor = this.cache.query((Query)continuousQuery);
    }

    /**
     * Stops the consumer and closes the continuous query cursor if one was opened.
     *
     * @throws Exception if the parent stop logic fails
     */
    protected void doStop() throws Exception {
        super.doStop();
        // The cursor is only assigned once the query has been launched; guard against a stop
        // that follows a failed or never-completed start.
        if (this.cursor != null) {
            this.cursor.close();
            this.cursor = null;
        }
        LOG.info("Stopped Ignite Cache Continuous Query consumer for cache {} with query:\u00a0{}.", this.cache.getName(), this.endpoint.getQuery());
    }

    /**
     * Fires an exchange for a single cache event. The body is the event value; the event type,
     * old value and key are exposed through the {@code CamelIgniteCacheEventType},
     * {@code CamelIgniteCacheOldValue} and {@code CamelIgniteCacheKey} headers.
     *
     * @param entry the cache event to forward
     */
    private void fireSingleExchange(CacheEntryEvent<? extends Object, ? extends Object> entry) {
        Exchange exchange = this.createExchange(entry.getValue());
        Message in = exchange.getIn();
        in.setHeader("CamelIgniteCacheEventType", entry.getEventType());
        in.setHeader("CamelIgniteCacheOldValue", entry.getOldValue());
        in.setHeader("CamelIgniteCacheKey", entry.getKey());
        this.dispatch(exchange);
    }

    /**
     * Fires a single exchange whose body is the whole batch of events.
     *
     * @param events the batch of cache events received by the local listener
     */
    private void fireGroupedExchange(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> events) {
        this.dispatch(this.createExchange(events));
    }

    /**
     * Hands the exchange to the asynchronous processor, ignoring its completion callback.
     *
     * @param exchange the exchange to process
     */
    private void dispatch(Exchange exchange) {
        this.getAsyncProcessor().process(exchange, NO_OP_CALLBACK);
    }

    /**
     * Creates an {@code InOnly} exchange with the given body and the cache name in the
     * {@code CamelIgniteCacheName} header.
     *
     * @param payload the object to set as the message body
     * @return the newly created exchange
     */
    private Exchange createExchange(Object payload) {
        Exchange exchange = this.endpoint.createExchange(ExchangePattern.InOnly);
        Message in = exchange.getIn();
        in.setBody(payload);
        in.setHeader("CamelIgniteCacheName", this.endpoint.getCacheName());
        return exchange;
    }
}

