/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.elasticsearch;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.index.IndexGenerator;
import org.apache.flink.streaming.connectors.elasticsearch.index.IndexGeneratorFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.Elasticsearch;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableDescriptor;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.Assert;
import org.junit.Test;

/**
 * Version-independent base for tests of the Elasticsearch upsert table sink factory.
 *
 * <p>Each test builds an expected sink through {@link #getExpectedTableSink} and compares it,
 * via {@code equals}, with the sink that {@link TableFactoryService} discovers and creates from a
 * flat property map. Subclasses supply the Elasticsearch version string and the concrete sink
 * instance for that version.
 */
public abstract class ElasticsearchUpsertTableSinkFactoryTestBase extends TestLogger {

    protected static final String HOSTNAME = "host1";
    protected static final int PORT = 1234;
    protected static final String SCHEMA = "https";
    protected static final String INDEX = "MyIndex";
    protected static final String DOC_TYPE = "MyType";
    protected static final String KEY_DELIMITER = "#";
    protected static final String KEY_NULL_LITERAL = "";

    private static final String FIELD_KEY = "key";
    private static final String FIELD_FRUIT_NAME = "fruit_name";
    private static final String FIELD_COUNT = "count";
    private static final String FIELD_TS = "ts";

    /**
     * Verifies that the sink created from the descriptor-generated properties equals the
     * expected sink.
     */
    @Test
    public void testTableSink() {
        TableSchema schema = createTestSchema();
        ElasticsearchUpsertTableSinkBase expectedSink = createExpectedSink(schema);

        Map<String, String> properties = createElasticSearchProperties();
        StreamTableSink<?> actualSink = createSinkFromProperties(properties);

        Assert.assertEquals(expectedSink, actualSink);
    }

    /**
     * Verifies that replacing the {@code connector.hosts} property with the indexed
     * {@code connector.hosts.0.*} keys still produces a sink equal to the expected one.
     */
    @Test
    public void testTableSinkWithLegacyProperties() {
        TableSchema schema = createTestSchema();
        ElasticsearchUpsertTableSinkBase expectedSink = createExpectedSink(schema);

        // Work on a copy so the descriptor-generated map is left untouched.
        Map<String, String> legacyProperties = new HashMap<>(createElasticSearchProperties());
        legacyProperties.remove("connector.hosts");
        legacyProperties.put("connector.hosts.0.hostname", HOSTNAME);
        legacyProperties.put("connector.hosts.0.port", String.valueOf(PORT));
        legacyProperties.put("connector.hosts.0.protocol", SCHEMA);

        StreamTableSink<?> actualSink = createSinkFromProperties(legacyProperties);

        Assert.assertEquals(expectedSink, actualSink);
    }

    /**
     * Builds the four-column table schema used by the tests.
     *
     * @return a schema with a BIGINT key, a STRING fruit name, a DECIMAL(10, 4) count and a
     *     TIMESTAMP(3) column
     */
    protected TableSchema createTestSchema() {
        return TableSchema.builder()
                .field(FIELD_KEY, DataTypes.BIGINT())
                .field(FIELD_FRUIT_NAME, DataTypes.STRING())
                .field(FIELD_COUNT, DataTypes.DECIMAL(10, 4))
                .field(FIELD_TS, DataTypes.TIMESTAMP(3))
                .build();
    }

