/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.elasticsearch.table;

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.elasticsearch.ElasticsearchUtil;
import org.apache.flink.connector.elasticsearch.table.ElasticsearchDynamicSinkFactoryBase;
import org.apache.flink.connector.elasticsearch.table.TestContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHits;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Base class for integration tests that write rows through the Elasticsearch dynamic table sink
 * and read the resulting documents back with a {@link RestHighLevelClient}.
 *
 * <p>Subclasses supply the cluster address, the sink factory under test, and the request helpers
 * used to fetch or search documents; the concrete test cases live here.
 */
@ExtendWith(TestLoggerExtension.class)
abstract class ElasticsearchDynamicSinkBaseITCase {

    /** Returns the address handed to {@link HttpHost#create(String)} when building the client. */
    abstract String getElasticsearchHttpHostAddress();

    /** Returns the sink factory whose {@code createDynamicTableSink} result is exercised. */
    abstract ElasticsearchDynamicSinkFactoryBase getDynamicSinkFactory();

    /**
     * Fetches one document as a field-name to value map.
     *
     * @param client client used to issue the request
     * @param index name of the index to read from
     * @param id identifier of the requested document (the tests pass the primary-key values
     *     joined with {@code _}; presumably this is the Elasticsearch document id)
     * @return the document fields that the tests compare against their expected map
     * @throws IOException if the request fails
     */
    abstract Map<String, Object> makeGetRequest(RestHighLevelClient client, String index, String id)
            throws IOException;

    /**
     * Runs a search against the given index.
     *
     * @param client client used to issue the request
     * @param index name of the index to search
     * @return the hits of the search
     * @throws IOException if the request fails
     */
    abstract SearchHits makeSearchRequest(RestHighLevelClient client, String index)
            throws IOException;

    /** Returns the total hit count of {@code hits}; used to poll until documents are visible. */
    abstract long getTotalSearchHits(SearchHits hits);

    /** Returns a test context for {@code index} to which a schema can still be attached. */
    abstract TestContext getPrefilledTestContext(String index);

    /** Returns the option text that is spliced into the {@code WITH (...)} clause of the DDL. */
    abstract String getConnectorSql(String index);

    /** Creates a new REST client; callers are responsible for closing it. */
    private RestHighLevelClient getClient() {
        return new RestHighLevelClient(
                RestClient.builder(HttpHost.create(getElasticsearchHttpHostAddress())));
    }

    /**
     * Builds the document expected for the seven-column test rows. Fields {@code b} and {@code f}
     * are the same in every test row, so only the varying fields are parameters.
     */
    private static Map<String, Object> expectedDocument(
            int a, String c, double d, int e, String g) {
        Map<String, Object> document = new HashMap<>();
        document.put("a", a);
        document.put("b", "00:00:12");
        document.put("c", c);
        document.put("d", d);
        document.put("e", e);
        document.put("f", "2003-10-20");
        document.put("g", g);
        return document;
    }

