/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.elasticsearch.testutils;

import java.util.HashMap;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.junit.Assert;

public class SourceSinkDataTestKit {
    private static final int NUM_ELEMENTS = 20;
    private static final String DATA_PREFIX = "message #";
    private static final String DATA_FIELD_NAME = "data";
    private static final String TYPE_NAME = "flink-es-test-type";

    public static void verifyProducedSinkData(Client client, String index) {
        for (int i = 0; i < 20; ++i) {
            GetResponse response = (GetResponse)client.get(new GetRequest(index, TYPE_NAME, Integer.toString(i))).actionGet();
            Assert.assertEquals((Object)(DATA_PREFIX + i), response.getSource().get(DATA_FIELD_NAME));
        }
    }

    public static class TestElasticsearchSinkFunction
    implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;
        private final String index;

        public TestElasticsearchSinkFunction(String index) {
            this.index = index;
        }

        public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
            HashMap<String, Object> json = new HashMap<String, Object>();
            json.put(SourceSinkDataTestKit.DATA_FIELD_NAME, element.f1);
            return new IndexRequest(this.index, SourceSinkDataTestKit.TYPE_NAME, ((Integer)element.f0).toString()).source(json);
        }

        public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(new IndexRequest[]{this.createIndexRequest(element)});
        }
    }

    public static class TestDataSourceFunction
    implements SourceFunction<Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;

        public void run(SourceFunction.SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
            for (int i = 0; i < 20 && this.running; ++i) {
                ctx.collect((Object)Tuple2.of((Object)i, (Object)(SourceSinkDataTestKit.DATA_PREFIX + i)));
            }
        }

        public void cancel() {
            this.running = false;
        }
    }
}

