/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.elasticsearch;

import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.elasticsearch.ElasticSearchIOTestUtils;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.node.Node;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class ElasticsearchIOTest
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTest.class);
    private static final String ES_INDEX = "beam";
    private static final String ES_TYPE = "test";
    private static final String ES_IP = "127.0.0.1";
    private static final long NUM_DOCS = 400L;
    private static final int NUM_SCIENTISTS = 10;
    private static final long BATCH_SIZE = 200L;
    private static final long AVERAGE_DOC_SIZE = 25L;
    private static final long BATCH_SIZE_BYTES = 2048L;
    private static Node node;
    private static RestClient restClient;
    private static ElasticsearchIO.ConnectionConfiguration connectionConfiguration;
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER;
    @Rule
    public TestPipeline pipeline = TestPipeline.create();
    @Rule
    public ExpectedException exception = ExpectedException.none();

    @BeforeClass
    public static void beforeClass() throws IOException {
        ServerSocket serverSocket = new ServerSocket(0);
        int esHttpPort = serverSocket.getLocalPort();
        serverSocket.close();
        LOG.info("Starting embedded Elasticsearch instance ({})", (Object)esHttpPort);
        Settings.Builder settingsBuilder = Settings.settingsBuilder().put("cluster.name", ES_INDEX).put("http.enabled", "true").put("node.data", "true").put("path.data", TEMPORARY_FOLDER.getRoot().getPath()).put("path.home", TEMPORARY_FOLDER.getRoot().getPath()).put("node.name", ES_INDEX).put("network.host", ES_IP).put("http.port", esHttpPort).put("index.store.stats_refresh_interval", 0).put("threadpool.bulk.queue_size", 100);
        node = new Node(settingsBuilder.build());
        LOG.info("Elasticsearch node created");
        node.start();
        connectionConfiguration = ElasticsearchIO.ConnectionConfiguration.create((String[])new String[]{"http://127.0.0.1:" + esHttpPort}, (String)ES_INDEX, (String)ES_TYPE);
        restClient = connectionConfiguration.createClient();
    }

    @AfterClass
    public static void afterClass() throws IOException {
        restClient.close();
        node.close();
    }

    @Before
    public void before() throws Exception {
        ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, restClient);
    }

    @Test
    public void testSizes() throws Exception {
        ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, 400L, restClient);
        PipelineOptions options = PipelineOptionsFactory.create();
        ElasticsearchIO.Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
        ElasticsearchIO.BoundedElasticsearchSource initialSource = new ElasticsearchIO.BoundedElasticsearchSource(read, null);
        long estimatedSize = initialSource.getEstimatedSizeBytes(options);
        LOG.info("Estimated size: {}", (Object)estimatedSize);
        Assert.assertThat((String)"Wrong estimated size", (Object)estimatedSize, (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(10000L)));
    }

    @Test
    public void testRead() throws Exception {
        ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, 400L, restClient);
        PCollection output = (PCollection)this.pipeline.apply((PTransform)ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration).withScrollKeepalive("5m").withBatchSize(100L));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count", Count.globally()))).isEqualTo((Object)400L);
        this.pipeline.run();
    }

    @Test
    public void testReadWithQuery() throws Exception {
        ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, 400L, restClient);
        String query = "{\n  \"query\": {\n  \"match\" : {\n    \"scientist\" : {\n      \"query\" : \"Einstein\",\n      \"type\" : \"boolean\"\n    }\n  }\n  }\n}";
        PCollection output = (PCollection)this.pipeline.apply((PTransform)ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration).withQuery(query));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count", Count.globally()))).isEqualTo((Object)40L);
        this.pipeline.run();
    }

    @Test
    public void testWrite() throws Exception {
        List<String> data = ElasticSearchIOTestUtils.createDocuments(400L, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
        ((PCollection)this.pipeline.apply((PTransform)Create.of(data))).apply((PTransform)ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration));
        this.pipeline.run();
        long currentNumDocs = ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient);
        Assert.assertEquals((long)400L, (long)currentNumDocs);
        QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery((String)"Einstein").field("scientist");
        SearchResponse searchResponse = (SearchResponse)node.client().prepareSearch(new String[]{ES_INDEX}).setTypes(new String[]{ES_TYPE}).setQuery((QueryBuilder)queryBuilder).execute().actionGet();
        Assert.assertEquals((long)40L, (long)searchResponse.getHits().getTotalHits());
    }

    @Test
    public void testWriteWithErrors() throws Exception {
        ElasticsearchIO.Write write = ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration).withMaxBatchSize(200L);
        DoFnTester fnTester = DoFnTester.of((DoFn)new ElasticsearchIO.Write.WriteFn(write));
        List<String> input = ElasticSearchIOTestUtils.createDocuments(400L, ElasticSearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
        this.exception.expect(Is.isA(IOException.class));
        this.exception.expectMessage((Matcher)new CustomMatcher<String>("RegExp matcher"){

            public boolean matches(Object o) {
                String message = (String)o;
                return message.matches("(?is).*Error writing to Elasticsearch, some elements could not be inserted.*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*");
            }
        });
        fnTester.processBundle(input);
    }

    @Test
    public void testWriteWithMaxBatchSize() throws Exception {
        ElasticsearchIO.Write write = ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration).withMaxBatchSize(200L);
        DoFnTester fnTester = DoFnTester.of((DoFn)new ElasticsearchIO.Write.WriteFn(write));
        List<String> input = ElasticSearchIOTestUtils.createDocuments(400L, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
        long numDocsProcessed = 0L;
        long numDocsInserted = 0L;
        for (String document : input) {
            fnTester.processElement((Object)document);
            if (++numDocsProcessed % 100L != 0L) continue;
            long currentNumDocs = ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient);
            if (numDocsProcessed % 200L == 0L) {
                Assert.assertEquals((String)"we are at the end of a bundle, we should have inserted all processed documents", (long)numDocsProcessed, (long)currentNumDocs);
                numDocsInserted = currentNumDocs;
                continue;
            }
            Assert.assertEquals((String)"we are not at the end of a bundle, we should have inserted no more documents", (long)numDocsInserted, (long)currentNumDocs);
        }
    }

    @Test
    public void testWriteWithMaxBatchSizeBytes() throws Exception {
        ElasticsearchIO.Write write = ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration).withMaxBatchSizeBytes(2048L);
        DoFnTester fnTester = DoFnTester.of((DoFn)new ElasticsearchIO.Write.WriteFn(write));
        List<String> input = ElasticSearchIOTestUtils.createDocuments(400L, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
        long numDocsProcessed = 0L;
        long sizeProcessed = 0L;
        long numDocsInserted = 0L;
        long batchInserted = 0L;
        for (String document : input) {
            fnTester.processElement((Object)document);
            sizeProcessed += (long)document.getBytes().length;
            if (++numDocsProcessed % 40L != 0L) continue;
            long currentNumDocs = ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, restClient);
            if (sizeProcessed / 2048L > batchInserted) {
                Assert.assertThat((String)"we have passed a bundle size, we should have inserted some documents", (Object)currentNumDocs, (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(numDocsInserted)));
                numDocsInserted = currentNumDocs;
                batchInserted = sizeProcessed / 2048L;
                continue;
            }
            Assert.assertEquals((String)"we are not at the end of a bundle, we should have inserted no more documents", (long)numDocsInserted, (long)currentNumDocs);
        }
    }

    @Test
    public void testSplit() throws Exception {
        ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, 400L, restClient);
        PipelineOptions options = PipelineOptionsFactory.create();
        ElasticsearchIO.Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
        ElasticsearchIO.BoundedElasticsearchSource initialSource = new ElasticsearchIO.BoundedElasticsearchSource(read, null);
        int desiredBundleSizeBytes = 0;
        List splits = initialSource.split((long)desiredBundleSizeBytes, options);
        SourceTestUtils.assertSourcesEqualReferenceSource((BoundedSource)initialSource, (List)splits, (PipelineOptions)options);
        int expectedNumSplits = 5;
        Assert.assertEquals((long)expectedNumSplits, (long)splits.size());
        int nonEmptySplits = 0;
        for (BoundedSource subSource : splits) {
            if (SourceTestUtils.readFromSource((BoundedSource)subSource, (PipelineOptions)options).size() <= 0) continue;
            ++nonEmptySplits;
        }
        Assert.assertEquals((String)"Wrong number of empty splits", (long)expectedNumSplits, (long)nonEmptySplits);
    }

    static {
        TEMPORARY_FOLDER = new TemporaryFolder();
    }
}

