/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sinks;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.google.common.collect.ImmutableMap;
import java.lang.invoke.CallSite;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.http.HttpHost;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.tests.integration.io.sinks.SinkTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.Assert;

public abstract class ElasticSearchSinkTester
extends SinkTester<ElasticsearchContainer> {
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchSinkTester.class);
    private static final String NAME = "elastic-search";
    private ElasticsearchClient elasticClient;
    private boolean schemaEnable;
    private final Schema<KeyValue<SimplePojo, SimplePojo>> kvSchema;

    @Override
    public Schema<?> getInputTopicSchema() {
        if (this.schemaEnable) {
            return Schema.AUTO_CONSUME();
        }
        return Schema.STRING;
    }

    public ElasticSearchSinkTester(boolean schemaEnable) {
        super(NAME, SinkTester.SinkType.ELASTIC_SEARCH);
        this.sinkConfig.put("elasticSearchUrl", "http://elastic-search:9200");
        this.sinkConfig.put("indexName", "test-index");
        this.schemaEnable = schemaEnable;
        if (schemaEnable) {
            this.sinkConfig.put("schemaEnable", "true");
            this.kvSchema = Schema.KeyValue((Schema)Schema.JSON(SimplePojo.class), (Schema)Schema.AVRO(SimplePojo.class), (KeyValueEncodingType)KeyValueEncodingType.SEPARATED);
        } else {
            this.kvSchema = null;
        }
    }

    @Override
    protected final ElasticsearchContainer createSinkService(PulsarCluster cluster) {
        ElasticsearchContainer elasticContainer = this.createElasticContainer();
        this.configureElasticContainer(elasticContainer);
        return elasticContainer;
    }

    protected void configureElasticContainer(ElasticsearchContainer elasticContainer) {
        if (!this.isOpenSearch()) {
            elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false");
        }
        elasticContainer.withEnv("cluster.routing.allocation.disk.threshold_enabled", "false");
        elasticContainer.withLogConsumer(o -> log.info("elastic> {}", (Object)o.getUtf8String()));
    }

    protected boolean isOpenSearch() {
        return false;
    }

    protected abstract ElasticsearchContainer createElasticContainer();

    @Override
    public void prepareSink() throws Exception {
        RestClientBuilder builder = RestClient.builder((HttpHost[])new HttpHost[]{new HttpHost("localhost", ((ElasticsearchContainer)this.serviceContainer).getMappedPort(9200).intValue(), "http")});
        RestClientTransport transport = new RestClientTransport(builder.build(), (JsonpMapper)new JacksonJsonpMapper());
        this.elasticClient = new ElasticsearchClient((ElasticsearchTransport)transport);
    }

    @Override
    public void validateSinkResult(Map<String, String> kvs) {
        Awaitility.await().untilAsserted(() -> {
            SearchResponse searchResult = this.elasticClient.search(new SearchRequest.Builder().index("test-index", new String[0]).q("*:*").build(), Map.class);
            Assert.assertTrue((searchResult.hits().total().value() > 0L ? 1 : 0) != 0, (String)searchResult.toString());
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void produceMessage(int numMessages, PulsarClient client, String inputTopicName, LinkedHashMap<String, String> kvs) throws Exception {
        if (this.schemaEnable) {
            Producer producer = client.newProducer(this.kvSchema).topic(inputTopicName).create();
            try {
                for (int i = 0; i < numMessages; ++i) {
                    String key = "key-" + i;
                    kvs.put(key, key);
                    SimplePojo keyPojo = new SimplePojo("f1_" + i, "f2_" + i, Arrays.asList(i, i + 1), new HashSet<Long>(Arrays.asList(i)), (Map<String, String>)ImmutableMap.of((Object)("map1_k_" + i), (Object)("map1_kv_" + i)));
                    SimplePojo valuePojo = new SimplePojo("f1_" + i, "f2_" + i, Arrays.asList(i, i + 1), new HashSet<Long>(Arrays.asList(i)), (Map<String, String>)ImmutableMap.of((Object)("map1_v_" + i), (Object)("map1_vv_" + i)));
                    producer.newMessage().value((Object)new KeyValue((Object)keyPojo, (Object)valuePojo)).send();
                }
            }
            finally {
                if (Collections.singletonList(producer).get(0) != null) {
                    producer.close();
                }
            }
        }
        Producer producer = client.newProducer(Schema.STRING).topic(inputTopicName).create();
        try {
            for (int i = 0; i < numMessages; ++i) {
                String key = "key-" + i;
                HashMap<CallSite, CallSite> valueMap = new HashMap<CallSite, CallSite>();
                valueMap.put((CallSite)((Object)("key" + i)), (CallSite)((Object)("value" + i)));
                String value = ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(valueMap);
                kvs.put(key, value);
                producer.newMessage().key(key).value((Object)value).send();
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (this.elasticClient != null) {
            ((ElasticsearchTransport)this.elasticClient._transport()).close();
            this.elasticClient = null;
        }
    }

    public static final class SimplePojo {
        private String field1;
        private String field2;
        private List<Integer> list1;
        private Set<Long> set1;
        private Map<String, String> map1;

        public String getField1() {
            return this.field1;
        }

        public String getField2() {
            return this.field2;
        }

        public List<Integer> getList1() {
            return this.list1;
        }

        public Set<Long> getSet1() {
            return this.set1;
        }

        public Map<String, String> getMap1() {
            return this.map1;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        public void setField2(String field2) {
            this.field2 = field2;
        }

        public void setList1(List<Integer> list1) {
            this.list1 = list1;
        }

        public void setSet1(Set<Long> set1) {
            this.set1 = set1;
        }

        public void setMap1(Map<String, String> map1) {
            this.map1 = map1;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof SimplePojo)) {
                return false;
            }
            SimplePojo other = (SimplePojo)o;
            String this$field1 = this.getField1();
            String other$field1 = other.getField1();
            if (this$field1 == null ? other$field1 != null : !this$field1.equals(other$field1)) {
                return false;
            }
            String this$field2 = this.getField2();
            String other$field2 = other.getField2();
            if (this$field2 == null ? other$field2 != null : !this$field2.equals(other$field2)) {
                return false;
            }
            List<Integer> this$list1 = this.getList1();
            List<Integer> other$list1 = other.getList1();
            if (this$list1 == null ? other$list1 != null : !((Object)this$list1).equals(other$list1)) {
                return false;
            }
            Set<Long> this$set1 = this.getSet1();
            Set<Long> other$set1 = other.getSet1();
            if (this$set1 == null ? other$set1 != null : !((Object)this$set1).equals(other$set1)) {
                return false;
            }
            Map<String, String> this$map1 = this.getMap1();
            Map<String, String> other$map1 = other.getMap1();
            return !(this$map1 == null ? other$map1 != null : !((Object)this$map1).equals(other$map1));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $field1 = this.getField1();
            result = result * 59 + ($field1 == null ? 43 : $field1.hashCode());
            String $field2 = this.getField2();
            result = result * 59 + ($field2 == null ? 43 : $field2.hashCode());
            List<Integer> $list1 = this.getList1();
            result = result * 59 + ($list1 == null ? 43 : ((Object)$list1).hashCode());
            Set<Long> $set1 = this.getSet1();
            result = result * 59 + ($set1 == null ? 43 : ((Object)$set1).hashCode());
            Map<String, String> $map1 = this.getMap1();
            result = result * 59 + ($map1 == null ? 43 : ((Object)$map1).hashCode());
            return result;
        }

        public String toString() {
            return "ElasticSearchSinkTester.SimplePojo(field1=" + this.getField1() + ", field2=" + this.getField2() + ", list1=" + this.getList1() + ", set1=" + this.getSet1() + ", map1=" + this.getMap1() + ")";
        }

        public SimplePojo(String field1, String field2, List<Integer> list1, Set<Long> set1, Map<String, String> map1) {
            this.field1 = field1;
            this.field2 = field2;
            this.list1 = list1;
            this.set1 = set1;
            this.map1 = map1;
        }
    }
}

