/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sinks;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
import org.apache.pulsar.tests.integration.io.sinks.SinkTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

public class ElasticSearchSinkTester
extends SinkTester<ElasticSearchContainer> {
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchSinkTester.class);
    private RestHighLevelClient elasticClient;
    private boolean schemaEnable;
    private final Schema<KeyValue<SimplePojo, SimplePojo>> kvSchema;

    @Override
    public Schema<?> getInputTopicSchema() {
        if (this.schemaEnable) {
            return Schema.AUTO_CONSUME();
        }
        return Schema.STRING;
    }

    public ElasticSearchSinkTester(boolean schemaEnable) {
        super("ElasticSearch", SinkTester.SinkType.ELASTIC_SEARCH);
        this.sinkConfig.put("elasticSearchUrl", "http://ElasticSearch:9200");
        this.sinkConfig.put("indexName", "test-index");
        this.schemaEnable = schemaEnable;
        if (schemaEnable) {
            this.sinkConfig.put("schemaEnable", "true");
            this.kvSchema = Schema.KeyValue((Schema)Schema.JSON(SimplePojo.class), (Schema)Schema.AVRO(SimplePojo.class), (KeyValueEncodingType)KeyValueEncodingType.SEPARATED);
        } else {
            this.kvSchema = null;
        }
    }

    @Override
    protected ElasticSearchContainer createSinkService(PulsarCluster cluster) {
        return new ElasticSearchContainer(cluster.getClusterName());
    }

    @Override
    public void prepareSink() throws Exception {
        RestClientBuilder builder = RestClient.builder((HttpHost[])new HttpHost[]{new HttpHost("localhost", ((ElasticSearchContainer)this.serviceContainer).getMappedPort(9200).intValue(), "http")});
        this.elasticClient = new RestHighLevelClient(builder);
    }

    @Override
    public void stopServiceContainer(PulsarCluster cluster) {
        try {
            if (this.elasticClient != null) {
                this.elasticClient.close();
            }
        }
        catch (IOException e) {
            log.warn("Error closing elasticClient, ignoring", (Throwable)e);
        }
        finally {
            super.stopServiceContainer(cluster);
        }
    }

    @Override
    public void validateSinkResult(Map<String, String> kvs) {
        SearchRequest searchRequest = new SearchRequest(new String[]{"test-index"});
        Awaitility.await().untilAsserted(() -> {
            SearchResponse searchResult = this.elasticClient.search(searchRequest, RequestOptions.DEFAULT);
            Assert.assertTrue((searchResult.getHits().getTotalHits().value > 0L ? 1 : 0) != 0, (String)searchResult.toString());
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void produceMessage(int numMessages, PulsarClient client, String inputTopicName, LinkedHashMap<String, String> kvs) throws Exception {
        if (this.schemaEnable) {
            Producer producer = client.newProducer(this.kvSchema).topic(inputTopicName).create();
            try {
                for (int i = 0; i < numMessages; ++i) {
                    String key = "key-" + i;
                    kvs.put(key, key);
                    producer.newMessage().value((Object)new KeyValue((Object)new SimplePojo("f1_" + i, "f2_" + i), (Object)new SimplePojo("v1_" + i, "v2_" + i))).send();
                }
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        Producer producer = client.newProducer(Schema.STRING).topic(inputTopicName).create();
        try {
            for (int i = 0; i < numMessages; ++i) {
                String key = "key-" + i;
                HashMap<String, String> valueMap = new HashMap<String, String>();
                valueMap.put("key" + i, "value" + i);
                String value = ObjectMapperFactory.getThreadLocal().writeValueAsString(valueMap);
                kvs.put(key, value);
                producer.newMessage().key(key).value((Object)value).send();
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    public static final class SimplePojo {
        private String field1;
        private String field2;

        public String getField1() {
            return this.field1;
        }

        public String getField2() {
            return this.field2;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        public void setField2(String field2) {
            this.field2 = field2;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof SimplePojo)) {
                return false;
            }
            SimplePojo other = (SimplePojo)o;
            String this$field1 = this.getField1();
            String other$field1 = other.getField1();
            if (this$field1 == null ? other$field1 != null : !this$field1.equals(other$field1)) {
                return false;
            }
            String this$field2 = this.getField2();
            String other$field2 = other.getField2();
            return !(this$field2 == null ? other$field2 != null : !this$field2.equals(other$field2));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $field1 = this.getField1();
            result = result * 59 + ($field1 == null ? 43 : $field1.hashCode());
            String $field2 = this.getField2();
            result = result * 59 + ($field2 == null ? 43 : $field2.hashCode());
            return result;
        }

        public String toString() {
            return "ElasticSearchSinkTester.SimplePojo(field1=" + this.getField1() + ", field2=" + this.getField2() + ")";
        }

        public SimplePojo(String field1, String field2) {
            this.field1 = field1;
            this.field2 = field2;
        }
    }
}

