package org.apache.flink.connector.elasticsearch.table;

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.elasticsearch.ElasticsearchUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.http.HttpHost;
import org.assertj.core.api.Assertions;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHits;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.class */
abstract class ElasticsearchDynamicSinkBaseITCase {
    ElasticsearchDynamicSinkBaseITCase() {
    }

    abstract String getElasticsearchHttpHostAddress();

    abstract ElasticsearchDynamicSinkFactoryBase getDynamicSinkFactory();

    abstract Map<String, Object> makeGetRequest(RestHighLevelClient restHighLevelClient, String str, String str2) throws IOException;

    abstract SearchHits makeSearchRequest(RestHighLevelClient restHighLevelClient, String str) throws IOException;

    abstract long getTotalSearchHits(SearchHits searchHits);

    abstract TestContext getPrefilledTestContext(String str);

    abstract String getConnectorSql(String str);

    private RestHighLevelClient getClient() {
        return new RestHighLevelClient(RestClient.builder(new HttpHost[]{HttpHost.create(getElasticsearchHttpHostAddress())}));
    }

    @Test
    public void testWritingDocuments() throws Exception {
        ResolvedSchema resolvedSchema = new ResolvedSchema(Arrays.asList(Column.physical("a", DataTypes.BIGINT().notNull()), Column.physical("b", DataTypes.TIME()), Column.physical("c", DataTypes.STRING().notNull()), Column.physical("d", DataTypes.FLOAT()), Column.physical("e", DataTypes.TINYINT().notNull()), Column.physical("f", DataTypes.DATE()), Column.physical("g", DataTypes.TIMESTAMP().notNull())), Collections.emptyList(), UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
        RowData of = GenericRowData.of(new Object[]{1L, 12345, StringData.fromString("ABCDE"), Float.valueOf(12.12f), (byte) 2, 12345, TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12"))});
        Sink createSink = getDynamicSinkFactory().createDynamicTableSink(getPrefilledTestContext("writing-documents").withSchema(resolvedSchema).build()).getSinkRuntimeProvider(new ElasticsearchUtil.MockContext()).createSink();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(4);
        of.setRowKind(RowKind.UPDATE_AFTER);
        executionEnvironment.fromElements(new RowData[]{of}).sinkTo(createSink);
        executionEnvironment.execute();
        Map<String, Object> makeGetRequest = makeGetRequest(getClient(), "writing-documents", "1_2012-12-12T12:12:12");
        HashMap hashMap = new HashMap();
        hashMap.put("a", 1);
        hashMap.put("b", "00:00:12");
        hashMap.put("c", "ABCDE");
        hashMap.put("d", Double.valueOf(12.12d));
        hashMap.put("e", 2);
        hashMap.put("f", "2003-10-20");
        hashMap.put("g", "2012-12-12 12:12:12");
        Assertions.assertThat(makeGetRequest).isEqualTo(hashMap);
    }

    @Test
    public void testWritingDocumentsFromTableApi() throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.executeSql("CREATE TABLE esTable (a BIGINT NOT NULL,\nb TIME,\nc STRING NOT NULL,\nd FLOAT,\ne TINYINT NOT NULL,\nf DATE,\ng TIMESTAMP NOT NULL,\nh as a + 2,\nPRIMARY KEY (a, g) NOT ENFORCED\n)\nWITH (\n" + getConnectorSql("table-api") + ")");
        create.fromValues(new Expression[]{Expressions.row(1L, new Object[]{LocalTime.ofNanoOfDay(12345000000L), "ABCDE", Float.valueOf(12.12f), (byte) 2, LocalDate.ofEpochDay(12345L), LocalDateTime.parse("2012-12-12T12:12:12")})}).executeInsert("esTable").await();
        Map<String, Object> makeGetRequest = makeGetRequest(getClient(), "table-api", "1_2012-12-12T12:12:12");
        HashMap hashMap = new HashMap();
        hashMap.put("a", 1);
        hashMap.put("b", "00:00:12");
        hashMap.put("c", "ABCDE");
        hashMap.put("d", Double.valueOf(12.12d));
        hashMap.put("e", 2);
        hashMap.put("f", "2003-10-20");
        hashMap.put("g", "2012-12-12 12:12:12");
        Assertions.assertThat(makeGetRequest).isEqualTo(hashMap);
    }

    @Test
    public void testWritingDocumentsNoPrimaryKey() throws Exception {
        SearchHits makeSearchRequest;
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.executeSql("CREATE TABLE esTable (a BIGINT NOT NULL,\nb TIME,\nc STRING NOT NULL,\nd FLOAT,\ne TINYINT NOT NULL,\nf DATE,\ng TIMESTAMP NOT NULL\n)\nWITH (\n" + getConnectorSql("no-primary-key") + ")");
        create.fromValues(new Expression[]{Expressions.row(1L, new Object[]{LocalTime.ofNanoOfDay(12345000000L), "ABCDE", Float.valueOf(12.12f), (byte) 2, LocalDate.ofEpochDay(12345L), LocalDateTime.parse("2012-12-12T12:12:12")}), Expressions.row(2L, new Object[]{LocalTime.ofNanoOfDay(12345000000L), "FGHIJK", Float.valueOf(13.13f), (byte) 4, LocalDate.ofEpochDay(12345L), LocalDateTime.parse("2013-12-12T13:13:13")})}).executeInsert("esTable").await();
        RestHighLevelClient client = getClient();
        Deadline fromNow = Deadline.fromNow(Duration.ofSeconds(30L));
        do {
            makeSearchRequest = makeSearchRequest(client, "no-primary-key");
            if (getTotalSearchHits(makeSearchRequest) < 2) {
                Thread.sleep(200L);
            }
            if (getTotalSearchHits(makeSearchRequest) >= 2) {
                break;
            }
        } while (fromNow.hasTimeLeft());
        if (getTotalSearchHits(makeSearchRequest) < 2) {
            throw new AssertionError("Could not retrieve results from Elasticsearch.");
        }
        HashSet hashSet = new HashSet();
        hashSet.add(makeSearchRequest.getAt(0).getSourceAsMap());
        hashSet.add(makeSearchRequest.getAt(1).getSourceAsMap());
        HashMap hashMap = new HashMap();
        hashMap.put("a", 1);
        hashMap.put("b", "00:00:12");
        hashMap.put("c", "ABCDE");
        hashMap.put("d", Double.valueOf(12.12d));
        hashMap.put("e", 2);
        hashMap.put("f", "2003-10-20");
        hashMap.put("g", "2012-12-12 12:12:12");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("a", 2);
        hashMap2.put("b", "00:00:12");
        hashMap2.put("c", "FGHIJK");
        hashMap2.put("d", Double.valueOf(13.13d));
        hashMap2.put("e", 4);
        hashMap2.put("f", "2003-10-20");
        hashMap2.put("g", "2013-12-12 13:13:13");
        HashSet hashSet2 = new HashSet();
        hashSet2.add(hashMap);
        hashSet2.add(hashMap2);
        Assertions.assertThat(hashSet).isEqualTo(hashSet2);
    }

    @Test
    public void testWritingDocumentsWithDynamicIndex() throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.executeSql("CREATE TABLE esTable (a BIGINT NOT NULL,\nb TIMESTAMP NOT NULL,\nPRIMARY KEY (a) NOT ENFORCED\n)\nWITH (\n" + getConnectorSql("dynamic-index-{b|yyyy-MM-dd}") + ")");
        create.fromValues(new Expression[]{Expressions.row(1L, new Object[]{LocalDateTime.parse("2012-12-12T12:12:12")})}).executeInsert("esTable").await();
        Map<String, Object> makeGetRequest = makeGetRequest(getClient(), "dynamic-index-2012-12-12", "1");
        HashMap hashMap = new HashMap();
        hashMap.put("a", 1);
        hashMap.put("b", "2012-12-12 12:12:12");
        Assertions.assertThat(makeGetRequest).isEqualTo(hashMap);
    }

    @Test
    public void testWritingDocumentsWithDynamicIndexFromSystemTime() throws Exception {
        Map<String, Object> makeGetRequest;
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        DateTimeFormatter ofPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        create.getConfig().set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");
        String str = "dynamic-index-" + ofPattern.format(LocalDateTime.now(ZoneId.of("Asia/Shanghai"))) + "_index";
        create.executeSql("CREATE TABLE esTable (a BIGINT NOT NULL,\nb TIMESTAMP NOT NULL,\nPRIMARY KEY (a) NOT ENFORCED\n)\nWITH (\n" + getConnectorSql("dynamic-index-{now()|yyyy-MM-dd}_index") + ")");
        String str2 = "dynamic-index-" + ofPattern.format(LocalDateTime.now(ZoneId.of("Asia/Shanghai"))) + "_index";
        create.fromValues(new Expression[]{Expressions.row(1L, new Object[]{LocalDateTime.parse("2012-12-12T12:12:12")})}).executeInsert("esTable").await();
        RestHighLevelClient client = getClient();
        try {
            makeGetRequest = makeGetRequest(client, str, "1");
        } catch (ElasticsearchStatusException e) {
            if (e.status() != RestStatus.NOT_FOUND) {
                throw e;
            }
            makeGetRequest = makeGetRequest(client, str2, "1");
        }
        HashMap hashMap = new HashMap();
        hashMap.put("a", 1);
        hashMap.put("b", "2012-12-12 12:12:12");
        Assertions.assertThat(makeGetRequest).isEqualTo(hashMap);
    }
}
