package org.apache.streams.elasticsearch.test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import java.io.File;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.elasticsearch.ElasticsearchClientManager;
import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.ActivityObject;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test for {@link ElasticsearchPersistUpdater}.
 *
 * <p>The test belongs to the {@code ElasticsearchPersistUpdaterIT} group and is declared to
 * run after the {@code ElasticsearchPersistWriterIT} group. It requires the configured index
 * to exist already (presumably populated by that writer test; the updater test itself never
 * creates documents, it only applies partial updates keyed by each activity's verb).
 */
@Test(groups = {"ElasticsearchPersistUpdaterIT"}, dependsOnGroups = {"ElasticsearchPersistWriterIT"})
public class ElasticsearchPersistUpdaterIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdaterIT.class);
    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();

    /** Writer configuration bound from the {@code elasticsearch} section of the test config file. */
    protected ElasticsearchWriterConfiguration testConfiguration;

    /** Client obtained from {@link ElasticsearchClientManager} for {@link #testConfiguration}. */
    protected Client testClient;

    /**
     * Loads the test configuration, obtains a client, and verifies the preconditions of the
     * test: the cluster health is not {@code RED} and the target index exists.
     *
     * @throws Exception if the configuration cannot be parsed or the cluster cannot be reached
     */
    @BeforeClass
    public void prepareTestPersistUpdater() throws Exception {
        Config reference = ConfigFactory.load();
        File confFile = new File("target/test-classes/ElasticsearchPersistUpdaterIT.conf");
        Assert.assertTrue(confFile.exists());

        // The test file takes precedence; anything it omits falls back to the default config.
        Config typesafe = ConfigFactory
            .parseFileAnySyntax(confFile, ConfigParseOptions.defaults().setAllowMissing(false))
            .withFallback(reference)
            .resolve();
        this.testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class)
            .detectConfiguration(typesafe, "elasticsearch");
        this.testClient = ElasticsearchClientManager.getInstance(this.testConfiguration).client();

        ClusterHealthResponse health = this.testClient.admin().cluster()
            .health(Requests.clusterHealthRequest())
            .actionGet();
        Assert.assertNotEquals(health.getStatus(), ClusterHealthStatus.RED);
        Assert.assertTrue(indexExists());
    }

    /**
     * Sends one partial update per file listed under the {@code activities} classpath
     * resource, then asserts that every document counted before the updates now matches each
     * of the updated fields.
     *
     * @throws Exception if a classpath resource cannot be read or parsed
     */
    @Test
    public void testPersistUpdater() throws Exception {
        Assert.assertTrue(indexExists());

        // Baseline: number of documents of the configured type before any update is sent.
        long count = this.testClient
            .prepareSearch(this.testConfiguration.getIndex())
            .setTypes(this.testConfiguration.getType())
            .execute()
            .actionGet()
            .getHits()
            .getTotalHits();

        ClassLoader classLoader = ElasticsearchPersistUpdaterIT.class.getClassLoader();

        // IOUtils.readLines does not close the stream it is given, so close it here.
        List<String> files;
        try (InputStream listing = classLoader.getResourceAsStream("activities")) {
            Assert.assertNotNull(listing, "classpath resource 'activities' not found");
            files = IOUtils.readLines(listing, StandardCharsets.UTF_8);
        }

        ElasticsearchPersistUpdater testPersistUpdater = new ElasticsearchPersistUpdater(this.testConfiguration);
        testPersistUpdater.prepare(null);
        try {
            for (String file : files) {
                LOGGER.info("File: {}", file);
                Activity activity;
                try (InputStream activityStream = classLoader.getResourceAsStream("activities/" + file)) {
                    Assert.assertNotNull(activityStream, "classpath resource 'activities/" + file + "' not found");
                    activity = MAPPER.readValue(activityStream, Activity.class);
                }

                // Partial document carrying only the fields the assertions below look for.
                Activity update = new Activity();
                update.setAdditionalProperty("updated", Boolean.TRUE);
                update.setAdditionalProperty("str", "str");
                update.setAdditionalProperty("long", 10L);
                update.setActor(
                    new ActivityObject()
                        .withAdditionalProperty("updated", Boolean.TRUE)
                        .withAdditionalProperty("double", 10.0d)
                        .withAdditionalProperty("map",
                            MAPPER.createObjectNode().set("field", MAPPER.createArrayNode().add("item"))));

                // The activity's verb is used as the datum id of the document to update.
                testPersistUpdater.write(new StreamsDatum(update, activity.getVerb()));
                LOGGER.info("Updated: {}", activity.getVerb());
            }
        } finally {
            // Release the updater even when a write fails part-way through.
            testPersistUpdater.cleanUp();
        }

        SearchResponse updatedResponse = this.testClient
            .prepareSearch(this.testConfiguration.getIndex())
            .setTypes(this.testConfiguration.getType())
            .setQuery(QueryBuilders.existsQuery("updated"))
            .execute()
            .actionGet();
        assertHitCount("updated: {}", updatedResponse, count);

        SearchResponse actorUpdatedResponse = this.testClient
            .prepareSearch(this.testConfiguration.getIndex())
            .setTypes(this.testConfiguration.getType())
            .setQuery(QueryBuilders.termQuery("actor.updated", true))
            .execute()
            .actionGet();
        assertHitCount("actor.updated: {}", actorUpdatedResponse, count);

        SearchResponse strUpdatedResponse = this.testClient
            .prepareSearch(this.testConfiguration.getIndex())
            .setTypes(this.testConfiguration.getType())
            .setQuery(QueryBuilders.termQuery("str", "str"))
            .execute()
            .actionGet();
        assertHitCount("strupdated: {}", strUpdatedResponse, count);

        SearchResponse longUpdatedResponse = this.testClient
            .prepareSearch(this.testConfiguration.getIndex())
            .setTypes(this.testConfiguration.getType())
            .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11))
            .execute()
            .actionGet();
        assertHitCount("longupdated: {}", longUpdatedResponse, count);

        // The double value is written on the actor, so it must be queried as "actor.double";
        // querying "long" again would leave the double update unverified.
        SearchResponse doubleUpdatedResponse = this.testClient
            .prepareSearch(this.testConfiguration.getIndex())
            .setTypes(this.testConfiguration.getType())
            .setQuery(QueryBuilders.rangeQuery("actor.double").from(9.0d).to(11.0d))
            .execute()
            .actionGet();
        assertHitCount("doubleupdated: {}", doubleUpdatedResponse, count);

        SearchResponse mapUpdatedResponse = this.testClient
            .prepareSearch(this.testConfiguration.getIndex())
            .setTypes(this.testConfiguration.getType())
            .setQuery(QueryBuilders.termQuery("actor.map.field", "item"))
            .execute()
            .actionGet();
        assertHitCount("mapfieldupdated: {}", mapUpdatedResponse, count);
    }

    /** Returns whether the index named by {@link #testConfiguration} currently exists. */
    private boolean indexExists() {
        IndicesExistsResponse response = this.testClient.admin().indices()
            .exists(Requests.indicesExistsRequest(this.testConfiguration.getIndex()))
            .actionGet();
        return response.isExists();
    }

    /**
     * Logs the total hit count of {@code response} using {@code logFormat} and asserts that it
     * equals {@code expected}.
     */
    private static void assertHitCount(String logFormat, SearchResponse response, long expected) {
        long actual = response.getHits().getTotalHits();
        LOGGER.info(logFormat, actual);
        // TestNG's argument order is (actual, expected).
        Assert.assertEquals(actual, expected);
    }
}
