package org.apache.streams.elasticsearch.test;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.elasticsearch.ElasticsearchClientManager;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.class */
public class ElasticsearchPersistWriterIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriterIT.class);
    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
    private ElasticsearchWriterConfiguration testConfiguration;
    private Client testClient;

    @BeforeClass
    public void prepareTestPersistWriter() throws Exception {
        this.testConfiguration = new ComponentConfigurator(ElasticsearchWriterConfiguration.class).detectConfiguration("ElasticsearchPersistWriterIT");
        this.testClient = ElasticsearchClientManager.getInstance(this.testConfiguration).client();
        Assert.assertNotEquals(((ClusterHealthResponse) this.testClient.admin().cluster().health(Requests.clusterHealthRequest(new String[0])).actionGet()).getStatus(), ClusterHealthStatus.RED);
        if (((IndicesExistsResponse) this.testClient.admin().indices().exists(Requests.indicesExistsRequest(new String[]{this.testConfiguration.getIndex()})).actionGet()).isExists()) {
            Assert.assertTrue(((DeleteIndexResponse) this.testClient.admin().indices().delete(Requests.deleteIndexRequest(this.testConfiguration.getIndex())).actionGet()).isAcknowledged());
        }
    }

    @Test
    public void testPersistWriter() throws Exception {
        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(this.testConfiguration);
        elasticsearchPersistWriter.prepare((Object) null);
        for (Path path : (List) Files.list(Paths.get("target/dependency/activitystreams-testdata", new String[0])).collect(Collectors.toList())) {
            LOGGER.info("File: " + path);
            Activity activity = (Activity) MAPPER.readValue(new FileInputStream(path.toFile()), Activity.class);
            elasticsearchPersistWriter.write(new StreamsDatum(activity, activity.getVerb()));
            LOGGER.info("Wrote: " + activity.getVerb());
        }
        elasticsearchPersistWriter.cleanUp();
        Assert.assertEquals(89L, ((SearchResponse) this.testClient.prepareSearch(new String[]{this.testConfiguration.getIndex()}).setTypes(new String[]{this.testConfiguration.getType()}).execute().actionGet()).getHits().getTotalHits());
    }
}
