package org.apache.beam.sdk.io.elasticsearch;

import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.elasticsearch.ElasticSearchIOTestUtils;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.class */
public class ElasticsearchIOTest implements Serializable {
    private static final String ES_INDEX = "beam";
    private static final String ES_TYPE = "test";
    private static final String ES_IP = "127.0.0.1";
    private static final long NUM_DOCS = 400;
    private static final int NUM_SCIENTISTS = 10;
    private static final long BATCH_SIZE = 200;
    private static final long AVERAGE_DOC_SIZE = 25;
    private static final long BATCH_SIZE_BYTES = 2048;
    private static Node node;
    private static ElasticsearchIO.ConnectionConfiguration connectionConfiguration;

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Rule
    public ExpectedException exception = ExpectedException.none();
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchIOTest.class);

    @ClassRule
    public static TemporaryFolder folder = new TemporaryFolder();

    @BeforeClass
    public static void beforeClass() throws IOException {
        ServerSocket serverSocket = new ServerSocket(0);
        int localPort = serverSocket.getLocalPort();
        serverSocket.close();
        connectionConfiguration = ElasticsearchIO.ConnectionConfiguration.create(new String[]{"http://127.0.0.1:" + localPort}, ES_INDEX, ES_TYPE);
        LOGGER.info("Starting embedded Elasticsearch instance ({})", Integer.valueOf(localPort));
        node = NodeBuilder.nodeBuilder().settings(Settings.settingsBuilder().put("cluster.name", ES_INDEX).put("http.enabled", "true").put("node.data", "true").put("path.data", folder.getRoot().getPath()).put("path.home", folder.getRoot().getPath()).put("node.name", ES_INDEX).put("network.host", ES_IP).put("http.port", localPort).put("index.store.stats_refresh_interval", 0).put("threadpool.bulk.queue_size", 100)).build();
        LOGGER.info("Elasticsearch node created");
        node.start();
    }

    @AfterClass
    public static void afterClass() {
        node.close();
    }

    @Before
    public void before() throws Exception {
        ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, node.client());
    }

    @Test
    public void testSizes() throws Exception {
        ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
        long estimatedSizeBytes = new ElasticsearchIO.BoundedElasticsearchSource(ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration), (String) null).getEstimatedSizeBytes(PipelineOptionsFactory.create());
        LOGGER.info("Estimated size: {}", Long.valueOf(estimatedSizeBytes));
        Assert.assertThat("Wrong estimated size", Long.valueOf(estimatedSizeBytes), Matchers.greaterThan(10000L));
    }

    @Test
    @Category({RunnableOnService.class})
    public void testRead() throws Exception {
        ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
        PAssert.thatSingleton(this.pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration).withScrollKeepalive("5m").withBatchSize(100L)).apply("Count", Count.globally())).isEqualTo(Long.valueOf(NUM_DOCS));
        this.pipeline.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testReadWithQuery() throws Exception {
        ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
        PAssert.thatSingleton(this.pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration).withQuery("{\n  \"query\": {\n  \"match\" : {\n    \"scientist\" : {\n      \"query\" : \"Einstein\",\n      \"type\" : \"boolean\"\n    }\n  }\n  }\n}")).apply("Count", Count.globally())).isEqualTo(40L);
        this.pipeline.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testWrite() throws Exception {
        this.pipeline.apply(Create.of(ElasticSearchIOTestUtils.createDocuments(NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS))).apply(ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration));
        this.pipeline.run();
        Assert.assertEquals(NUM_DOCS, ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client()));
        Assert.assertEquals(40L, ((SearchResponse) node.client().prepareSearch(new String[]{ES_INDEX}).setTypes(new String[]{ES_TYPE}).setQuery(QueryBuilders.queryStringQuery("Einstein").field("scientist")).execute().actionGet()).getHits().getTotalHits());
    }

    @Test
    public void testWriteWithErrors() throws Exception {
        DoFnTester of = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration).withMaxBatchSize(BATCH_SIZE)));
        List<String> createDocuments = ElasticSearchIOTestUtils.createDocuments(NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
        this.exception.expect(Is.isA(IOException.class));
        this.exception.expectMessage(new CustomMatcher<String>("RegExp matcher") { // from class: org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTest.1
            public boolean matches(Object obj) {
                return ((String) obj).matches("(?is).*Error writing to Elasticsearch, some elements could not be inserted.*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*");
            }
        });
        of.processBundle(createDocuments);
    }

    @Test
    public void testWriteWithMaxBatchSize() throws Exception {
        DoFnTester of = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration).withMaxBatchSize(BATCH_SIZE)));
        long j = 0;
        long j2 = 0;
        Iterator<String> it = ElasticSearchIOTestUtils.createDocuments(NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS).iterator();
        while (it.hasNext()) {
            of.processElement(it.next());
            j++;
            if (j % 100 == 0) {
                long upgradeIndexAndGetCurrentNumDocs = ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client());
                if (j % BATCH_SIZE == 0) {
                    Assert.assertEquals("we are at the end of a bundle, we should have inserted all processed documents", j, upgradeIndexAndGetCurrentNumDocs);
                    j2 = upgradeIndexAndGetCurrentNumDocs;
                } else {
                    Assert.assertEquals("we are not at the end of a bundle, we should have inserted no more documents", j2, upgradeIndexAndGetCurrentNumDocs);
                }
            }
        }
    }

    @Test
    public void testWriteWithMaxBatchSizeBytes() throws Exception {
        DoFnTester of = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration).withMaxBatchSizeBytes(BATCH_SIZE_BYTES)));
        long j = 0;
        long j2 = 0;
        long j3 = 0;
        long j4 = 0;
        Iterator<String> it = ElasticSearchIOTestUtils.createDocuments(NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS).iterator();
        while (it.hasNext()) {
            of.processElement(it.next());
            j++;
            j2 += r0.getBytes().length;
            if (j % 40 == 0) {
                long upgradeIndexAndGetCurrentNumDocs = ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client());
                if (j2 / BATCH_SIZE_BYTES > j4) {
                    Assert.assertThat("we have passed a bundle size, we should have inserted some documents", Long.valueOf(upgradeIndexAndGetCurrentNumDocs), Matchers.greaterThan(Long.valueOf(j3)));
                    j3 = upgradeIndexAndGetCurrentNumDocs;
                    j4 = j2 / BATCH_SIZE_BYTES;
                } else {
                    Assert.assertEquals("we are not at the end of a bundle, we should have inserted no more documents", j3, upgradeIndexAndGetCurrentNumDocs);
                }
            }
        }
    }

    @Test
    public void testSplitIntoBundles() throws Exception {
        ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
        PipelineOptions create = PipelineOptionsFactory.create();
        ElasticsearchIO.BoundedElasticsearchSource boundedElasticsearchSource = new ElasticsearchIO.BoundedElasticsearchSource(ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration), (String) null);
        List splitIntoBundles = boundedElasticsearchSource.splitIntoBundles(0, create);
        SourceTestUtils.assertSourcesEqualReferenceSource(boundedElasticsearchSource, splitIntoBundles, create);
        Assert.assertEquals(5, splitIntoBundles.size());
        int i = 0;
        Iterator it = splitIntoBundles.iterator();
        while (it.hasNext()) {
            if (SourceTestUtils.readFromSource((BoundedSource) it.next(), create).size() > 0) {
                i++;
            }
        }
        Assert.assertEquals("Wrong number of empty splits", 5, i);
    }
}
