package org.apache.beam.sdk.io.hadoop.format;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.class */
public class HadoopFormatIOElasticTest implements Serializable {
    private static final long serialVersionUID = 1;
    private static final String TRUE = "true";
    private static final String ELASTIC_INDEX_NAME = "beamdb";
    private static final int TEST_DATA_ROW_COUNT = 10;
    private static final String ELASTIC_TYPE_ID_PREFIX = "s";

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private final MapElements<LinkedMapWritable, String> transformFunc = MapElements.via(new SimpleFunction<LinkedMapWritable, String>() { // from class: org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOElasticTest.1
        public String apply(LinkedMapWritable linkedMapWritable) {
            return linkedMapWritable.get(new Text("id")) + "|" + linkedMapWritable.get(new Text("scientist"));
        }
    });

    @ClassRule
    public static TemporaryFolder elasticTempFolder = new TemporaryFolder();
    private static final String ES_VERSION = "7.9.2";

    @ClassRule
    public static ElasticsearchContainer elasticsearch = new ElasticsearchContainer(DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch").withTag(ES_VERSION));

    @BeforeClass
    public static void startServer() throws IOException {
        prepareElasticIndex();
    }

    @Test
    public void testHifIOWithElastic() {
        PCollection apply = this.pipeline.apply(HadoopFormatIO.read().withConfiguration(getConfiguration()));
        PAssert.thatSingleton(apply.apply(Count.globally())).isEqualTo(10L);
        PAssert.that(apply.apply(Values.create()).apply(this.transformFunc).apply(Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{"a62a85f5f081e3840baf1028d4d6c6bc"});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testHifIOWithElasticQuery() {
        Configuration configuration = getConfiguration();
        configuration.set("es.query", String.format("{\"query\": { \"match\": { \"id\": { \"query\": \"%s\" }}}}", "s2"));
        PCollection apply = this.pipeline.apply(HadoopFormatIO.read().withConfiguration(configuration));
        PAssert.thatSingleton(apply.apply(Count.globally())).isEqualTo(Long.valueOf(serialVersionUID));
        PAssert.that(apply.apply(Values.create()).apply(this.transformFunc).apply(Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{"cfbf3e5c993d44e57535a114e25f782d"});
        this.pipeline.run().waitUntilFinish();
    }

    private Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("es.port", elasticsearch.getMappedPort(9200).toString());
        configuration.set("es.resource", ELASTIC_INDEX_NAME);
        configuration.set("es.nodes.wan.only", TRUE);
        configuration.set("es.index.auto.create", TRUE);
        configuration.setClass("mapreduce.job.inputformat.class", EsInputFormat.class, InputFormat.class);
        configuration.setClass("key.class", Text.class, Object.class);
        configuration.setClass("value.class", LinkedMapWritable.class, Object.class);
        return configuration;
    }

    private static Map<String, String> createElasticRow(String str, String str2) {
        HashMap hashMap = new HashMap();
        hashMap.put("id", str);
        hashMap.put("scientist", str2);
        return hashMap;
    }

    private static void prepareElasticIndex() throws IOException {
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost[]{new HttpHost(elasticsearch.getContainerIpAddress(), elasticsearch.getMappedPort(9200).intValue(), "http")}));
        for (int i = 0; i < TEST_DATA_ROW_COUNT; i++) {
            restHighLevelClient.index(new IndexRequest(ELASTIC_INDEX_NAME).source(createElasticRow(ELASTIC_TYPE_ID_PREFIX + i, "Faraday" + i)), RequestOptions.DEFAULT);
        }
    }
}