    /**
     * Builds the sink options passed to the expected sink.
     *
     * @return a new mutable map from sink option to its string value
     */
    protected Map<ElasticsearchUpsertTableSinkBase.SinkOption, String> createTestSinkOptions() {
        Map<ElasticsearchUpsertTableSinkBase.SinkOption, String> sinkOptions = new HashMap<>();
        sinkOptions.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_ENABLED, "true");
        sinkOptions.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_TYPE, "EXPONENTIAL");
        sinkOptions.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_DELAY, "123");
        sinkOptions.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_RETRIES, "3");
        sinkOptions.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_INTERVAL, "100");
        sinkOptions.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_ACTIONS, "1000");
        // NOTE(review): the descriptor below is given "1 MB" while "1 mb" is expected here;
        // presumably the value is normalized somewhere on the way to the sink - confirm.
        sinkOptions.put(ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_SIZE, "1 mb");
        sinkOptions.put(ElasticsearchUpsertTableSinkBase.SinkOption.REST_MAX_RETRY_TIMEOUT, "100");
        sinkOptions.put(ElasticsearchUpsertTableSinkBase.SinkOption.REST_PATH_PREFIX, "/myapp");
        return sinkOptions;
    }

    /**
     * Produces the flat property map by describing the table with the {@link Elasticsearch}
     * connector descriptor, a {@link Json} format with derived schema, and a {@link Schema}
     * whose fields match {@link #createTestSchema()}, in upsert mode.
     *
     * @return the properties generated by the table descriptor
     */
    protected Map<String, String> createElasticSearchProperties() {
        Elasticsearch connector = new Elasticsearch()
                .version(getElasticsearchVersion())
                .host(HOSTNAME, PORT, SCHEMA)
                .index(INDEX)
                .documentType(DOC_TYPE)
                .keyDelimiter(KEY_DELIMITER)
                .keyNullLiteral(KEY_NULL_LITERAL)
                .bulkFlushBackoffExponential()
                .bulkFlushBackoffDelay(123L)
                .bulkFlushBackoffMaxRetries(3)
                .bulkFlushInterval(100L)
                .bulkFlushMaxActions(1000)
                .bulkFlushMaxSize("1 MB")
                .failureHandlerCustom(DummyFailureHandler.class)
                .connectionMaxRetryTimeout(100)
                .connectionPathPrefix("/myapp");

        Schema tableSchema = new Schema()
                .field(FIELD_KEY, DataTypes.BIGINT())
                .field(FIELD_FRUIT_NAME, DataTypes.STRING())
                .field(FIELD_COUNT, DataTypes.DECIMAL(10, 4))
                .field(FIELD_TS, DataTypes.TIMESTAMP(3));

        return new TestTableDescriptor(connector)
                .withFormat(new Json().deriveSchema())
                .withSchema(tableSchema)
                .inUpsertMode()
                .toProperties();
    }

    /**
     * Returns the Elasticsearch version string that is passed to the connector descriptor.
     *
     * @return the version under test
     */
    protected abstract String getElasticsearchVersion();

    /**
     * Creates the version-specific sink the factory result is compared against.
     *
     * @param var1 boolean flag; the tests in this class always pass {@code false}
     *     (presumably the append-only flag - confirm against the implementing subclass)
     * @param var2 table schema
     * @param var3 Elasticsearch hosts
     * @param var4 index name
     * @param var5 document type
     * @param var6 key delimiter
     * @param var7 literal used for null keys
     * @param var8 serialization schema for rows
     * @param var9 content type of the serialized documents
     * @param var10 failure handler
     * @param var11 sink options
     * @param var12 index generator
     * @return the expected sink instance
     */
    protected abstract ElasticsearchUpsertTableSinkBase getExpectedTableSink(boolean var1, TableSchema var2, List<ElasticsearchUpsertTableSinkBase.Host> var3, String var4, String var5, String var6, String var7, SerializationSchema<Row> var8, XContentType var9, ActionRequestFailureHandler var10, Map<ElasticsearchUpsertTableSinkBase.SinkOption, String> var11, IndexGenerator var12);

    /** Assembles the expected sink from the shared constants and the given schema. */
    private ElasticsearchUpsertTableSinkBase createExpectedSink(TableSchema schema) {
        return getExpectedTableSink(
                false,
                schema,
                Collections.singletonList(new ElasticsearchUpsertTableSinkBase.Host(HOSTNAME, PORT, SCHEMA)),
                INDEX,
                DOC_TYPE,
                KEY_DELIMITER,
                KEY_NULL_LITERAL,
                JsonRowSerializationSchema.builder().withTypeInfo(schema.toRowType()).build(),
                XContentType.JSON,
                new DummyFailureHandler(),
                createTestSinkOptions(),
                IndexGeneratorFactory.createIndexGenerator(INDEX, schema));
    }

    /** Looks up a {@link StreamTableSinkFactory} for the properties and lets it create the sink. */
    private static StreamTableSink<?> createSinkFromProperties(Map<String, String> properties) {
        return TableFactoryService.find(StreamTableSinkFactory.class, properties)
                .createStreamTableSink(properties);
    }

    /**
     * No-op failure handler used as the custom failure handler class in the test properties.
     *
     * <p>All instances compare equal and share one hash code, so a handler instantiated by the
     * factory matches the one placed in the expected sink.
     */
    public static class DummyFailureHandler implements ActionRequestFailureHandler {

        /** Intentionally does nothing. */
        @Override
        public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
            // test stub: failures are ignored
        }

        @Override
        public boolean equals(Object o) {
            return this == o || o instanceof DummyFailureHandler;
        }

        @Override
        public int hashCode() {
            return DummyFailureHandler.class.hashCode();
        }
    }
}

