package org.apache.beam.sdk.io.elasticsearch;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.elasticsearch.ElasticSearchIOTestUtils;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.http.Header;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.RestClient;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.class */
class ElasticsearchIOTestCommon implements Serializable {
    private static final int EXPECTED_RETRIES = 2;
    private static final int MAX_ATTEMPTS = 3;
    private static final String OK_REQUEST = "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n{ \"field1\" : 1 }\n";
    private static final String BAD_REQUEST = "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n{ \"field1\" : @ }\n";
    static final String ES_TYPE = "test";
    static final long NUM_DOCS_UTESTS = 400;
    static final long NUM_DOCS_ITESTS = 50000;
    static final float ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE = 0.5f;
    private static final long AVERAGE_DOC_SIZE = 25;
    private static final long BATCH_SIZE = 200;
    private static final long BATCH_SIZE_BYTES = 2048;
    private final long numDocs;
    private final ElasticsearchIO.ConnectionConfiguration connectionConfiguration;
    private final RestClient restClient;
    private final boolean useAsITests;
    private TestPipeline pipeline;
    private ExpectedException expectedException;
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIOTestCommon.class);
    private static final ElasticsearchIO.RetryConfiguration.RetryPredicate CUSTOM_RETRY_PREDICATE = new ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate(400);
    private static final String[] BAD_FORMATTED_DOC = {"{ \"x\" :a,\"y\":\"ab\" }"};

    /* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon$ContainsStringCheckerFn.class */
    private static class ContainsStringCheckerFn implements SerializableFunction<Iterable<String>, Void> {
        private String expectedSubString;

        ContainsStringCheckerFn(String str) {
            this.expectedSubString = str;
        }

        public Void apply(Iterable<String> iterable) {
            Iterator<String> it = iterable.iterator();
            while (it.hasNext()) {
                if (it.next().contains(this.expectedSubString)) {
                    return null;
                }
            }
            Assert.fail("No string found containing " + this.expectedSubString);
            return null;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon$ExtractValueFn.class */
    private static class ExtractValueFn implements ElasticsearchIO.Write.FieldValueExtractFn {
        private final String fieldName;

        private ExtractValueFn(String str) {
            this.fieldName = str;
        }

        public String apply(JsonNode jsonNode) {
            return jsonNode.path(this.fieldName).asText();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon$Modulo2ValueFn.class */
    static class Modulo2ValueFn implements ElasticsearchIO.Write.FieldValueExtractFn {
        private final String fieldName;

        Modulo2ValueFn(String str) {
            this.fieldName = str;
        }

        public String apply(JsonNode jsonNode) {
            return "TYPE_" + (jsonNode.path(this.fieldName).asText().hashCode() % ElasticsearchIOTestCommon.EXPECTED_RETRIES);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String getEsIndex() {
        return "beam" + Thread.currentThread().getId();
    }

    ElasticsearchIOTestCommon(ElasticsearchIO.ConnectionConfiguration connectionConfiguration, RestClient restClient, boolean z) {
        this.connectionConfiguration = connectionConfiguration;
        this.restClient = restClient;
        this.numDocs = z ? NUM_DOCS_ITESTS : NUM_DOCS_UTESTS;
        this.useAsITests = z;
    }

    void setPipeline(TestPipeline testPipeline) {
        this.pipeline = testPipeline;
    }

    void setExpectedException(ExpectedException expectedException) {
        this.expectedException = expectedException;
    }

    void testSplit(int i) throws Exception {
        if (!this.useAsITests) {
            ElasticSearchIOTestUtils.insertTestDocuments(this.connectionConfiguration, this.numDocs, this.restClient);
        }
        PipelineOptions create = PipelineOptionsFactory.create();
        ElasticsearchIO.BoundedElasticsearchSource boundedElasticsearchSource = new ElasticsearchIO.BoundedElasticsearchSource(ElasticsearchIO.read().withConnectionConfiguration(this.connectionConfiguration), (String) null, (Integer) null, (Integer) null);
        List split = boundedElasticsearchSource.split(i, create);
        SourceTestUtils.assertSourcesEqualReferenceSource(boundedElasticsearchSource, split, create);
        Assert.assertEquals("Wrong number of splits", i == 0 ? 5 : (int) Math.ceil(((float) ElasticsearchIO.BoundedElasticsearchSource.estimateIndexSize(this.connectionConfiguration)) / i), split.size());
        int i2 = 0;
        Iterator it = split.iterator();
        while (it.hasNext()) {
            if (SourceTestUtils.readFromSource((BoundedSource) it.next(), create).isEmpty()) {
                i2++;
            }
        }
        Assert.assertThat("There are too many empty splits, parallelism is sub-optimal", Integer.valueOf(i2), Matchers.lessThan(Integer.valueOf((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * split.size()))));
    }

    void testSizes() throws Exception {
        if (!this.useAsITests) {
            ElasticSearchIOTestUtils.insertTestDocuments(this.connectionConfiguration, this.numDocs, this.restClient);
        }
        long estimatedSizeBytes = new ElasticsearchIO.BoundedElasticsearchSource(ElasticsearchIO.read().withConnectionConfiguration(this.connectionConfiguration), (String) null, (Integer) null, (Integer) null).getEstimatedSizeBytes(PipelineOptionsFactory.create());
        LOG.info("Estimated size: {}", Long.valueOf(estimatedSizeBytes));
        Assert.assertThat("Wrong estimated size", Long.valueOf(estimatedSizeBytes), Matchers.greaterThan(Long.valueOf(AVERAGE_DOC_SIZE * this.numDocs)));
    }

    void testRead() throws Exception {
        if (!this.useAsITests) {
            ElasticSearchIOTestUtils.insertTestDocuments(this.connectionConfiguration, this.numDocs, this.restClient);
        }
        PAssert.thatSingleton(this.pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(this.connectionConfiguration).withScrollKeepalive("5m").withBatchSize(100L)).apply("Count", Count.globally())).isEqualTo(Long.valueOf(this.numDocs));
        this.pipeline.run();
    }

    void testReadWithQuery() throws Exception {
        if (!this.useAsITests) {
            ElasticSearchIOTestUtils.insertTestDocuments(this.connectionConfiguration, this.numDocs, this.restClient);
        }
        PAssert.thatSingleton(this.pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(this.connectionConfiguration).withQuery("{\n  \"query\": {\n  \"match\" : {\n    \"scientist\" : {\n      \"query\" : \"Einstein\"\n    }\n  }\n  }\n}")).apply("Count", Count.globally())).isEqualTo(Long.valueOf(this.numDocs / ElasticSearchIOTestUtils.NUM_SCIENTISTS));
        this.pipeline.run();
    }

    void testReadWithMetadata() throws Exception {
        if (!this.useAsITests) {
            ElasticSearchIOTestUtils.insertTestDocuments(this.connectionConfiguration, 1L, this.restClient);
        }
        PAssert.that(this.pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration(this.connectionConfiguration).withMetadata())).satisfies(new ContainsStringCheckerFn("\"_id\":\"0\""));
        this.pipeline.run();
    }

    void testWrite() throws Exception {
        executeWriteTest(ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration));
    }

    void testWriteWithErrors() throws Exception {
        ElasticsearchIO.Write withMaxBatchSize = ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration).withMaxBatchSize(BATCH_SIZE);
        List<String> createDocuments = ElasticSearchIOTestUtils.createDocuments(this.numDocs, ElasticSearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
        this.expectedException.expect(Is.isA(IOException.class));
        this.expectedException.expectMessage(new CustomMatcher<String>("RegExp matcher") { // from class: org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.1
            public boolean matches(Object obj) {
                return ((String) obj).matches("(?is).*Error writing to Elasticsearch, some elements could not be inserted.*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*");
            }
        });
        DoFnTester of = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(withMaxBatchSize));
        Throwable th = null;
        try {
            try {
                of.processBundle(createDocuments);
                if (of != null) {
                    $closeResource(null, of);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (of != null) {
                $closeResource(th, of);
            }
            throw th3;
        }
    }

    void testWriteWithMaxBatchSize() throws Exception {
        DoFnTester of = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration).withMaxBatchSize(BATCH_SIZE)));
        try {
            long j = 0;
            long j2 = 0;
            Iterator<String> it = ElasticSearchIOTestUtils.createDocuments(this.numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS).iterator();
            while (it.hasNext()) {
                of.processElement(it.next());
                j++;
                if (j % 100 == 0) {
                    long refreshIndexAndGetCurrentNumDocs = ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(this.connectionConfiguration, this.restClient);
                    if (j % BATCH_SIZE == 0) {
                        Assert.assertEquals("we are at the end of a bundle, we should have inserted all processed documents", j, refreshIndexAndGetCurrentNumDocs);
                        j2 = refreshIndexAndGetCurrentNumDocs;
                    } else {
                        Assert.assertEquals("we are not at the end of a bundle, we should have inserted no more documents", j2, refreshIndexAndGetCurrentNumDocs);
                    }
                }
            }
        } finally {
            if (of != null) {
                $closeResource(null, of);
            }
        }
    }

    void testWriteWithMaxBatchSizeBytes() throws Exception {
        DoFnTester of = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration).withMaxBatchSizeBytes(BATCH_SIZE_BYTES)));
        Throwable th = null;
        try {
            try {
                long j = 0;
                long j2 = 0;
                long j3 = 0;
                long j4 = 0;
                Iterator<String> it = ElasticSearchIOTestUtils.createDocuments(this.numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS).iterator();
                while (it.hasNext()) {
                    of.processElement(it.next());
                    j++;
                    j2 += r0.getBytes(StandardCharsets.UTF_8).length;
                    if (j % 40 == 0) {
                        long refreshIndexAndGetCurrentNumDocs = ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(this.connectionConfiguration, this.restClient);
                        if (j2 / BATCH_SIZE_BYTES > j4) {
                            Assert.assertThat("we have passed a bundle size, we should have inserted some documents", Long.valueOf(refreshIndexAndGetCurrentNumDocs), Matchers.greaterThan(Long.valueOf(j3)));
                            j3 = refreshIndexAndGetCurrentNumDocs;
                            j4 = j2 / BATCH_SIZE_BYTES;
                        } else {
                            Assert.assertEquals("we are not at the end of a bundle, we should have inserted no more documents", j3, refreshIndexAndGetCurrentNumDocs);
                        }
                    }
                }
                if (of != null) {
                    $closeResource(null, of);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (of != null) {
                $closeResource(th, of);
            }
            throw th3;
        }
    }

    void testWriteWithIdFn() throws Exception {
        this.pipeline.apply(Create.of(ElasticSearchIOTestUtils.createDocuments(this.numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS))).apply(ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration).withIdFn(new ExtractValueFn("scientist")));
        this.pipeline.run();
        Assert.assertEquals(ElasticSearchIOTestUtils.NUM_SCIENTISTS, ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(this.connectionConfiguration, this.restClient));
        Assert.assertEquals(1L, ElasticSearchIOTestUtils.countByScientistName(this.connectionConfiguration, this.restClient, "Einstein"));
    }

    void testWriteWithIndexFn() throws Exception {
        this.pipeline.apply(Create.of(ElasticSearchIOTestUtils.createDocuments(10 * ElasticSearchIOTestUtils.FAMOUS_SCIENTISTS.length, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS))).apply(ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration).withIndexFn(new ExtractValueFn("scientist")));
        this.pipeline.run();
        for (String str : ElasticSearchIOTestUtils.FAMOUS_SCIENTISTS) {
            Assert.assertEquals(str + " index holds incorrect count", 10L, ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(this.restClient, str.toLowerCase(), this.connectionConfiguration.getType()));
        }
    }

    void testWriteWithTypeFn2x5x() throws Exception {
        long j = (this.numDocs & 1) == 0 ? this.numDocs : this.numDocs + 1;
        this.pipeline.apply(Create.of(ElasticSearchIOTestUtils.createDocuments(j, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS))).apply(ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration).withTypeFn(new Modulo2ValueFn("id")));
        this.pipeline.run();
        for (int i = 0; i < EXPECTED_RETRIES; i++) {
            String str = "TYPE_" + i;
            Assert.assertEquals(str + " holds incorrect count", j / 2, ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(this.restClient, this.connectionConfiguration.getIndex(), str));
        }
    }

    void testWriteWithFullAddressing() throws Exception {
        this.pipeline.apply(Create.of(ElasticSearchIOTestUtils.createDocuments(this.numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS))).apply(ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration).withIdFn(new ExtractValueFn("id")).withIndexFn(new ExtractValueFn("scientist")).withTypeFn(new Modulo2ValueFn("scientist")));
        this.pipeline.run();
        for (String str : ElasticSearchIOTestUtils.FAMOUS_SCIENTISTS) {
            String lowerCase = str.toLowerCase();
            for (int i = 0; i < EXPECTED_RETRIES; i++) {
                String str2 = "TYPE_" + (str.hashCode() % EXPECTED_RETRIES);
                Assert.assertEquals("Incorrect count for " + lowerCase + "/" + str2, this.numDocs / ElasticSearchIOTestUtils.NUM_SCIENTISTS, ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(this.restClient, lowerCase, str2));
            }
        }
    }

    void testWritePartialUpdate() throws Exception {
        if (!this.useAsITests) {
            ElasticSearchIOTestUtils.insertTestDocuments(this.connectionConfiguration, this.numDocs, this.restClient);
        }
        Assert.assertEquals(this.numDocs, ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(this.connectionConfiguration, this.restClient));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.numDocs; i++) {
            arrayList.add(String.format("{\"id\" : %s, \"group\" : %s}", Integer.valueOf(i), Integer.valueOf(i % EXPECTED_RETRIES)));
        }
        this.pipeline.apply(Create.of(arrayList)).apply(ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration).withIdFn(new ExtractValueFn("id")).withUsePartialUpdate(true));
        this.pipeline.run();
        Assert.assertEquals(this.numDocs, ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(this.connectionConfiguration, this.restClient));
        Assert.assertEquals(this.numDocs / ElasticSearchIOTestUtils.NUM_SCIENTISTS, ElasticSearchIOTestUtils.countByScientistName(this.connectionConfiguration, this.restClient, "Einstein"));
        Assert.assertEquals(this.numDocs / 2, ElasticSearchIOTestUtils.countByMatch(this.connectionConfiguration, this.restClient, "group", "0"));
        Assert.assertEquals(this.numDocs / 2, ElasticSearchIOTestUtils.countByMatch(this.connectionConfiguration, this.restClient, "group", "1"));
    }

    void testDefaultRetryPredicate(RestClient restClient) throws IOException {
        Assert.assertTrue(CUSTOM_RETRY_PREDICATE.test(restClient.performRequest("POST", "/_bulk", Collections.emptyMap(), new NStringEntity(BAD_REQUEST, ContentType.APPLICATION_JSON), new Header[0]).getEntity()));
        Assert.assertFalse(ElasticsearchIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE.test(restClient.performRequest("POST", "/_bulk", Collections.emptyMap(), new NStringEntity(OK_REQUEST, ContentType.APPLICATION_JSON), new Header[0]).getEntity()));
    }

    void testWriteRetry() throws Throwable {
        this.expectedException.expectCause(Is.isA(IOException.class));
        this.expectedException.expectMessage(String.format("Error writing to ES after %d attempt(s). No more attempts allowed", Integer.valueOf(EXPECTED_RETRIES)));
        this.pipeline.apply(Create.of(Arrays.asList(BAD_FORMATTED_DOC))).apply(ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration).withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000L)).withRetryPredicate(CUSTOM_RETRY_PREDICATE)));
        this.pipeline.run();
    }

    void testWriteRetryValidRequest() throws Exception {
        executeWriteTest(ElasticsearchIO.write().withConnectionConfiguration(this.connectionConfiguration).withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000L)).withRetryPredicate(CUSTOM_RETRY_PREDICATE)));
    }

    private void executeWriteTest(ElasticsearchIO.Write write) throws Exception {
        this.pipeline.apply(Create.of(ElasticSearchIOTestUtils.createDocuments(this.numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS))).apply(write);
        this.pipeline.run();
        Assert.assertEquals(this.numDocs, ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs(this.connectionConfiguration, this.restClient));
        Assert.assertEquals(this.numDocs / ElasticSearchIOTestUtils.NUM_SCIENTISTS, ElasticSearchIOTestUtils.countByScientistName(this.connectionConfiguration, this.restClient, "Einstein"));
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
