package org.apache.flink.connector.elasticsearch.table;

import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.connector.elasticsearch.ElasticsearchUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.Executable;

/**
 * Shared validation tests for Elasticsearch dynamic table sink factories.
 *
 * <p>Subclasses provide the factory under test via {@link #createSinkFactory()} and a base
 * {@link TestContext} via {@link #createPrefilledTestContext()}. Each test overrides a single
 * option (or the schema) on that context and checks how the factory, or the sink it creates,
 * reacts.
 */
@ExtendWith({TestLoggerExtension.class})
abstract class ElasticsearchDynamicSinkFactoryBaseTest {

    /** Returns the sink factory implementation under test. */
    abstract ElasticsearchDynamicSinkFactoryBase createSinkFactory();

    /** Returns the context builder that the individual tests customize before building it. */
    abstract TestContext createPrefilledTestContext();

    /**
     * Asserts that {@code executable} throws a {@link ValidationException} whose message equals
     * {@code expectedMessage} exactly.
     *
     * @param expectedMessage the exact exception message that is expected
     * @param executable the code that is expected to fail validation
     */
    void assertValidationException(String expectedMessage, Executable executable) {
        ValidationException thrown = Assertions.assertThrows(ValidationException.class, executable);
        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * Asserts that creating a sink fails with {@code expectedMessage} when a single option of the
     * context from {@link #createPrefilledTestContext()} is overridden.
     *
     * <p>The factory is created outside the asserted lambda, so only context building and sink
     * creation may raise the expected exception.
     */
    private void assertOptionRejected(String optionKey, String optionValue, String expectedMessage) {
        ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
        assertValidationException(
                expectedMessage,
                () ->
                        sinkFactory.createDynamicTableSink(
                                createPrefilledTestContext()
                                        .withOption(optionKey, optionValue)
                                        .build()));
    }

    @Test
    public void validateWrongIndex() {
        assertOptionRejected(
                ElasticsearchConnectorOptions.INDEX_OPTION.key(),
                "",
                "'index' must not be empty");
    }

    @Test
    public void validateWrongHosts() {
        assertOptionRejected(
                ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
                "wrong-host",
                "Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'.");
    }

    @Test
    public void validateWrongFlushSize() {
        assertOptionRejected(
                ElasticsearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION.key(),
                "1kb",
                "'sink.bulk-flush.max-size' must be in MB granularity. Got: 1024 bytes");
    }

    @Test
    public void validateWrongRetries() {
        assertOptionRejected(
                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
                "0",
                "'sink.bulk-flush.backoff.max-retries' must be at least 1. Got: 0");
    }

    @Test
    public void validateWrongMaxActions() {
        assertOptionRejected(
                ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
                "-2",
                "'sink.bulk-flush.max-actions' must be at least 1. Got: -2");
    }

    @Test
    public void validateWrongBackoffDelay() {
        assertOptionRejected(
                ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(),
                "-1s",
                "Invalid value for option 'sink.bulk-flush.backoff.delay'.");
    }

    @Test
    public void validatePrimaryKeyOnIllegalColumn() {
        ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
        assertValidationException(
                "The table has a primary key on columns of illegal types: [ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].",
                () -> {
                    // Column "a" has a plain BIGINT type; "b" to "g" each use one of the types
                    // listed in the expected message. All seven are part of the primary key.
                    ResolvedSchema schema =
                            new ResolvedSchema(
                                    Arrays.asList(
                                            Column.physical("a", DataTypes.BIGINT().notNull()),
                                            Column.physical(
                                                    "b",
                                                    DataTypes.ARRAY(DataTypes.BIGINT().notNull())
                                                            .notNull()),
                                            Column.physical(
                                                    "c",
                                                    DataTypes.MAP(
                                                                    DataTypes.BIGINT(),
                                                                    DataTypes.STRING())
                                                            .notNull()),
                                            Column.physical(
                                                    "d",
                                                    DataTypes.MULTISET(
                                                                    DataTypes.BIGINT().notNull())
                                                            .notNull()),
                                            Column.physical(
                                                    "e",
                                                    DataTypes.ROW(
                                                                    DataTypes.FIELD(
                                                                            "a",
                                                                            DataTypes.BIGINT()))
                                                            .notNull()),
                                            Column.physical(
                                                    "f",
                                                    DataTypes.RAW(
                                                                    Void.class,
                                                                    VoidSerializer.INSTANCE)
                                                            .notNull()),
                                            Column.physical("g", DataTypes.BYTES().notNull())),
                                    Collections.emptyList(),
                                    UniqueConstraint.primaryKey(
                                            "name",
                                            Arrays.asList("a", "b", "c", "d", "e", "f", "g")));
                    sinkFactory.createDynamicTableSink(
                            createPrefilledTestContext().withSchema(schema).build());
                });
    }

    @Test
    public void validateWrongCredential() {
        ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
        assertValidationException(
                "'username' and 'password' must be set at the same time. Got: username 'username' and password ''",
                () ->
                        sinkFactory.createDynamicTableSink(
                                createPrefilledTestContext()
                                        .withOption(
                                                ElasticsearchConnectorOptions.USERNAME_OPTION.key(),
                                                "username")
                                        .withOption(
                                                ElasticsearchConnectorOptions.PASSWORD_OPTION.key(),
                                                "")
                                        .build()));
    }

    @Test
    public void validateDynamicIndexOnChangelogStream() {
        // Sink creation itself is expected to succeed here; the validation error is only
        // expected once a changelog mode is requested from the sink.
        DynamicTableSink sink =
                createSinkFactory()
                        .createDynamicTableSink(
                                createPrefilledTestContext()
                                        .withOption(
                                                ElasticsearchConnectorOptions.INDEX_OPTION.key(),
                                                "dynamic-index-{now()|yyyy-MM-dd}_index")
                                        .build());
        ChangelogMode requestedMode =
                ChangelogMode.newBuilder()
                        .addContainedKind(RowKind.DELETE)
                        .addContainedKind(RowKind.INSERT)
                        .build();
        assertValidationException(
                "Dynamic indexing based on system time only works on append only stream.",
                () -> sink.getChangelogMode(requestedMode));
    }

    @Test
    public void testSinkParallelism() {
        // Hold the result as the general sink type so that the runtime type check below is a
        // real assertion, and narrow with an explicit cast only after it has passed.
        DynamicTableSink tableSink =
                createSinkFactory()
                        .createDynamicTableSink(
                                createPrefilledTestContext()
                                        .withOption(FactoryUtil.SINK_PARALLELISM.key(), "2")
                                        .build());
        org.assertj.core.api.Assertions.assertThat(tableSink)
                .isInstanceOf(ElasticsearchDynamicSink.class);
        ElasticsearchDynamicSink elasticsearchSink = (ElasticsearchDynamicSink) tableSink;

        // NOTE(review): getParallelism() must be visible on the declared return type of
        // getSinkRuntimeProvider(); confirm against ElasticsearchDynamicSink and add a cast if
        // that method is declared with a more general provider type.
        org.assertj.core.api.Assertions.assertThat(
                        elasticsearchSink
                                .getSinkRuntimeProvider(new ElasticsearchUtil.MockContext())
                                .getParallelism()
                                .get())
                .isEqualTo(2);
    }
}