    /** Writes a single row through the sink obtained from the factory and reads it back by id. */
    @Test
    public void testWritingDocuments() throws Exception {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("a", DataTypes.BIGINT().notNull()),
                                Column.physical("b", DataTypes.TIME()),
                                Column.physical("c", DataTypes.STRING().notNull()),
                                Column.physical("d", DataTypes.FLOAT()),
                                Column.physical("e", DataTypes.TINYINT().notNull()),
                                Column.physical("f", DataTypes.DATE()),
                                Column.physical("g", DataTypes.TIMESTAMP().notNull())),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));

        GenericRowData rowData =
                GenericRowData.of(
                        1L,
                        12345,
                        StringData.fromString("ABCDE"),
                        12.12f,
                        (byte) 2,
                        12345,
                        TimestampData.fromLocalDateTime(
                                LocalDateTime.parse("2012-12-12T12:12:12")));

        String index = "writing-documents";
        ElasticsearchDynamicSinkFactoryBase sinkFactory = getDynamicSinkFactory();

        DynamicTableSink.SinkRuntimeProvider runtimeProvider =
                sinkFactory
                        .createDynamicTableSink(
                                getPrefilledTestContext(index).withSchema(schema).build())
                        .getSinkRuntimeProvider(new ElasticsearchUtil.MockContext());
        SinkV2Provider sinkProvider = (SinkV2Provider) runtimeProvider;
        Sink<RowData> sink = sinkProvider.createSink();

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(4);

        rowData.setRowKind(RowKind.UPDATE_AFTER);
        environment.<RowData>fromElements(rowData).sinkTo(sink);
        environment.execute();

        try (RestHighLevelClient client = getClient()) {
            Map<String, Object> response =
                    makeGetRequest(client, index, "1_2012-12-12T12:12:12");
            Assertions.assertEquals(
                    expectedDocument(1, "ABCDE", 12.12, 2, "2012-12-12 12:12:12"), response);
        }
    }

    /** Inserts one row via SQL/Table API into a table with a composite primary key. */
    @Test
    public void testWritingDocumentsFromTableApi() throws Exception {
        TableEnvironment tableEnvironment =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        String index = "table-api";
        tableEnvironment.executeSql(
                "CREATE TABLE esTable ("
                        + "a BIGINT NOT NULL,\n"
                        + "b TIME,\n"
                        + "c STRING NOT NULL,\n"
                        + "d FLOAT,\n"
                        + "e TINYINT NOT NULL,\n"
                        + "f DATE,\n"
                        + "g TIMESTAMP NOT NULL,\n"
                        + "h as a + 2,\n"
                        + "PRIMARY KEY (a, g) NOT ENFORCED\n"
                        + ")\n"
                        + "WITH (\n"
                        + getConnectorSql(index)
                        + ")");

        tableEnvironment
                .fromValues(
                        Expressions.row(
                                1L,
                                LocalTime.ofNanoOfDay(12345000000L),
                                "ABCDE",
                                12.12f,
                                (byte) 2,
                                LocalDate.ofEpochDay(12345L),
                                LocalDateTime.parse("2012-12-12T12:12:12")))
                .executeInsert("esTable")
                .await();

        try (RestHighLevelClient client = getClient()) {
            Map<String, Object> response =
                    makeGetRequest(client, index, "1_2012-12-12T12:12:12");
            // The computed column "h" is not part of the expected document.
            Assertions.assertEquals(
                    expectedDocument(1, "ABCDE", 12.12, 2, "2012-12-12 12:12:12"), response);
        }
    }

    /**
     * Inserts two rows into a table without a primary key and verifies both documents by
     * searching the index, since no id is known up front to fetch them directly.
     */
    @Test
    public void testWritingDocumentsNoPrimaryKey() throws Exception {
        TableEnvironment tableEnvironment =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        String index = "no-primary-key";
        tableEnvironment.executeSql(
                "CREATE TABLE esTable ("
                        + "a BIGINT NOT NULL,\n"
                        + "b TIME,\n"
                        + "c STRING NOT NULL,\n"
                        + "d FLOAT,\n"
                        + "e TINYINT NOT NULL,\n"
                        + "f DATE,\n"
                        + "g TIMESTAMP NOT NULL\n"
                        + ")\n"
                        + "WITH (\n"
                        + getConnectorSql(index)
                        + ")");

        tableEnvironment
                .fromValues(
                        Expressions.row(
                                1L,
                                LocalTime.ofNanoOfDay(12345000000L),
                                "ABCDE",
                                12.12f,
                                (byte) 2,
                                LocalDate.ofEpochDay(12345L),
                                LocalDateTime.parse("2012-12-12T12:12:12")),
                        Expressions.row(
                                2L,
                                LocalTime.ofNanoOfDay(12345000000L),
                                "FGHIJK",
                                13.13f,
                                (byte) 4,
                                LocalDate.ofEpochDay(12345L),
                                LocalDateTime.parse("2013-12-12T13:13:13")))
                .executeInsert("esTable")
                .await();

        try (RestHighLevelClient client = getClient()) {
            // Poll until both documents show up in search results or the deadline expires.
            Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30L));
            SearchHits hits;
            do {
                hits = makeSearchRequest(client, index);
                if (getTotalSearchHits(hits) < 2L) {
                    Thread.sleep(200L);
                }
            } while (getTotalSearchHits(hits) < 2L && deadline.hasTimeLeft());

            if (getTotalSearchHits(hits) < 2L) {
                throw new AssertionError("Could not retrieve results from Elasticsearch.");
            }

            // Compare as sets: the order of the two hits is not relied upon.
            HashSet<Map<String, Object>> resultSet = new HashSet<>();
            resultSet.add(hits.getAt(0).getSourceAsMap());
            resultSet.add(hits.getAt(1).getSourceAsMap());

            HashSet<Map<String, Object>> expectedSet = new HashSet<>();
            expectedSet.add(expectedDocument(1, "ABCDE", 12.12, 2, "2012-12-12 12:12:12"));
            expectedSet.add(expectedDocument(2, "FGHIJK", 13.13, 4, "2013-12-12 13:13:13"));

            Assertions.assertEquals(expectedSet, resultSet);
        }
    }

    /** Verifies that an index pattern referencing column {@code b} resolves to the row's date. */
    @Test
    public void testWritingDocumentsWithDynamicIndex() throws Exception {
        TableEnvironment tableEnvironment =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        String index = "dynamic-index-{b|yyyy-MM-dd}";
        tableEnvironment.executeSql(
                "CREATE TABLE esTable ("
                        + "a BIGINT NOT NULL,\n"
                        + "b TIMESTAMP NOT NULL,\n"
                        + "PRIMARY KEY (a) NOT ENFORCED\n"
                        + ")\n"
                        + "WITH (\n"
                        + getConnectorSql(index)
                        + ")");

        tableEnvironment
                .fromValues(Expressions.row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
                .executeInsert("esTable")
                .await();

        try (RestHighLevelClient client = getClient()) {
            Map<String, Object> response =
                    makeGetRequest(client, "dynamic-index-2012-12-12", "1");

            Map<String, Object> expectedMap = new HashMap<>();
            expectedMap.put("a", 1);
            expectedMap.put("b", "2012-12-12 12:12:12");
            Assertions.assertEquals(expectedMap, response);
        }
    }

    /**
     * Verifies an index pattern based on {@code now()}.
     *
     * <p>The expected index name is computed once before and once after the table is created; if
     * the document is not found under the first name, the second one is tried. This covers a
     * date change between the two computations.
     */
    @Test
    public void testWritingDocumentsWithDynamicIndexFromSystemTime() throws Exception {
        TableEnvironment tableEnvironment =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        tableEnvironment.getConfig().set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");

        String dynamicIndex1 =
                "dynamic-index-"
                        + dateTimeFormatter.format(LocalDateTime.now(ZoneId.of("Asia/Shanghai")))
                        + "_index";
        String index = "dynamic-index-{now()|yyyy-MM-dd}_index";
        tableEnvironment.executeSql(
                "CREATE TABLE esTable ("
                        + "a BIGINT NOT NULL,\n"
                        + "b TIMESTAMP NOT NULL,\n"
                        + "PRIMARY KEY (a) NOT ENFORCED\n"
                        + ")\n"
                        + "WITH (\n"
                        + getConnectorSql(index)
                        + ")");
        String dynamicIndex2 =
                "dynamic-index-"
                        + dateTimeFormatter.format(LocalDateTime.now(ZoneId.of("Asia/Shanghai")))
                        + "_index";

        tableEnvironment
                .fromValues(Expressions.row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
                .executeInsert("esTable")
                .await();

        try (RestHighLevelClient client = getClient()) {
            Map<String, Object> response;
            try {
                response = makeGetRequest(client, dynamicIndex1, "1");
            } catch (ElasticsearchStatusException e) {
                // Only a missing index/document justifies the fallback; anything else is a
                // genuine failure and must propagate.
                if (e.status() != RestStatus.NOT_FOUND) {
                    throw e;
                }
                response = makeGetRequest(client, dynamicIndex2, "1");
            }

            Map<String, Object> expectedMap = new HashMap<>();
            expectedMap.put("a", 1);
            expectedMap.put("b", "2012-12-12 12:12:12");
            Assertions.assertEquals(expectedMap, response);
        }
    }
}

