package org.apache.flink.streaming.connectors.elasticsearch.testutils;

import org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironment;
import org.apache.flink.util.InstantiationUtil;
import org.assertj.core.api.Assertions;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.class */
public class ElasticsearchResource extends ExternalResource {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchResource.class);
    private EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv;
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final String clusterName;

    public ElasticsearchResource(String str) {
        this.clusterName = str;
    }

    protected void before() throws Throwable {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Starting embedded Elasticsearch node ");
        LOG.info("-------------------------------------------------------------------------");
        this.embeddedNodeEnv = (EmbeddedElasticsearchNodeEnvironment) InstantiationUtil.instantiate(Class.forName("org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl"));
        this.tempFolder.create();
        this.embeddedNodeEnv.start(this.tempFolder.newFolder(), this.clusterName);
        waitForCluster();
    }

    private void waitForCluster() {
        ClusterAdminClient cluster = this.embeddedNodeEnv.getClient().admin().cluster();
        ClusterHealthResponse clusterHealthResponse = (ClusterHealthResponse) cluster.health(cluster.prepareHealth(new String[]{"_all"}).setTimeout(TimeValue.timeValueSeconds(120L)).request()).actionGet(TimeValue.timeValueSeconds(120L));
        Assertions.assertThat(clusterHealthResponse.getNumberOfNodes()).isGreaterThanOrEqualTo(1);
        Assertions.assertThat(clusterHealthResponse.getNumberOfDataNodes()).isGreaterThanOrEqualTo(1);
    }

    protected void after() {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Shutting down embedded Elasticsearch node ");
        LOG.info("-------------------------------------------------------------------------");
        try {
            this.embeddedNodeEnv.close();
            this.tempFolder.delete();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public Client getClient() {
        return this.embeddedNodeEnv.getClient();
    }
}
