package org.apache.beam.sdk.io.hadoop.format;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.class */
public class HadoopFormatIOElasticTest implements Serializable {
    private static final long serialVersionUID = 1;
    private static final String ELASTIC_IN_MEM_HOSTNAME = "127.0.0.1";
    private static int port;
    private static final String ELASTIC_INTERNAL_VERSION = "5.x";
    private static final String TRUE = "true";
    private static final String ELASTIC_INDEX_NAME = "beamdb";
    private static final String ELASTIC_TYPE_NAME = "scientists";
    private static final String ELASTIC_RESOURCE = "/beamdb/scientists";
    private static final int TEST_DATA_ROW_COUNT = 10;
    private static final String ELASTIC_TYPE_ID_PREFIX = "s";

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private final MapElements<LinkedMapWritable, String> transformFunc = MapElements.via(new SimpleFunction<LinkedMapWritable, String>() { // from class: org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOElasticTest.1
        public String apply(LinkedMapWritable linkedMapWritable) {
            return linkedMapWritable.get(new Text("id")) + "|" + linkedMapWritable.get(new Text("scientist"));
        }
    });
    private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIOElasticTest.class);

    @ClassRule
    public static TemporaryFolder elasticTempFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest$ElasticEmbeddedServer.class */
    static class ElasticEmbeddedServer implements Serializable {
        private static final long serialVersionUID = 1;
        private static Node node;

        ElasticEmbeddedServer() {
        }

        static void startElasticEmbeddedServer() throws NodeValidationException {
            node = new PluginNode(Settings.builder().put("node.data", HadoopFormatIOElasticTest.TRUE).put("network.host", HadoopFormatIOElasticTest.ELASTIC_IN_MEM_HOSTNAME).put("http.port", HadoopFormatIOElasticTest.port).put("path.data", HadoopFormatIOElasticTest.elasticTempFolder.getRoot().getPath()).put("path.home", HadoopFormatIOElasticTest.elasticTempFolder.getRoot().getPath()).put("transport.type", "local").put("http.enabled", HadoopFormatIOElasticTest.TRUE).put("node.ingest", HadoopFormatIOElasticTest.TRUE).build());
            node.start();
            HadoopFormatIOElasticTest.LOG.info("Elastic in memory server started.");
            prepareElasticIndex();
            HadoopFormatIOElasticTest.LOG.info("Prepared index beamdband populated data on elastic in memory server.");
        }

        private static void prepareElasticIndex() {
            node.client().admin().indices().create(new CreateIndexRequest(HadoopFormatIOElasticTest.ELASTIC_INDEX_NAME)).actionGet();
            for (int i = 0; i < HadoopFormatIOElasticTest.TEST_DATA_ROW_COUNT; i++) {
                node.client().prepareIndex(HadoopFormatIOElasticTest.ELASTIC_INDEX_NAME, HadoopFormatIOElasticTest.ELASTIC_TYPE_NAME, String.valueOf(i)).setSource(HadoopFormatIOElasticTest.createElasticRow(HadoopFormatIOElasticTest.ELASTIC_TYPE_ID_PREFIX + i, "Faraday" + i)).execute().actionGet();
            }
            node.client().admin().indices().prepareRefresh(new String[]{HadoopFormatIOElasticTest.ELASTIC_INDEX_NAME}).get();
        }

        static void shutdown() throws IOException {
            node.client().admin().indices().delete(new DeleteIndexRequest(HadoopFormatIOElasticTest.ELASTIC_INDEX_NAME)).actionGet();
            HadoopFormatIOElasticTest.LOG.info("Deleted index beamdb from elastic in memory server");
            node.close();
            HadoopFormatIOElasticTest.LOG.info("Closed elastic in memory server node.");
            deleteElasticDataDirectory();
        }

        private static void deleteElasticDataDirectory() {
            try {
                FileUtils.deleteDirectory(new File(HadoopFormatIOElasticTest.elasticTempFolder.getRoot().getPath()));
            } catch (IOException e) {
                throw new RuntimeException("Could not delete elastic data directory: " + e.getMessage(), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest$PluginNode.class */
    public static class PluginNode extends Node implements Serializable {
        private static final long serialVersionUID = 1;
        private static final ImmutableList<Class<? extends Plugin>> PLUGINS = ImmutableList.of(Netty4Plugin.class);

        PluginNode(Settings settings) {
            super(InternalSettingsPreparer.prepareEnvironment(settings, (Terminal) null), PLUGINS);
        }
    }

    @BeforeClass
    public static void startServer() throws NodeValidationException, IOException {
        port = NetworkTestHelper.getAvailableLocalPort();
        ElasticEmbeddedServer.startElasticEmbeddedServer();
    }

    @Test
    public void testHifIOWithElastic() {
        PCollection apply = this.pipeline.apply(HadoopFormatIO.read().withConfiguration(getConfiguration()));
        PAssert.thatSingleton(apply.apply(Count.globally())).isEqualTo(10L);
        PAssert.that(apply.apply(Values.create()).apply(this.transformFunc).apply(Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{"a62a85f5f081e3840baf1028d4d6c6bc"});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testHifIOWithElasticQuery() {
        Configuration configuration = getConfiguration();
        configuration.set("es.query", "{  \"query\": {  \"match\" : {    \"id\" : {      \"query\" : \"s2\",      \"type\" : \"boolean\"    }  }  }}");
        PCollection apply = this.pipeline.apply(HadoopFormatIO.read().withConfiguration(configuration));
        PAssert.thatSingleton(apply.apply(Count.globally())).isEqualTo(Long.valueOf(serialVersionUID));
        PAssert.that(apply.apply(Values.create()).apply(this.transformFunc).apply(Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{"cfbf3e5c993d44e57535a114e25f782d"});
        this.pipeline.run().waitUntilFinish();
    }

    private Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("es.nodes", ELASTIC_IN_MEM_HOSTNAME);
        configuration.set("es.port", String.format("%s", Integer.valueOf(port)));
        configuration.set("es.resource", ELASTIC_RESOURCE);
        configuration.set("es.internal.es.version", ELASTIC_INTERNAL_VERSION);
        configuration.set("es.nodes.discovery", TRUE);
        configuration.set("es.index.auto.create", TRUE);
        configuration.setClass("mapreduce.job.inputformat.class", EsInputFormat.class, InputFormat.class);
        configuration.setClass("key.class", Text.class, Object.class);
        configuration.setClass("value.class", LinkedMapWritable.class, Object.class);
        return configuration;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<String, String> createElasticRow(String str, String str2) {
        HashMap hashMap = new HashMap();
        hashMap.put("id", str);
        hashMap.put("scientist", str2);
        return hashMap;
    }

    @AfterClass
    public static void shutdownServer() throws IOException {
        ElasticEmbeddedServer.shutdown();
    }
}
