package org.elasticsearch.plugin.river.redis;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/elasticsearch/plugin/river/redis/RedisIndexer.class */
/**
 * Consumes (channel, message) pairs from an internal queue and indexes each one through the
 * supplied Elasticsearch {@link Client}.
 *
 * <p>Producers hand messages over with {@link #index(String, String)}; the consuming loop lives
 * in {@link #run()}. The loop ends when the sentinel enqueued by {@link #shutdown()} is taken
 * from the queue, or when the consuming thread is interrupted while waiting for work.
 */
public class RedisIndexer implements Runnable {
    /**
     * Shutdown sentinel. It is recognised by reference identity ({@code ==}), never by content,
     * so a real message that happens to carry the same two strings is still indexed normally.
     */
    private static final String[] POISON = {"jekkyl", "hyde"};
    private static final Logger logger = LoggerFactory.getLogger(RedisIndexer.class);

    private final Client client;
    private final String index;
    private final boolean json;
    private final String messageField;
    /** Pending {channel, message} pairs, in arrival order. */
    private final BlockingQueue<String[]> queue = new LinkedBlockingQueue<String[]>();

    /**
     * @param client       client used to issue the index requests
     * @param index        name of the index every document is written to
     * @param json         {@code true} if incoming messages are used verbatim as the document
     *                     source; {@code false} to wrap them (see {@link #getSource(String)})
     * @param messageField field that holds the raw message when {@code json} is {@code false}
     */
    public RedisIndexer(Client client, String index, boolean json, String messageField) {
        this.client = client;
        this.index = index;
        this.json = json;
        this.messageField = messageField;
    }

    /**
     * Queues a message for indexing by the {@link #run()} loop.
     *
     * @param channel channel the message arrived on; later passed to {@code prepareIndex} as the
     *                document type
     * @param message message payload
     */
    public void index(String channel, String message) {
        logger.debug("Queuing... [channel={}, message={}]", channel, message);
        this.queue.offer(new String[]{channel, message});
        logger.debug("... {} now queued", Integer.valueOf(this.queue.size()));
    }

    /**
     * Asks the {@link #run()} loop to stop by enqueuing the shutdown sentinel. Entries queued
     * ahead of the sentinel are still processed first.
     */
    public void shutdown() {
        this.queue.offer(POISON);
    }

    /**
     * Takes queued entries one at a time and indexes them until the shutdown sentinel is seen or
     * the thread is interrupted. A failure while indexing one entry is logged and does not stop
     * the loop.
     */
    @Override // java.lang.Runnable
    public void run() {
        logger.debug("Starting indexer");
        while (true) {
            // Scoped per iteration so an interrupted take() can never leave a stale entry
            // behind for the code below to process.
            final String[] entry;
            try {
                entry = this.queue.take();
            } catch (InterruptedException e) {
                // Restore the flag for whoever owns this thread, then stop: looping back into
                // take() with the flag set would just throw again immediately.
                Thread.currentThread().interrupt();
                logger.info("Indexer interrupted - shutting down");
                logger.debug("Indexer shutdown");
                return;
            }
            if (entry == POISON) { // identity comparison is intentional, see POISON
                logger.info("Poison pill eaten - shutting down subscriber thread");
                logger.debug("Indexer shutdown");
                return;
            }
            try {
                String type = entry[0];
                String source = getSource(entry[1]);
                logger.debug("Indexing... [index={}, type={}, source={}]", new Object[]{this.index, type, source});
                this.client.prepareIndex(this.index, type).setSource(source).execute().actionGet();
                logger.debug("...indexed");
            } catch (Exception e) {
                // Best effort: log with the stack trace and carry on with the next entry.
                logger.warn("Failed to index message", e);
            }
        }
    }

    /**
     * Builds the document source for a message.
     *
     * @param message raw message payload
     * @return {@code message} unchanged when this indexer is in JSON mode; otherwise a JSON
     *         object holding the message under the configured message field plus a
     *         {@code timestamp} field set to the current time in milliseconds
     * @throws IOException if building the JSON wrapper fails
     */
    String getSource(String message) throws IOException {
        if (this.json) {
            return message;
        }
        return XContentFactory.jsonBuilder()
                .startObject()
                .field(this.messageField, message)
                .field("timestamp", System.currentTimeMillis())
                .endObject()
                .string();
    }
}
