/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.elasticsearch.test;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.List;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.Actor;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@FixMethodOrder(value=MethodSorters.NAME_ASCENDING)
@ElasticsearchIntegrationTest.ClusterScope(scope=ElasticsearchIntegrationTest.Scope.TEST, numNodes=1)
public class TestElasticsearchPersistWriterIT
extends ElasticsearchIntegrationTest {
    protected String TEST_INDEX = "TestElasticsearchPersistWriter".toLowerCase();
    private static final Logger LOGGER = LoggerFactory.getLogger(TestElasticsearchPersistWriterIT.class);
    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
    protected ElasticsearchWriterConfiguration testConfiguration;

    @Before
    public void prepareTest() {
        this.testConfiguration = new ElasticsearchWriterConfiguration();
        this.testConfiguration.setHosts((List)Lists.newArrayList((Object[])new String[]{"localhost"}));
        this.testConfiguration.setClusterName(TestElasticsearchPersistWriterIT.cluster().getClusterName());
        this.testConfiguration.setIndex("writer");
        this.testConfiguration.setType("activity");
    }

    @Test
    public void testPersist() throws Exception {
        this.testPersistWriter();
        this.testPersistUpdater();
    }

    void testPersistWriter() throws Exception {
        assert (!this.indexExists(this.TEST_INDEX));
        ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(this.testConfiguration);
        testPersistWriter.prepare(null);
        InputStream testActivityFolderStream = TestElasticsearchPersistWriterIT.class.getClassLoader().getResourceAsStream("activities");
        List files = IOUtils.readLines((InputStream)testActivityFolderStream, (Charset)Charsets.UTF_8);
        for (String file : files) {
            LOGGER.info("File: " + file);
            InputStream testActivityFileStream = TestElasticsearchPersistWriterIT.class.getClassLoader().getResourceAsStream("activities/" + file);
            Activity activity = (Activity)MAPPER.readValue(testActivityFileStream, Activity.class);
            StreamsDatum datum = new StreamsDatum((Object)activity, activity.getVerb());
            testPersistWriter.write(datum);
            LOGGER.info("Wrote: " + activity.getVerb());
        }
        testPersistWriter.cleanUp();
        this.flushAndRefresh();
        long count = ((CountResponse)TestElasticsearchPersistWriterIT.client().count((CountRequest)TestElasticsearchPersistWriterIT.client().prepareCount(new String[0]).request()).actionGet()).getCount();
        assert (count == 89L);
    }

    void testPersistUpdater() throws Exception {
        long count = ((CountResponse)TestElasticsearchPersistWriterIT.client().count((CountRequest)TestElasticsearchPersistWriterIT.client().prepareCount(new String[0]).request()).actionGet()).getCount();
        ElasticsearchPersistUpdater testPersistUpdater = new ElasticsearchPersistUpdater(this.testConfiguration);
        testPersistUpdater.prepare(null);
        InputStream testActivityFolderStream = TestElasticsearchPersistWriterIT.class.getClassLoader().getResourceAsStream("activities");
        List files = IOUtils.readLines((InputStream)testActivityFolderStream, (Charset)Charsets.UTF_8);
        for (String file : files) {
            LOGGER.info("File: " + file);
            InputStream testActivityFileStream = TestElasticsearchPersistWriterIT.class.getClassLoader().getResourceAsStream("activities/" + file);
            Activity activity = (Activity)MAPPER.readValue(testActivityFileStream, Activity.class);
            Activity update = new Activity();
            update.setAdditionalProperty("updated", (Object)Boolean.TRUE);
            update.setAdditionalProperty("str", (Object)"str");
            update.setAdditionalProperty("long", (Object)10L);
            update.setActor((Actor)new Actor().withAdditionalProperty("updated", (Object)Boolean.TRUE).withAdditionalProperty("double", (Object)10.0).withAdditionalProperty("map", (Object)MAPPER.createObjectNode().set("field", (JsonNode)MAPPER.createArrayNode().add("item"))));
            StreamsDatum datum = new StreamsDatum((Object)update, activity.getVerb());
            testPersistUpdater.write(datum);
            LOGGER.info("Updated: " + activity.getVerb());
        }
        testPersistUpdater.cleanUp();
        this.flushAndRefresh();
        long updated = ((CountResponse)TestElasticsearchPersistWriterIT.client().prepareCount(new String[0]).setQuery((QueryBuilder)QueryBuilders.filteredQuery((QueryBuilder)QueryBuilders.matchAllQuery(), (FilterBuilder)FilterBuilders.existsFilter((String)"updated"))).execute().actionGet()).getCount();
        LOGGER.info("updated: {}", (Object)updated);
        TestElasticsearchPersistWriterIT.assertEquals((long)count, (long)updated);
        long actorupdated = ((CountResponse)TestElasticsearchPersistWriterIT.client().prepareCount(new String[0]).setQuery((QueryBuilder)QueryBuilders.termQuery((String)"actor.updated", (boolean)true)).execute().actionGet()).getCount();
        LOGGER.info("actor.updated: {}", (Object)actorupdated);
        TestElasticsearchPersistWriterIT.assertEquals((long)count, (long)actorupdated);
        long strupdated = ((CountResponse)TestElasticsearchPersistWriterIT.client().prepareCount(new String[0]).setQuery((QueryBuilder)QueryBuilders.termQuery((String)"str", (String)"str")).execute().actionGet()).getCount();
        LOGGER.info("strupdated: {}", (Object)strupdated);
        TestElasticsearchPersistWriterIT.assertEquals((long)count, (long)strupdated);
        long longupdated = ((CountResponse)TestElasticsearchPersistWriterIT.client().prepareCount(new String[0]).setQuery((QueryBuilder)QueryBuilders.rangeQuery((String)"long").from(9).to(11)).execute().actionGet()).getCount();
        LOGGER.info("longupdated: {}", (Object)longupdated);
        TestElasticsearchPersistWriterIT.assertEquals((long)count, (long)longupdated);
        long doubleupdated = ((CountResponse)TestElasticsearchPersistWriterIT.client().prepareCount(new String[0]).setQuery((QueryBuilder)QueryBuilders.rangeQuery((String)"long").from(9).to(11)).execute().actionGet()).getCount();
        LOGGER.info("doubleupdated: {}", (Object)doubleupdated);
        TestElasticsearchPersistWriterIT.assertEquals((long)count, (long)doubleupdated);
        long mapfieldupdated = ((CountResponse)TestElasticsearchPersistWriterIT.client().prepareCount(new String[0]).setQuery((QueryBuilder)QueryBuilders.termQuery((String)"actor.map.field", (String)"item")).execute().actionGet()).getCount();
        LOGGER.info("mapfieldupdated: {}", (Object)mapfieldupdated);
        TestElasticsearchPersistWriterIT.assertEquals((long)count, (long)mapfieldupdated);
    }
}

