/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.table;

import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils;
import org.apache.flink.connector.pulsar.table.PulsarTableOptions;
import org.apache.flink.connector.pulsar.table.PulsarTableValidationUtils;
import org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory;
import org.apache.flink.connector.pulsar.table.sink.PulsarTableSink;
import org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
import org.apache.flink.connector.pulsar.table.source.PulsarTableSource;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.shade.org.apache.commons.lang3.RandomStringUtils;

public class UpsertPulsarTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    public static final String IDENTIFIER = "upsert-pulsar";
    public static final String DEFAULT_SUBSCRIPTION_NAME_PREFIX = "flink-upsert-pulsar-";
    public static final boolean UPSERT_ENABLED = true;

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = PulsarTableOptionUtils.getKeyDecodingFormat(helper);
        DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = PulsarTableOptionUtils.getValueDecodingFormat(helper);
        ReadableConfig tableOptions = helper.getOptions();
        helper.validateExcept(new String[]{"pulsar.client.", "pulsar.source.", "pulsar.consumer.", "pulsar.producer.", "pulsar.sink."});
        List<String> topics = PulsarTableOptionUtils.getTopicListFromOptions(tableOptions);
        StartCursor startCursor = PulsarTableOptionUtils.getStartCursor(tableOptions);
        StopCursor stopCursor = PulsarTableOptionUtils.getStopCursor(tableOptions);
        SubscriptionType subscriptionType = PulsarTableOptionUtils.getSubscriptionType(tableOptions);
        Properties properties = PulsarTableOptionUtils.getPulsarProperties(tableOptions);
        properties.setProperty(PulsarOptions.PULSAR_SERVICE_URL.key(), (String)tableOptions.get(PulsarTableOptions.SERVICE_URL));
        properties.setProperty(PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME.key(), tableOptions.getOptional(PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME).orElse(DEFAULT_SUBSCRIPTION_NAME_PREFIX + RandomStringUtils.randomAlphabetic((int)5)));
        DataType physicalDataType = context.getPhysicalRowDataType();
        this.overrideKeyFieldOptionsForUpsertMode((Configuration)tableOptions, context);
        PulsarTableValidationUtils.validateUpsertModeKeyConstraints(tableOptions, context.getPrimaryKeyIndexes());
        PulsarTableValidationUtils.validateTableSourceOptions(tableOptions);
        int[] valueProjection = PulsarTableOptionUtils.createValueFormatProjection(tableOptions, physicalDataType);
        int[] keyProjection = PulsarTableOptionUtils.createKeyFormatProjection(tableOptions, physicalDataType);
        PulsarTableDeserializationSchemaFactory deserializationSchemaFactory = new PulsarTableDeserializationSchemaFactory(physicalDataType, keyDecodingFormat, keyProjection, valueDecodingFormat, valueProjection, true);
        DecodingFormat<DeserializationSchema<RowData>> decodingFormatForMetadataPushdown = valueDecodingFormat;
        ChangelogMode changelogMode = ChangelogMode.newBuilder().addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE).build();
        return new PulsarTableSource(deserializationSchemaFactory, decodingFormatForMetadataPushdown, changelogMode, topics, properties, startCursor, stopCursor, subscriptionType);
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = PulsarTableOptionUtils.getKeyEncodingFormat(helper);
        EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = PulsarTableOptionUtils.getValueEncodingFormat(helper);
        ReadableConfig tableOptions = helper.getOptions();
        helper.validateExcept(new String[]{"pulsar.client.", "pulsar.source.", "pulsar.consumer.", "pulsar.producer.", "pulsar.sink."});
        TopicRouter<RowData> topicRouter = PulsarTableOptionUtils.getTopicRouter(tableOptions, context.getClassLoader());
        long messageDelayMillis = PulsarTableOptionUtils.getMessageDelayMillis(tableOptions);
        List<String> topics = PulsarTableOptionUtils.getTopicListFromOptions(tableOptions);
        Properties properties = PulsarTableOptionUtils.getPulsarProperties(tableOptions);
        properties.setProperty(PulsarOptions.PULSAR_SERVICE_URL.key(), (String)tableOptions.get(PulsarTableOptions.SERVICE_URL));
        DataType physicalDataType = context.getPhysicalRowDataType();
        this.overrideKeyFieldOptionsForUpsertMode((Configuration)tableOptions, context);
        PulsarTableValidationUtils.validateUpsertModeKeyConstraints(tableOptions, context.getPrimaryKeyIndexes());
        PulsarTableValidationUtils.validateTableSinkOptions(tableOptions);
        int[] keyProjection = PulsarTableOptionUtils.createKeyFormatProjection(tableOptions, physicalDataType);
        int[] valueProjection = PulsarTableOptionUtils.createValueFormatProjection(tableOptions, physicalDataType);
        PulsarTableSerializationSchemaFactory serializationSchemaFactory = new PulsarTableSerializationSchemaFactory(physicalDataType, keyEncodingFormat, keyProjection, valueEncodingFormat, valueProjection, true);
        DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
        ChangelogMode changelogMode = ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE).build();
        TopicRoutingMode topicRoutingMode = TopicRoutingMode.MESSAGE_KEY_HASH;
        return new PulsarTableSink(serializationSchemaFactory, changelogMode, topics, properties, deliveryGuarantee, topicRouter, topicRoutingMode, messageDelayMillis);
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Stream.of(PulsarTableOptions.TOPICS, PulsarTableOptions.SERVICE_URL).collect(Collectors.toSet());
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return Stream.of(FactoryUtil.FORMAT, PulsarTableOptions.VALUE_FORMAT, PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME, PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID, PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME, PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID, PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID, PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME, PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL, FactoryUtil.SINK_PARALLELISM, PulsarTableOptions.KEY_FORMAT, PulsarTableOptions.EXPLICIT).collect(Collectors.toSet());
    }

    public Set<ConfigOption<?>> forwardOptions() {
        return Stream.of(PulsarTableOptions.TOPICS, PulsarTableOptions.SERVICE_URL, PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME, PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID, PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME, PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID, PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID, PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME, PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL).collect(Collectors.toSet());
    }

    private void overrideKeyFieldOptionsForUpsertMode(Configuration tableOptions, DynamicTableFactory.Context context) {
        List keyFields = ((UniqueConstraint)context.getCatalogTable().getResolvedSchema().getPrimaryKey().get()).getColumns();
        tableOptions.set(PulsarTableOptions.KEY_FIELDS, (Object)keyFields);
    }
}

