package org.apache.streams.elasticsearch.test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import java.io.File;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.elasticsearch.ElasticsearchClientManager;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.ActivityObject;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.reflections.Reflections;
import org.reflections.scanners.Scanner;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test that writes parent ("object") and child ("activity") documents through
 * {@link ElasticsearchPersistWriter} and verifies how many documents of each type end up in the
 * configured index.
 *
 * <p>The test needs a reachable Elasticsearch cluster described by
 * {@code target/test-classes/ElasticsearchParentChildWriterIT.conf}. Setup deletes the target
 * index if it already exists and installs the index template found in
 * {@code /ActivityChildObjectParent.json}.
 */
@Test(groups = {"ElasticsearchParentChildWriterIT"})
public class ElasticsearchParentChildWriterIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildWriterIT.class);
    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();

    /** Location of the test configuration, relative to the working directory of the build. */
    private static final String CONFIG_PATH = "target/test-classes/ElasticsearchParentChildWriterIT.conf";

    /** Number of documents of type "object" expected in the index after the test has written its data. */
    private static final long EXPECTED_OBJECT_COUNT = 41L;

    /** Number of documents of type "activity" expected in the index after the test has written its data. */
    private static final long EXPECTED_ACTIVITY_COUNT = 84L;

    /** Writer configuration resolved from the "elasticsearch" section of the test config. */
    protected ElasticsearchWriterConfiguration testConfiguration;

    /** Client obtained from {@link ElasticsearchClientManager} for {@link #testConfiguration}. */
    protected Client testClient;

    /** Subtypes of {@link ActivityObject} discovered in {@code org.apache.streams.pojo.json}. */
    Set<Class<? extends ActivityObject>> objectTypes;

    /** Lines read from the {@code activities} classpath resource; each is used as a file name below it. */
    List<String> files;

    /**
     * Resolves the configuration, connects to the cluster, resets the target index, installs the
     * "mappings" index template and collects the test inputs.
     *
     * @throws Exception if the configuration or a classpath resource cannot be read, or a cluster
     *     request fails
     */
    @BeforeClass
    public void prepareTestParentChildPersistWriter() throws Exception {
        Config reference = ConfigFactory.load();
        File confFile = new File(CONFIG_PATH);
        Assert.assertTrue(confFile.exists(), "missing test configuration: " + confFile.getPath());
        Config typesafe = ConfigFactory
            .parseFileAnySyntax(confFile, ConfigParseOptions.defaults().setAllowMissing(false))
            .withFallback(reference)
            .resolve();
        this.testConfiguration = new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class)
            .detectConfiguration(typesafe, "elasticsearch");
        this.testClient = ElasticsearchClientManager.getInstance(this.testConfiguration).client();

        // Any status other than RED is accepted.
        ClusterHealthResponse health =
            this.testClient.admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
        Assert.assertNotEquals(health.getStatus(), ClusterHealthStatus.RED);

        // Start from a clean slate: drop the index if a previous run left it behind.
        String index = this.testConfiguration.getIndex();
        IndicesExistsResponse exists =
            this.testClient.admin().indices().exists(Requests.indicesExistsRequest(index)).actionGet();
        if (exists.isExists()) {
            DeleteIndexResponse deleted =
                this.testClient.admin().indices().delete(Requests.deleteIndexRequest(index)).actionGet();
            Assert.assertTrue(deleted.isAcknowledged());
        }

        // Install the index template from the bundled JSON definition.
        java.net.URL templateUrl =
            ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
        Assert.assertNotNull(templateUrl, "missing classpath resource /ActivityChildObjectParent.json");
        ObjectNode templateNode = MAPPER.readValue(templateUrl, ObjectNode.class);
        PutIndexTemplateRequestBuilder templateBuilder =
            this.testClient.admin().indices().preparePutTemplate("mappings");
        templateBuilder.setSource(MAPPER.writeValueAsString(templateNode));
        this.testClient.admin().indices().putTemplate(templateBuilder.request()).actionGet();

        Reflections reflections = new Reflections(new ConfigurationBuilder()
            .setUrls(ClasspathHelper.forPackage("org.apache.streams.pojo.json"))
            .setScanners(new SubTypesScanner()));
        this.objectTypes = reflections.getSubTypesOf(ActivityObject.class);

        // IOUtils.readLines does not close the stream, so close it here.
        try (InputStream listing =
                 ElasticsearchParentChildWriterIT.class.getClassLoader().getResourceAsStream("activities")) {
            Assert.assertNotNull(listing, "missing classpath resource activities");
            this.files = IOUtils.readLines(listing, StandardCharsets.UTF_8);
        }
    }

    /**
     * Writes one "object" document per discovered {@link ActivityObject} subtype and one "activity"
     * document per activity file whose object has a non-empty object type, then checks the number
     * of hits for each document type.
     *
     * @throws Exception if an object type cannot be instantiated, an activity file cannot be
     *     parsed, or the writer or a search request fails
     */
    @Test
    public void testParentChildPersistWriter() throws Exception {
        ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(this.testConfiguration);
        writer.prepare(null);

        // Parent documents: a default-constructed instance of every ActivityObject subtype.
        for (Class<? extends ActivityObject> objectType : this.objectTypes) {
            Object instance = objectType.getDeclaredConstructor().newInstance();
            ActivityObject activityObject = MAPPER.convertValue(instance, ActivityObject.class);
            StreamsDatum objectDatum = new StreamsDatum(activityObject, activityObject.getObjectType());
            objectDatum.getMetadata().put("type", "object");
            writer.write(objectDatum);
        }

        // Child documents: each activity is attached to the parent named by its object's type.
        ClassLoader loader = ElasticsearchParentChildWriterIT.class.getClassLoader();
        for (String fileName : this.files) {
            LOGGER.info("File: {}", fileName);
            Activity activity;
            try (InputStream in = loader.getResourceAsStream("activities/" + fileName)) {
                Assert.assertNotNull(in, "missing classpath resource activities/" + fileName);
                activity = MAPPER.readValue(in, Activity.class);
            }
            StreamsDatum activityDatum = new StreamsDatum(activity, activity.getVerb());
            String parentType = activity.getObject().getObjectType();
            if (!StringUtils.isEmpty(parentType)) {
                activityDatum.getMetadata().put("parent", parentType);
                activityDatum.getMetadata().put("type", "activity");
                writer.write(activityDatum);
                LOGGER.info("Wrote: {}", activity.getVerb());
            }
        }

        // cleanUp must run before searching; the counts are read only after it returns.
        writer.cleanUp();

        // TestNG argument order is (actual, expected).
        Assert.assertEquals(countHits("object"), EXPECTED_OBJECT_COUNT);
        Assert.assertEquals(countHits("activity"), EXPECTED_ACTIVITY_COUNT);
    }

    /** Returns the total number of hits for documents of the given type in the configured index. */
    private long countHits(String type) {
        SearchResponse response = this.testClient
            .prepareSearch(this.testConfiguration.getIndex())
            .setTypes(type)
            .execute()
            .actionGet();
        return response.getHits().getTotalHits();
    }
}
