/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.hadoop.format;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class HadoopFormatIOElasticTest
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIOElasticTest.class);
    private static final String ELASTIC_IN_MEM_HOSTNAME = "127.0.0.1";
    private static int port;
    private static final String ELASTIC_INTERNAL_VERSION = "5.x";
    private static final String TRUE = "true";
    private static final String ELASTIC_INDEX_NAME = "beamdb";
    private static final String ELASTIC_TYPE_NAME = "scientists";
    private static final String ELASTIC_RESOURCE = "/beamdb/scientists";
    private static final int TEST_DATA_ROW_COUNT = 10;
    private static final String ELASTIC_TYPE_ID_PREFIX = "s";
    @ClassRule
    public static TemporaryFolder elasticTempFolder;
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private final MapElements<LinkedMapWritable, String> transformFunc = MapElements.via((SimpleFunction)new SimpleFunction<LinkedMapWritable, String>(){

        public String apply(LinkedMapWritable mapw) {
            return mapw.get((Object)new Text("id")) + "|" + mapw.get((Object)new Text("scientist"));
        }
    });

    @BeforeClass
    public static void startServer() throws NodeValidationException, IOException {
        port = NetworkTestHelper.getAvailableLocalPort();
        ElasticEmbeddedServer.startElasticEmbeddedServer();
    }

    @Test
    public void testHifIOWithElastic() {
        String expectedHashCode = "a62a85f5f081e3840baf1028d4d6c6bc";
        Configuration conf = this.getConfiguration();
        PCollection esData = (PCollection)this.pipeline.apply((PTransform)HadoopFormatIO.read().withConfiguration(conf));
        PCollection count = (PCollection)esData.apply(Count.globally());
        PAssert.thatSingleton((PCollection)count).isEqualTo((Object)10L);
        PCollection values = (PCollection)esData.apply((PTransform)Values.create());
        PCollection textValues = (PCollection)values.apply(this.transformFunc);
        PCollection consolidatedHashcode = (PCollection)textValues.apply((PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults());
        PAssert.that((PCollection)consolidatedHashcode).containsInAnyOrder((Object[])new String[]{expectedHashCode});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testHifIOWithElasticQuery() {
        long expectedRowCount = 1L;
        String expectedHashCode = "cfbf3e5c993d44e57535a114e25f782d";
        Configuration conf = this.getConfiguration();
        String fieldValue = "s2";
        String query = "{  \"query\": {  \"match\" : {    \"id\" : {      \"query\" : \"" + fieldValue + "\",      \"type\" : \"boolean\"    }  }  }}";
        conf.set("es.query", query);
        PCollection esData = (PCollection)this.pipeline.apply((PTransform)HadoopFormatIO.read().withConfiguration(conf));
        PCollection count = (PCollection)esData.apply(Count.globally());
        PAssert.thatSingleton((PCollection)count).isEqualTo((Object)expectedRowCount);
        PCollection values = (PCollection)esData.apply((PTransform)Values.create());
        PCollection textValues = (PCollection)values.apply(this.transformFunc);
        PCollection consolidatedHashcode = (PCollection)textValues.apply((PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults());
        PAssert.that((PCollection)consolidatedHashcode).containsInAnyOrder((Object[])new String[]{expectedHashCode});
        this.pipeline.run().waitUntilFinish();
    }

    private Configuration getConfiguration() {
        Configuration conf = new Configuration();
        conf.set("es.nodes", ELASTIC_IN_MEM_HOSTNAME);
        conf.set("es.port", String.format("%s", port));
        conf.set("es.resource", ELASTIC_RESOURCE);
        conf.set("es.internal.es.version", ELASTIC_INTERNAL_VERSION);
        conf.set("es.nodes.discovery", TRUE);
        conf.set("es.index.auto.create", TRUE);
        conf.setClass("mapreduce.job.inputformat.class", EsInputFormat.class, InputFormat.class);
        conf.setClass("key.class", Text.class, Object.class);
        conf.setClass("value.class", LinkedMapWritable.class, Object.class);
        return conf;
    }

    private static Map<String, String> createElasticRow(String id, String name) {
        HashMap<String, String> data = new HashMap<String, String>();
        data.put("id", id);
        data.put("scientist", name);
        return data;
    }

    @AfterClass
    public static void shutdownServer() throws IOException {
        ElasticEmbeddedServer.shutdown();
    }

    static {
        elasticTempFolder = new TemporaryFolder();
    }

    static class PluginNode
    extends Node
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final ImmutableList<Class<? extends Plugin>> PLUGINS = ImmutableList.of(Netty4Plugin.class);

        PluginNode(Settings settings) {
            super(InternalSettingsPreparer.prepareEnvironment((Settings)settings, null), PLUGINS);
        }
    }

    static class ElasticEmbeddedServer
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private static Node node;

        ElasticEmbeddedServer() {
        }

        static void startElasticEmbeddedServer() throws NodeValidationException {
            Settings settings = Settings.builder().put("node.data", HadoopFormatIOElasticTest.TRUE).put("network.host", HadoopFormatIOElasticTest.ELASTIC_IN_MEM_HOSTNAME).put("http.port", port).put("path.data", elasticTempFolder.getRoot().getPath()).put("path.home", elasticTempFolder.getRoot().getPath()).put("transport.type", "local").put("http.enabled", HadoopFormatIOElasticTest.TRUE).put("node.ingest", HadoopFormatIOElasticTest.TRUE).build();
            node = new PluginNode(settings);
            node.start();
            LOG.info("Elastic in memory server started.");
            ElasticEmbeddedServer.prepareElasticIndex();
            LOG.info("Prepared index beamdband populated data on elastic in memory server.");
        }

        private static void prepareElasticIndex() {
            CreateIndexRequest indexRequest = new CreateIndexRequest(HadoopFormatIOElasticTest.ELASTIC_INDEX_NAME);
            node.client().admin().indices().create(indexRequest).actionGet();
            for (int i = 0; i < 10; ++i) {
                node.client().prepareIndex(HadoopFormatIOElasticTest.ELASTIC_INDEX_NAME, HadoopFormatIOElasticTest.ELASTIC_TYPE_NAME, String.valueOf(i)).setSource(HadoopFormatIOElasticTest.createElasticRow(HadoopFormatIOElasticTest.ELASTIC_TYPE_ID_PREFIX + i, "Faraday" + i)).execute().actionGet();
            }
            node.client().admin().indices().prepareRefresh(new String[]{HadoopFormatIOElasticTest.ELASTIC_INDEX_NAME}).get();
        }

        static void shutdown() throws IOException {
            DeleteIndexRequest indexRequest = new DeleteIndexRequest(HadoopFormatIOElasticTest.ELASTIC_INDEX_NAME);
            node.client().admin().indices().delete(indexRequest).actionGet();
            LOG.info("Deleted index beamdb from elastic in memory server");
            node.close();
            LOG.info("Closed elastic in memory server node.");
            ElasticEmbeddedServer.deleteElasticDataDirectory();
        }

        private static void deleteElasticDataDirectory() {
            try {
                FileUtils.deleteDirectory((File)new File(elasticTempFolder.getRoot().getPath()));
            }
            catch (IOException e) {
                throw new RuntimeException("Could not delete elastic data directory: " + e.getMessage(), e);
            }
        }
    }
}

