package org.apache.flink.table.store.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

/* loaded from: input_file:org/apache/flink/table/store/kafka/KafkaLogStoreFactory.class */
public class KafkaLogStoreFactory implements LogStoreTableFactory {
    public static final String IDENTIFIER = "kafka";
    public static final String KAFKA_PREFIX = "kafka.";

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(KafkaLogOptions.BOOTSTRAP_SERVERS);
        return hashSet;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet();
    }

    public Map<String, String> enrichOptions(DynamicTableFactory.Context context) {
        HashMap hashMap = new HashMap(context.getCatalogTable().getOptions());
        Preconditions.checkArgument(!hashMap.containsKey(KafkaLogOptions.TOPIC.key()), "Managed table can not contain custom topic. You need to remove topic in table options or session config.");
        hashMap.put(KafkaLogOptions.TOPIC.key(), context.getObjectIdentifier().asSummaryString());
        return hashMap;
    }

    private String topic(DynamicTableFactory.Context context) {
        return (String) context.getCatalogTable().getOptions().get(KafkaLogOptions.TOPIC.key());
    }

    public void onCreateTable(DynamicTableFactory.Context context, int i, boolean z) {
        Configuration fromMap = Configuration.fromMap(context.getCatalogTable().getOptions());
        try {
            AdminClient create = AdminClient.create(toKafkaProperties(fromMap));
            Throwable th = null;
            try {
                try {
                    HashMap hashMap = new HashMap();
                    fromMap.getOptional(CoreOptions.LOG_RETENTION).ifPresent(duration -> {
                    });
                    create.createTopics(Collections.singleton(new NewTopic(topic(context), Optional.of(Integer.valueOf(i)), Optional.empty()).configs(hashMap))).all().get();
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new TableException("Error in createTopic", e);
            }
            if (!z) {
                throw new TableException(String.format("Failed to create kafka topic. Reason: topic %s exists for table %s. Suggestion: please try `DESCRIBE TABLE %s` to check whether table exists in current catalog. If table exists and the DDL needs to be executed multiple times, please use `CREATE TABLE IF NOT EXISTS` ddl instead. Otherwise, please choose another table name or manually delete the current topic and try again.", topic(context), context.getObjectIdentifier().asSerializableString(), context.getObjectIdentifier().asSerializableString()));
            }
        }
    }

    public void onDropTable(DynamicTableFactory.Context context, boolean z) {
        try {
            AdminClient create = AdminClient.create(toKafkaProperties(FactoryUtil.createTableFactoryHelper(this, context).getOptions()));
            Throwable th = null;
            try {
                create.deleteTopics(Collections.singleton(topic(context))).all().get();
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
            } finally {
            }
        } catch (InterruptedException e) {
            throw new TableException("Error in deleteTopic", e);
        } catch (ExecutionException e2) {
            if (!(e2.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw new TableException("Error in deleteTopic", e2);
            }
            if (!z) {
                throw new TableException(String.format("Failed to delete kafka topic. Reason: topic %s doesn't exist for table %s. Suggestion: please try `DROP TABLE IF EXISTS` ddl instead.", topic(context), context.getObjectIdentifier().asSerializableString()));
            }
        }
    }

    /* renamed from: createSourceProvider, reason: merged with bridge method [inline-methods] */
    public KafkaLogSourceProvider m6createSourceProvider(DynamicTableFactory.Context context, DynamicTableSource.Context context2, @Nullable int[][] iArr) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
        DataType physicalRowDataType = resolvedSchema.toPhysicalRowDataType();
        DeserializationSchema deserializationSchema = null;
        int[] primaryKeyIndexes = getPrimaryKeyIndexes(resolvedSchema);
        if (primaryKeyIndexes.length > 0) {
            deserializationSchema = (DeserializationSchema) LogStoreTableFactory.getKeyDecodingFormat(createTableFactoryHelper).createRuntimeDecoder(context2, DataTypeUtils.projectRow(physicalRowDataType, primaryKeyIndexes));
        }
        return new KafkaLogSourceProvider(topic(context), toKafkaProperties(createTableFactoryHelper.getOptions()), physicalRowDataType, primaryKeyIndexes, deserializationSchema, (DeserializationSchema) LogStoreTableFactory.getValueDecodingFormat(createTableFactoryHelper).createRuntimeDecoder(context2, physicalRowDataType), iArr, (CoreOptions.LogConsistency) createTableFactoryHelper.getOptions().get(CoreOptions.LOG_CONSISTENCY), (CoreOptions.LogStartupMode) createTableFactoryHelper.getOptions().get(CoreOptions.LOG_SCAN), (Long) createTableFactoryHelper.getOptions().get(CoreOptions.LOG_SCAN_TIMESTAMP_MILLS));
    }

    /* renamed from: createSinkProvider, reason: merged with bridge method [inline-methods] */
    public KafkaLogSinkProvider m5createSinkProvider(DynamicTableFactory.Context context, DynamicTableSink.Context context2) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
        DataType physicalRowDataType = resolvedSchema.toPhysicalRowDataType();
        SerializationSchema serializationSchema = null;
        int[] primaryKeyIndexes = getPrimaryKeyIndexes(resolvedSchema);
        if (primaryKeyIndexes.length > 0) {
            serializationSchema = (SerializationSchema) LogStoreTableFactory.getKeyEncodingFormat(createTableFactoryHelper).createRuntimeEncoder(context2, DataTypeUtils.projectRow(physicalRowDataType, primaryKeyIndexes));
        }
        return new KafkaLogSinkProvider(topic(context), toKafkaProperties(createTableFactoryHelper.getOptions()), serializationSchema, (SerializationSchema) LogStoreTableFactory.getValueEncodingFormat(createTableFactoryHelper).createRuntimeEncoder(context2, physicalRowDataType), (CoreOptions.LogConsistency) createTableFactoryHelper.getOptions().get(CoreOptions.LOG_CONSISTENCY), (CoreOptions.LogChangelogMode) createTableFactoryHelper.getOptions().get(CoreOptions.LOG_CHANGELOG_MODE));
    }

    private int[] getPrimaryKeyIndexes(ResolvedSchema resolvedSchema) {
        List columnNames = resolvedSchema.getColumnNames();
        return (int[]) resolvedSchema.getPrimaryKey().map((v0) -> {
            return v0.getColumns();
        }).map(list -> {
            Stream stream = list.stream();
            columnNames.getClass();
            return stream.mapToInt((v1) -> {
                return r1.indexOf(v1);
            }).toArray();
        }).orElseGet(() -> {
            return new int[0];
        });
    }

    public static Properties toKafkaProperties(ReadableConfig readableConfig) {
        Properties properties = new Properties();
        Map map = ((Configuration) readableConfig).toMap();
        map.keySet().stream().filter(str -> {
            return str.startsWith(KAFKA_PREFIX);
        }).forEach(str2 -> {
            properties.put(str2.substring(KAFKA_PREFIX.length()), map.get(str2));
        });
        if (readableConfig.get(CoreOptions.LOG_CONSISTENCY) == CoreOptions.LogConsistency.TRANSACTIONAL) {
            properties.setProperty("isolation.level", "read_committed");
        }
        return properties;
    }
}
