/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.hadoop.format;

import java.io.Serializable;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOTestOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class HadoopFormatIOElasticIT
implements Serializable {
    private static final String ELASTIC_INTERNAL_VERSION = "5.x";
    private static final String TRUE = "true";
    private static final String ELASTIC_INDEX_NAME = "test_data";
    private static final String ELASTIC_TYPE_NAME = "test_type";
    private static final String ELASTIC_RESOURCE = "/test_data/test_type";
    private static HadoopFormatIOTestOptions options;
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private final MapElements<LinkedMapWritable, String> transformFunc = MapElements.via((SimpleFunction)new SimpleFunction<LinkedMapWritable, String>(){

        public String apply(LinkedMapWritable mapw) {
            String rowValue = "";
            rowValue = HadoopFormatIOElasticIT.this.convertMapWRowToString(mapw);
            return rowValue;
        }
    });

    @BeforeClass
    public static void setUp() {
        PipelineOptionsFactory.register(HadoopFormatIOTestOptions.class);
        options = (HadoopFormatIOTestOptions)TestPipeline.testingPipelineOptions().as(HadoopFormatIOTestOptions.class);
    }

    @Test
    public void testHifIOWithElastic() throws SecurityException {
        long expectedRowCount = 1000L;
        String expectedHashCode = "42e254c8689050ed0a617ff5e80ea392";
        Configuration conf = HadoopFormatIOElasticIT.getConfiguration(options);
        PCollection esData = (PCollection)this.pipeline.apply((PTransform)HadoopFormatIO.read().withConfiguration(conf));
        PCollection count = (PCollection)esData.apply(Count.globally());
        PAssert.thatSingleton((PCollection)count).isEqualTo((Object)1000L);
        PCollection values = (PCollection)esData.apply((PTransform)Values.create());
        PCollection textValues = (PCollection)values.apply(this.transformFunc);
        PCollection consolidatedHashcode = (PCollection)textValues.apply((PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults());
        PAssert.that((PCollection)consolidatedHashcode).containsInAnyOrder((Object[])new String[]{expectedHashCode});
        this.pipeline.run().waitUntilFinish();
    }

    private String convertMapWRowToString(LinkedMapWritable mapw) {
        String rowValue = "";
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "User_Name");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Item_Code");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Txn_ID");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Item_ID");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "last_updated");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Price");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Title");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Description");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Age");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Item_Name");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Item_Price");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Availability");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Batch_Num");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "Last_Ordered");
        rowValue = this.addFieldValuesToRow(rowValue, (MapWritable)mapw, "City");
        return rowValue;
    }

    private String addFieldValuesToRow(String row, MapWritable mapw, String columnName) {
        Writable valueObj = mapw.get((Object)new Text(columnName));
        row = row + valueObj.toString() + "|";
        return row;
    }

    @Test
    public void testHifIOWithElasticQuery() {
        String expectedHashCode = "d7a7e4e42c2ca7b83ef7c1ad1ebce000";
        Long expectedRecordsCount = 1L;
        Configuration conf = HadoopFormatIOElasticIT.getConfiguration(options);
        String query = "{  \"query\": {  \"match\" : {    \"Title\" : {      \"query\" : \"Title9\",      \"type\" : \"boolean\"    }  }  }}";
        conf.set("es.query", query);
        PCollection esData = (PCollection)this.pipeline.apply((PTransform)HadoopFormatIO.read().withConfiguration(conf));
        PCollection count = (PCollection)esData.apply(Count.globally());
        PAssert.thatSingleton((PCollection)count).isEqualTo((Object)expectedRecordsCount);
        PCollection values = (PCollection)esData.apply((PTransform)Values.create());
        PCollection textValues = (PCollection)values.apply(this.transformFunc);
        PCollection consolidatedHashcode = (PCollection)textValues.apply((PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults());
        PAssert.that((PCollection)consolidatedHashcode).containsInAnyOrder((Object[])new String[]{expectedHashCode});
        this.pipeline.run().waitUntilFinish();
    }

    private static Configuration getConfiguration(HadoopFormatIOTestOptions options) {
        Configuration conf = new Configuration();
        conf.set("es.nodes", options.getElasticServerIp());
        conf.set("es.port", options.getElasticServerPort().toString());
        conf.set("es.nodes.wan.only", TRUE);
        conf.set("es.net.http.auth.user", options.getElasticUserName());
        conf.set("es.net.http.auth.pass", options.getElasticPassword());
        conf.set("es.resource", ELASTIC_RESOURCE);
        conf.set("es.internal.es.version", ELASTIC_INTERNAL_VERSION);
        conf.set("es.index.auto.create", TRUE);
        conf.setClass("mapreduce.job.inputformat.class", EsInputFormat.class, InputFormat.class);
        conf.setClass("key.class", Text.class, Object.class);
        conf.setClass("value.class", LinkedMapWritable.class, Object.class);
        conf.set("es.input.max.docs.per.partition", "50000");
        conf.set("es.scroll.size", "400");
        conf.set("es.batch.size.bytes", "8mb");
        return conf;
    }
}

