package org.apache.beam.sdk.io.hadoop.format;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticIT.class */
public class HadoopFormatIOElasticIT implements Serializable {
    private static final String TRUE = "true";
    private static final String ELASTIC_INDEX_NAME = "test_data";
    private static HadoopFormatElasticIOTestOptions options;
    private static ElasticsearchContainer elasticsearch;

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private final MapElements<LinkedMapWritable, String> transformFunc = MapElements.via(new SimpleFunction<LinkedMapWritable, String>() { // from class: org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOElasticIT.1
        public String apply(LinkedMapWritable linkedMapWritable) {
            return HadoopFormatIOElasticIT.this.convertMapWRowToString(linkedMapWritable);
        }
    });

    /* loaded from: input_file:org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticIT$HadoopFormatElasticIOTestOptions.class */
    public interface HadoopFormatElasticIOTestOptions extends HadoopFormatIOTestOptions {
        @Description("Whether to use automatic ElasticSearch container")
        @Default.Boolean(true)
        Boolean isWithESContainer();

        void setWithESContainer(Boolean bool);
    }

    @BeforeClass
    public static void setUp() throws IOException {
        PipelineOptionsFactory.register(HadoopFormatElasticIOTestOptions.class);
        options = TestPipeline.testingPipelineOptions().as(HadoopFormatElasticIOTestOptions.class);
        if (options.isWithESContainer().booleanValue()) {
            setElasticsearchContainer();
        }
    }

    @AfterClass
    public static void afterClass() {
        if (options.isWithESContainer().booleanValue()) {
            elasticsearch.stop();
        }
    }

    @Test
    public void testHifIOWithElastic() throws SecurityException {
        PCollection apply = this.pipeline.apply(HadoopFormatIO.read().withConfiguration(getConfiguration()));
        PAssert.thatSingleton(apply.apply(Count.globally())).isEqualTo(1000L);
        PAssert.that(apply.apply(Values.create()).apply(this.transformFunc).apply(Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{"42e254c8689050ed0a617ff5e80ea392"});
        this.pipeline.run().waitUntilFinish();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String convertMapWRowToString(LinkedMapWritable linkedMapWritable) {
        return addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow(addFieldValuesToRow("", linkedMapWritable, "User_Name"), linkedMapWritable, "Item_Code"), linkedMapWritable, "Txn_ID"), linkedMapWritable, "Item_ID"), linkedMapWritable, "last_updated"), linkedMapWritable, "Price"), linkedMapWritable, "Title"), linkedMapWritable, "Description"), linkedMapWritable, "Age"), linkedMapWritable, "Item_Name"), linkedMapWritable, "Item_Price"), linkedMapWritable, "Availability"), linkedMapWritable, "Batch_Num"), linkedMapWritable, "Last_Ordered"), linkedMapWritable, "City");
    }

    private String addFieldValuesToRow(String str, MapWritable mapWritable, String str2) {
        return str + mapWritable.get(new Text(str2)).toString() + "|";
    }

    @Test
    public void testHifIOWithElasticQuery() {
        Configuration configuration = getConfiguration();
        configuration.set("es.query", "{  \"query\": {  \"match\" : {    \"Title\" : {      \"query\" : \"Title9\"    }  }  }}");
        PCollection apply = this.pipeline.apply(HadoopFormatIO.read().withConfiguration(configuration));
        PAssert.thatSingleton(apply.apply(Count.globally())).isEqualTo(1L);
        PAssert.that(apply.apply(Values.create()).apply(this.transformFunc).apply(Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{"d7a7e4e42c2ca7b83ef7c1ad1ebce000"});
        this.pipeline.run().waitUntilFinish();
    }

    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("es.nodes", options.getElasticServerIp());
        configuration.set("es.port", options.getElasticServerPort().toString());
        configuration.set("es.nodes.wan.only", TRUE);
        configuration.set("es.net.http.auth.user", options.getElasticUserName());
        configuration.set("es.net.http.auth.pass", options.getElasticPassword());
        configuration.set("es.resource", ELASTIC_INDEX_NAME);
        configuration.set("es.index.auto.create", TRUE);
        configuration.setClass("mapreduce.job.inputformat.class", EsInputFormat.class, InputFormat.class);
        configuration.setClass("key.class", Text.class, Object.class);
        configuration.setClass("value.class", LinkedMapWritable.class, Object.class);
        configuration.set("es.input.max.docs.per.partition", "50000");
        configuration.set("es.scroll.size", "400");
        configuration.set("es.batch.size.bytes", "8mb");
        return configuration;
    }

    private static void setElasticsearchContainer() throws IOException {
        elasticsearch = new ElasticsearchContainer(DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch").withTag("7.9.2"));
        elasticsearch.start();
        options.setElasticUserName("");
        options.setElasticPassword("");
        options.setElasticServerIp(elasticsearch.getContainerIpAddress());
        options.setElasticServerPort(elasticsearch.getMappedPort(9200));
        prepareElasticIndex();
    }

    private static Map<String, String> createElasticRow(Integer num) {
        HashMap hashMap = new HashMap();
        hashMap.put("User_Name", "User_Name" + num);
        hashMap.put("Item_Code", "" + num);
        hashMap.put("Txn_ID", "" + num);
        hashMap.put("Item_ID", "" + num);
        hashMap.put("last_updated", "" + (num.intValue() * 1000));
        hashMap.put("Price", "" + num);
        hashMap.put("Title", "Title" + num);
        hashMap.put("Description", "Description" + num);
        hashMap.put("Age", "" + num);
        hashMap.put("Item_Name", "Item_Name" + num);
        hashMap.put("Item_Price", "" + num);
        hashMap.put("Availability", "" + (num.intValue() % 2 == 0));
        hashMap.put("Batch_Num", "" + num);
        hashMap.put("Last_Ordered", new DateTime(Instant.ofEpochSecond(num.intValue())).toString(DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.000-0000")));
        hashMap.put("City", "City" + num);
        return hashMap;
    }

    private static void prepareElasticIndex() throws IOException {
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost[]{new HttpHost(options.getElasticServerIp(), options.getElasticServerPort().intValue(), "http")}));
        for (int i = 0; i < 1000; i++) {
            restHighLevelClient.index(new IndexRequest(ELASTIC_INDEX_NAME).source(createElasticRow(Integer.valueOf(i))), RequestOptions.DEFAULT);
        }
    }
}
