package com.datatorrent.contrib.elasticsearch;

import com.datatorrent.api.Context;
import com.datatorrent.api.Sink;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.index.query.FilterBuilders;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.class */
public class ElasticSearchOperatorTest {
    protected static final String ID_FIELD = "id";
    protected static final String INDEX_NAME = "dt-test";
    protected static final String TYPE = "testMessages";
    private static final String VALUE = "value";
    private static final String WINDOW_ID = "Window ID";
    private static final String POST_DATE = "post_date";

    ElasticSearchConnectable createStore() {
        ElasticSearchConnectable elasticSearchConnectable = new ElasticSearchConnectable();
        elasticSearchConnectable.setHostName("localhost");
        elasticSearchConnectable.setPort(9300);
        return elasticSearchConnectable;
    }

    @Test
    public void testReadAfterWrite() {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            List<String> writeToES = writeToES();
            try {
                Thread.sleep(1000L);
                List<String> readFromES = readFromES(writeToES, currentTimeMillis);
                Collections.sort(writeToES);
                Collections.sort(readFromES);
                Assert.assertEquals("Data inconsistency in Elastic Search", writeToES, readFromES);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } catch (NoNodeAvailableException e2) {
        }
    }

    private List<String> writeToES() {
        ElasticSearchMapOutputOperator<Map<String, Object>> elasticSearchMapOutputOperator = new ElasticSearchMapOutputOperator<Map<String, Object>>() { // from class: com.datatorrent.contrib.elasticsearch.ElasticSearchOperatorTest.1
            public void processTuple(Map<String, Object> map) {
                super.processTuple(map);
            }
        };
        ArrayList arrayList = new ArrayList();
        elasticSearchMapOutputOperator.setIdField(ID_FIELD);
        elasticSearchMapOutputOperator.setIndexName(INDEX_NAME);
        elasticSearchMapOutputOperator.setType(TYPE);
        elasticSearchMapOutputOperator.setStore(createStore());
        elasticSearchMapOutputOperator.setup((Context.OperatorContext) null);
        for (int i = 1; i <= 5; i++) {
            elasticSearchMapOutputOperator.beginWindow(i);
            for (int i2 = 0; i2 < 3; i2++) {
                HashMap hashMap = new HashMap();
                hashMap.put(VALUE, Integer.valueOf(i2));
                hashMap.put(WINDOW_ID, Integer.valueOf(i));
                long currentTimeMillis = System.currentTimeMillis();
                String uuid = UUID.randomUUID().toString();
                arrayList.add(uuid);
                hashMap.put(ID_FIELD, uuid);
                hashMap.put(POST_DATE, Long.valueOf(currentTimeMillis));
                elasticSearchMapOutputOperator.processTuple(hashMap);
            }
            elasticSearchMapOutputOperator.endWindow();
        }
        elasticSearchMapOutputOperator.teardown();
        return arrayList;
    }

    private List<String> readFromES(List<String> list, final long j) {
        ElasticSearchMapInputOperator<Map<String, Object>> elasticSearchMapInputOperator = new ElasticSearchMapInputOperator<Map<String, Object>>() { // from class: com.datatorrent.contrib.elasticsearch.ElasticSearchOperatorTest.2
            protected SearchRequestBuilder getSearchRequestBuilder() {
                return this.searchRequestBuilder.setPostFilter(FilterBuilders.rangeFilter(ElasticSearchOperatorTest.POST_DATE).from(j).to(System.currentTimeMillis())).setSize(15).setExplain(false);
            }
        };
        elasticSearchMapInputOperator.setIndexName(INDEX_NAME);
        elasticSearchMapInputOperator.setType(TYPE);
        elasticSearchMapInputOperator.setStore(createStore());
        final ArrayList arrayList = new ArrayList();
        elasticSearchMapInputOperator.outputPort.setSink(new Sink<Map<String, Object>>() { // from class: com.datatorrent.contrib.elasticsearch.ElasticSearchOperatorTest.3
            public void put(Map<String, Object> map) {
                arrayList.add(map.get(ElasticSearchOperatorTest.ID_FIELD).toString());
            }

            public int getCount(boolean z) {
                return 0;
            }
        });
        elasticSearchMapInputOperator.setup((Context.OperatorContext) null);
        for (int i = 1; i <= 1; i++) {
            elasticSearchMapInputOperator.beginWindow(i);
            elasticSearchMapInputOperator.emitTuples();
            elasticSearchMapInputOperator.endWindow();
        }
        elasticSearchMapInputOperator.teardown();
        return arrayList;
    }
}
