package org.apache.flink.streaming.connectors.elasticsearch;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.index.IndexGenerator;
import org.apache.flink.streaming.connectors.elasticsearch.index.IndexGeneratorFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.Elasticsearch;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableDescriptor;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.class */
public abstract class ElasticsearchUpsertTableSinkFactoryTestBase extends TestLogger {
    protected static final String HOSTNAME = "host1";
    protected static final int PORT = 1234;
    protected static final String SCHEMA = "https";
    protected static final String INDEX = "MyIndex";
    protected static final String DOC_TYPE = "MyType";
    protected static final String KEY_DELIMITER = "#";
    protected static final String KEY_NULL_LITERAL = "";
    private static final String FIELD_KEY = "key";
    private static final String FIELD_FRUIT_NAME = "fruit_name";
    private static final String FIELD_COUNT = "count";
    private static final String FIELD_TS = "ts";

    /* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase$DummyFailureHandler.class */
    public static class DummyFailureHandler implements ActionRequestFailureHandler {
        public void onFailure(ActionRequest actionRequest, Throwable th, int i, RequestIndexer requestIndexer) {
        }

        public boolean equals(Object obj) {
            return this == obj || (obj instanceof DummyFailureHandler);
        }

        public int hashCode() {
            return DummyFailureHandler.class.hashCode();
        }
    }

    @Test
    public void testTableSink() {
        TableSchema createTestSchema = createTestSchema();
        ElasticsearchUpsertTableSinkBase expectedTableSink = getExpectedTableSink(false, createTestSchema, Collections.singletonList(new ElasticsearchUpsertTableSinkBase.Host(HOSTNAME, PORT, SCHEMA)), INDEX, DOC_TYPE, KEY_DELIMITER, KEY_NULL_LITERAL, JsonRowSerializationSchema.builder().withTypeInfo(createTestSchema.toRowType()).build(), XContentType.JSON, new DummyFailureHandler(), createTestSinkOptions(), IndexGeneratorFactory.createIndexGenerator(INDEX, createTestSchema));
        Map<String, String> createElasticSearchProperties = createElasticSearchProperties();
        Assert.assertEquals(expectedTableSink, TableFactoryService.find(StreamTableSinkFactory.class, createElasticSearchProperties).createStreamTableSink(createElasticSearchProperties));
    }

    @Test
    public void testTableSinkWithLegacyProperties() {
        TableSchema createTestSchema = createTestSchema();
        ElasticsearchUpsertTableSinkBase expectedTableSink = getExpectedTableSink(false, createTestSchema, Collections.singletonList(new ElasticsearchUpsertTableSinkBase.Host(HOSTNAME, PORT, SCHEMA)), INDEX, DOC_TYPE, KEY_DELIMITER, KEY_NULL_LITERAL, JsonRowSerializationSchema.builder().withTypeInfo(createTestSchema.toRowType()).build(), XContentType.JSON, new DummyFailureHandler(), createTestSinkOptions(), IndexGeneratorFactory.createIndexGenerator(INDEX, createTestSchema));
        Map<String, String> createElasticSearchProperties = createElasticSearchProperties();
        HashMap hashMap = new HashMap();
        hashMap.putAll(createElasticSearchProperties);
        hashMap.remove("connector.hosts");
        hashMap.put("connector.hosts.0.hostname", HOSTNAME);
        hashMap.put("connector.hosts.0.port", "1234");
        hashMap.put("connector.hosts.0.protocol", SCHEMA);
        Assert.assertEquals(expectedTableSink, TableFactoryService.find(StreamTableSinkFactory.class, hashMap).createStreamTableSink(hashMap));
    }

    protected TableSchema createTestSchema() {
        return TableSchema.builder().field(FIELD_KEY, DataTypes.BIGINT()).field(FIELD_FRUIT_NAME, DataTypes.STRING()).field(FIELD_COUNT, DataTypes.DECIMAL(10, 4)).field(FIELD_TS, DataTypes.TIMESTAMP(3)).build();
    }

    protected Map<ElasticsearchUpsertTableSinkBase.SinkOption, String> createTestSinkOptions() {
        HashMap hashMap = new HashMap();
        hashMap.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_ENABLED, "true");
        hashMap.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");
        hashMap.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_DELAY, "123");
        hashMap.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_RETRIES, "3");
        hashMap.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_INTERVAL, "100");
        hashMap.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_ACTIONS, "1000");
        hashMap.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_SIZE, "1 mb");
        hashMap.put(ElasticsearchUpsertTableSinkBase.SinkOption.REST_MAX_RETRY_TIMEOUT, "100");
        hashMap.put(ElasticsearchUpsertTableSinkBase.SinkOption.REST_PATH_PREFIX, "/myapp");
        return hashMap;
    }

    protected Map<String, String> createElasticSearchProperties() {
        return new TestTableDescriptor(new Elasticsearch().version(getElasticsearchVersion()).host(HOSTNAME, PORT, SCHEMA).index(INDEX).documentType(DOC_TYPE).keyDelimiter(KEY_DELIMITER).keyNullLiteral(KEY_NULL_LITERAL).bulkFlushBackoffExponential().bulkFlushBackoffDelay(123L).bulkFlushBackoffMaxRetries(3).bulkFlushInterval(100L).bulkFlushMaxActions(1000).bulkFlushMaxSize("1 MB").failureHandlerCustom(DummyFailureHandler.class).connectionMaxRetryTimeout(100).connectionPathPrefix("/myapp")).withFormat(new Json().deriveSchema()).withSchema(new Schema().field(FIELD_KEY, DataTypes.BIGINT()).field(FIELD_FRUIT_NAME, DataTypes.STRING()).field(FIELD_COUNT, DataTypes.DECIMAL(10, 4)).field(FIELD_TS, DataTypes.TIMESTAMP(3))).inUpsertMode().toProperties();
    }

    protected abstract String getElasticsearchVersion();

    protected abstract ElasticsearchUpsertTableSinkBase getExpectedTableSink(boolean z, TableSchema tableSchema, List<ElasticsearchUpsertTableSinkBase.Host> list, String str, String str2, String str3, String str4, SerializationSchema<Row> serializationSchema, XContentType xContentType, ActionRequestFailureHandler actionRequestFailureHandler, Map<ElasticsearchUpsertTableSinkBase.SinkOption, String> map, IndexGenerator indexGenerator);
}
