package org.apache.streams.example.elasticsearch.test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
import org.apache.streams.elasticsearch.example.ElasticsearchReindex;
import org.apache.streams.elasticsearch.example.ElasticsearchReindexConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for {@link ElasticsearchReindex}.
 *
 * <p>Before each test the {@code source} index of the test cluster is populated with the
 * activity documents found under the {@code activities} classpath resource. The test then runs
 * a reindex job and checks that the {@code destination} index ends up with the same number of
 * documents as {@code source}.
 *
 * <p>The checks use the Java {@code assert} keyword, so they are only evaluated when the JVM
 * runs with assertions enabled ({@code -ea}).
 */
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 1)
public class ElasticsearchReindexIT extends ElasticsearchIntegrationTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);

    /** JSON mapper used both to convert configuration beans and to parse test resources. */
    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();

    /** Connection settings for the test cluster; rebuilt in {@link #prepareTest()}. */
    ElasticsearchConfiguration testConfiguration = new ElasticsearchConfiguration();

    /**
     * Writes every activity listed under the {@code activities} resource into the
     * {@code source} index (type {@code activity}) and then flushes and refreshes the cluster.
     *
     * @throws Exception if a resource cannot be read or the writer fails
     */
    @Before
    public void prepareTest() throws Exception {
        testConfiguration = new ElasticsearchConfiguration();
        testConfiguration.setHosts(Lists.newArrayList("localhost"));
        testConfiguration.setClusterName(cluster().getClusterName());

        // Derive the writer configuration from the base connection settings.
        ElasticsearchWriterConfiguration writerConfiguration =
                MAPPER.convertValue(testConfiguration, ElasticsearchWriterConfiguration.class);
        writerConfiguration.setIndex("source");
        writerConfiguration.setType("activity");
        writerConfiguration.setBatchSize(5L);

        ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(writerConfiguration);
        writer.prepare(null);
        try {
            for (String fileName : listActivityFiles()) {
                LOGGER.info("File: {}", fileName);
                Activity activity = readResource("activities/" + fileName, Activity.class);
                writer.write(new StreamsDatum(activity, activity.getVerb()));
                LOGGER.info("Wrote: {}", activity.getVerb());
            }
        } finally {
            // Always release the writer, even when loading or writing a document fails.
            writer.cleanUp();
        }
        flushAndRefresh();
    }

    /**
     * Runs the reindex job described by {@code testReindex.json} against the test cluster and
     * verifies that {@code destination} holds as many documents as {@code source}.
     *
     * @throws Exception if the configuration cannot be read or a cluster request fails
     */
    @Test
    public void testReindex() throws Exception {
        ElasticsearchReindexConfiguration reindexConfiguration =
                readResource("testReindex.json", ElasticsearchReindexConfiguration.class);
        String clusterName = cluster().getClusterName();
        reindexConfiguration.getDestination().setClusterName(clusterName);
        reindexConfiguration.getSource().setClusterName(clusterName);

        assert indexExists("source") : "source index should exist before reindexing";
        long sourceCount = countDocuments("source");
        assert sourceCount > 0 : "source index should contain documents before reindexing";

        // The job is a Runnable; run it on its own thread and wait for it to finish.
        Thread reindexThread = new Thread(new ElasticsearchReindex(reindexConfiguration));
        reindexThread.start();
        reindexThread.join();
        flushAndRefresh();

        assert indexExists("destination") : "destination index should exist after reindexing";
        long destinationCount = countDocuments("destination");
        assert sourceCount == destinationCount
                : "expected " + sourceCount + " documents in destination but found " + destinationCount;
    }

    /** Returns the number of documents the test cluster reports for {@code index}. */
    private long countDocuments(String index) throws Exception {
        CountResponse response = client().count(client().prepareCount(index).request()).get();
        return response.getCount();
    }

    /**
     * Reads the {@code activities} classpath resource as UTF-8 lines; each line is used by the
     * caller as a file name below {@code activities/}.
     */
    private static List<String> listActivityFiles() throws IOException {
        InputStream listing = openResource("activities");
        try {
            return IOUtils.readLines(listing, Charsets.UTF_8);
        } finally {
            // readLines leaves the stream open, so it has to be closed here.
            IOUtils.closeQuietly(listing);
        }
    }

    /** Parses the classpath resource {@code name} as JSON into an instance of {@code type}. */
    private <T> T readResource(String name, Class<T> type) throws IOException {
        InputStream stream = openResource(name);
        try {
            return MAPPER.readValue(stream, type);
        } finally {
            // Closed explicitly instead of depending on the parser to release the stream.
            IOUtils.closeQuietly(stream);
        }
    }

    /**
     * Opens the classpath resource {@code name} through this class's class loader.
     *
     * @throws IllegalStateException if no such resource exists, so a missing fixture is reported
     *     by name instead of as a bare {@link NullPointerException}
     */
    private static InputStream openResource(String name) {
        InputStream stream = ElasticsearchReindexIT.class.getClassLoader().getResourceAsStream(name);
        if (stream == null) {
            throw new IllegalStateException("Missing test resource on classpath: " + name);
        }
        return stream;
    }
}
