/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.elasticsearch;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.xcontent.XContentType;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.ElasticsearchValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.descriptors.StreamTableDescriptorValidator;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.InstantiationUtil;

/**
 * Version-independent base factory that turns string-based table properties into an
 * Elasticsearch upsert table sink.
 *
 * <p>Subclasses supply the connector version via {@link #elasticsearchVersion()} and build the
 * concrete sink in {@link #createElasticsearchUpsertTableSink}.
 */
@Internal
public abstract class ElasticsearchUpsertTableSinkFactoryBase
implements StreamTableSinkFactory<Tuple2<Boolean, Row>> {
    private static final String SUPPORTED_FORMAT_TYPE = "json";
    private static final XContentType SUPPORTED_CONTENT_TYPE = XContentType.JSON;
    private static final String DEFAULT_KEY_DELIMITER = "_";
    private static final String DEFAULT_KEY_NULL_LITERAL = "null";
    private static final String DEFAULT_FAILURE_HANDLER = "fail";

    /**
     * Returns the property values that must match for this factory to be selected.
     *
     * @return a mutable map containing the connector type, version and property version
     */
    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "elasticsearch");
        context.put("connector.version", this.elasticsearchVersion());
        context.put("connector.property-version", "1");
        return context;
    }

    /**
     * Returns the property keys this factory understands.
     *
     * @return a mutable list of supported property keys and key patterns
     */
    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();

        // streaming options
        properties.add("update-mode");

        // Elasticsearch connection and indexing
        properties.add("connector.hosts.#.hostname");
        properties.add("connector.hosts.#.port");
        properties.add("connector.hosts.#.protocol");
        properties.add("connector.index");
        properties.add("connector.document-type");
        properties.add("connector.key-delimiter");
        properties.add("connector.key-null-literal");
        properties.add("connector.failure-handler");
        properties.add("connector.failure-handler-class");
        properties.add("connector.flush-on-checkpoint");
        properties.add("connector.bulk-flush.max-actions");
        properties.add("connector.bulk-flush.max-size");
        properties.add("connector.bulk-flush.interval");
        properties.add("connector.bulk-flush.backoff.type");
        properties.add("connector.bulk-flush.backoff.max-retries");
        properties.add("connector.bulk-flush.backoff.delay");
        properties.add("connector.connection-max-retry-timeout");
        properties.add("connector.connection-path-prefix");

        // schema
        properties.add("schema.#.type");
        properties.add("schema.#.name");

        // format wildcard
        properties.add("format.*");
        return properties;
    }

    /**
     * Validates the given properties and creates the sink from them.
     *
     * @param properties raw string properties describing the table sink
     * @return the sink produced by {@link #createElasticsearchUpsertTableSink}
     * @throws ValidationException if {@code format.type} is missing or is not {@code json}
     * @throws IllegalArgumentException if the failure handler or backoff type is not recognised
     */
    @Override
    public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
        DescriptorProperties descriptorProperties = this.getValidatedProperties(properties);

        boolean isAppendOnly = descriptorProperties.isValue("update-mode", "append");
        String keyDelimiter = descriptorProperties
                .getOptionalString("connector.key-delimiter")
                .orElse(DEFAULT_KEY_DELIMITER);
        String keyNullLiteral = descriptorProperties
                .getOptionalString("connector.key-null-literal")
                .orElse(DEFAULT_KEY_NULL_LITERAL);

        // The argument expressions are evaluated in the same order as before, so any
        // exception raised by a malformed property surfaces from the same lookup.
        return this.createElasticsearchUpsertTableSink(
                isAppendOnly,
                descriptorProperties.getTableSchema("schema"),
                this.getHosts(descriptorProperties),
                descriptorProperties.getString("connector.index"),
                descriptorProperties.getString("connector.document-type"),
                keyDelimiter,
                keyNullLiteral,
                this.getSerializationSchema(properties),
                SUPPORTED_CONTENT_TYPE,
                this.getFailureHandler(descriptorProperties),
                this.getSinkOptions(descriptorProperties));
    }

    /**
     * Returns the value matched against the {@code connector.version} property.
     *
     * @return the Elasticsearch version string of the concrete factory
     */
    protected abstract String elasticsearchVersion();

    /**
     * Creates the version-specific sink from the already parsed configuration.
     *
     * @param isAppendOnly whether {@code update-mode} is {@code append}
     * @param schema table schema read from the {@code schema} properties
     * @param hosts hosts read from {@code connector.hosts}
     * @param index value of {@code connector.index}
     * @param docType value of {@code connector.document-type}
     * @param keyDelimiter value of {@code connector.key-delimiter}, or {@code "_"} if absent
     * @param keyNullLiteral value of {@code connector.key-null-literal}, or {@code "null"} if absent
     * @param serializationSchema schema created by the discovered format factory
     * @param contentType content type of the serialized documents (always JSON here)
     * @param failureHandler handler selected by {@code connector.failure-handler}
     * @param sinkOptions optional sink settings; absent properties are not present in the map
     * @return the concrete upsert table sink
     */
    protected abstract ElasticsearchUpsertTableSinkBase createElasticsearchUpsertTableSink(
            boolean isAppendOnly,
            TableSchema schema,
            List<ElasticsearchUpsertTableSinkBase.Host> hosts,
            String index,
            String docType,
            String keyDelimiter,
            String keyNullLiteral,
            SerializationSchema<Row> serializationSchema,
            XContentType contentType,
            ActionRequestFailureHandler failureHandler,
            Map<ElasticsearchUpsertTableSinkBase.SinkOption, String> sinkOptions);

    /**
     * Wraps the raw properties and runs the stream, schema and Elasticsearch validators on them.
     *
     * @param properties raw string properties
     * @return the populated descriptor properties, returned only if every validator passed
     */
    private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
        DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(properties);

        new StreamTableDescriptorValidator(true, false, true).validate(descriptorProperties);
        new SchemaValidator(true, false, false).validate(descriptorProperties);
        new ElasticsearchValidator().validate(descriptorProperties);

        return descriptorProperties;
    }

    /**
     * Reads every {@code connector.hosts.#} entry into a {@code Host}.
     *
     * @param descriptorProperties validated properties
     * @return one host per indexed entry, in the order returned by the property lookup
     */
    private List<ElasticsearchUpsertTableSinkBase.Host> getHosts(DescriptorProperties descriptorProperties) {
        // Each map goes from sub-key ("hostname", ...) to a property key, which is then
        // resolved through the typed getters below.
        List<Map<String, String>> hosts = descriptorProperties.getFixedIndexedProperties(
                "connector.hosts", Arrays.asList("hostname", "port", "protocol"));
        return hosts.stream()
                .map(host -> new ElasticsearchUpsertTableSinkBase.Host(
                        descriptorProperties.getString(host.get("hostname")),
                        descriptorProperties.getInt(host.get("port")),
                        descriptorProperties.getString(host.get("protocol"))))
                .collect(Collectors.toList());
    }

    /**
     * Locates the format factory for the given properties and creates the row serializer.
     *
     * @param properties raw string properties (including the {@code format.*} keys)
     * @return the serialization schema created by the discovered factory
     * @throws ValidationException if {@code format.type} is missing or is not {@code json}
     */
    private SerializationSchema<Row> getSerializationSchema(Map<String, String> properties) {
        String formatType = properties.get("format.type");
        // Checked here, before factory discovery, so an unsupported format yields this
        // specific message.
        if (!SUPPORTED_FORMAT_TYPE.equals(formatType)) {
            throw new ValidationException(
                    "The Elasticsearch sink requires a '" + SUPPORTED_FORMAT_TYPE + "' format.");
        }

        // The class literal is necessarily raw; the factory is used for Row only.
        @SuppressWarnings("unchecked")
        SerializationSchemaFactory<Row> formatFactory = TableFactoryService.find(
                SerializationSchemaFactory.class, properties, this.getClass().getClassLoader());
        return formatFactory.createSerializationSchema(properties);
    }

    /**
     * Selects the failure handler named by {@code connector.failure-handler}
     * (defaults to {@code fail}).
     *
     * @param descriptorProperties validated properties
     * @return a new handler instance
     * @throws IllegalArgumentException if the configured handler name is not recognised
     */
    private ActionRequestFailureHandler getFailureHandler(DescriptorProperties descriptorProperties) {
        String failureHandler = descriptorProperties
                .getOptionalString("connector.failure-handler")
                .orElse(DEFAULT_FAILURE_HANDLER);
        switch (failureHandler) {
            case "fail":
                return new NoOpFailureHandler();
            case "ignore":
                return new IgnoringFailureHandler();
            case "retry-rejected":
                return new RetryRejectedExecutionFailureHandler();
            case "custom":
                Class<? extends ActionRequestFailureHandler> clazz = descriptorProperties.getClass(
                        "connector.failure-handler-class", ActionRequestFailureHandler.class);
                return InstantiationUtil.instantiate(clazz);
            default:
                throw new IllegalArgumentException("Unknown failure handler.");
        }
    }

    /**
     * Translates the optional connector properties into sink options.
     *
     * <p>Only properties that are present produce an entry in the result.
     *
     * @param descriptorProperties validated properties
     * @return a mutable map of sink options
     * @throws IllegalArgumentException if the backoff type is not one of
     *     {@code disabled}, {@code constant} or {@code exponential}
     */
    private Map<ElasticsearchUpsertTableSinkBase.SinkOption, String> getSinkOptions(DescriptorProperties descriptorProperties) {
        Map<ElasticsearchUpsertTableSinkBase.SinkOption, String> options = new HashMap<>();

        // The property says "flush", the option says "disable flush", hence the negation.
        descriptorProperties.getOptionalBoolean("connector.flush-on-checkpoint")
                .ifPresent(flush -> options.put(
                        ElasticsearchUpsertTableSinkBase.SinkOption.DISABLE_FLUSH_ON_CHECKPOINT,
                        String.valueOf(!flush)));

        this.mapSinkOption(descriptorProperties, options, "connector.bulk-flush.max-actions", ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_ACTIONS);
        this.mapSinkOption(descriptorProperties, options, "connector.bulk-flush.max-size", ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_MAX_SIZE);
        this.mapSinkOption(descriptorProperties, options, "connector.bulk-flush.interval", ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_INTERVAL);

        descriptorProperties.getOptionalString("connector.bulk-flush.backoff.type")
                .ifPresent(type -> {
                    options.put(
                            ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_ENABLED,
                            String.valueOf(!type.equals("disabled")));
                    switch (type) {
                        case "disabled":
                            // Valid value: BULK_FLUSH_BACKOFF_ENABLED=false was recorded above
                            // and there is no backoff type to set. Without this case the
                            // value fell through to "Unknown backoff type.".
                            break;
                        case "constant":
                            options.put(
                                    ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_TYPE,
                                    ElasticsearchSinkBase.FlushBackoffType.CONSTANT.toString());
                            break;
                        case "exponential":
                            options.put(
                                    ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_TYPE,
                                    ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL.toString());
                            break;
                        default:
                            throw new IllegalArgumentException("Unknown backoff type.");
                    }
                });

        this.mapSinkOption(descriptorProperties, options, "connector.bulk-flush.backoff.max-retries", ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_RETRIES);
        this.mapSinkOption(descriptorProperties, options, "connector.bulk-flush.backoff.delay", ElasticsearchUpsertTableSinkBase.SinkOption.BULK_FLUSH_BACKOFF_DELAY);
        this.mapSinkOption(descriptorProperties, options, "connector.connection-max-retry-timeout", ElasticsearchUpsertTableSinkBase.SinkOption.REST_MAX_RETRY_TIMEOUT);
        this.mapSinkOption(descriptorProperties, options, "connector.connection-path-prefix", ElasticsearchUpsertTableSinkBase.SinkOption.REST_PATH_PREFIX);
        return options;
    }

    /**
     * Copies the string value of {@code fromKey} into {@code options} under {@code toKey},
     * if the property is present; otherwise leaves {@code options} untouched.
     *
     * @param descriptorProperties validated properties to read from
     * @param options target map, modified in place
     * @param fromKey property key to look up
     * @param toKey sink option to store the value under
     */
    private void mapSinkOption(DescriptorProperties descriptorProperties, Map<ElasticsearchUpsertTableSinkBase.SinkOption, String> options, String fromKey, ElasticsearchUpsertTableSinkBase.SinkOption toKey) {
        descriptorProperties.getOptionalString(fromKey).ifPresent(value -> options.put(toKey, value));
    }
}

