package org.apache.beam.io.debezium;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.io.debezium.AutoValue_DebeziumIO_ConnectorConfiguration;
import org.apache.beam.io.debezium.AutoValue_DebeziumIO_Read;
import org.apache.beam.io.debezium.KafkaSourceConsumerFn;
import org.apache.beam.io.debezium.SourceRecordJson;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/io/debezium/DebeziumIO.class */
public class DebeziumIO {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumIO.class);

    @AutoValue
    /* loaded from: input_file:org/apache/beam/io/debezium/DebeziumIO$ConnectorConfiguration.class */
    public static abstract class ConnectorConfiguration implements Serializable {
        private static final long serialVersionUID = 1;

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/io/debezium/DebeziumIO$ConnectorConfiguration$Builder.class */
        public static abstract class Builder {
            abstract Builder setConnectorClass(ValueProvider<Class<?>> valueProvider);

            abstract Builder setHostName(ValueProvider<String> valueProvider);

            abstract Builder setPort(ValueProvider<String> valueProvider);

            abstract Builder setUsername(ValueProvider<String> valueProvider);

            abstract Builder setPassword(ValueProvider<String> valueProvider);

            abstract Builder setConnectionProperties(ValueProvider<Map<String, String>> valueProvider);

            abstract Builder setSourceConnector(ValueProvider<SourceConnector> valueProvider);

            abstract ConnectorConfiguration build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ValueProvider<Class<?>> getConnectorClass();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ValueProvider<String> getHostName();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ValueProvider<String> getPort();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ValueProvider<String> getUsername();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ValueProvider<String> getPassword();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ValueProvider<SourceConnector> getSourceConnector();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ValueProvider<Map<String, String>> getConnectionProperties();

        abstract Builder builder();

        public static ConnectorConfiguration create() {
            return new AutoValue_DebeziumIO_ConnectorConfiguration.Builder().setConnectionProperties(ValueProvider.StaticValueProvider.of(new HashMap())).build();
        }

        public ConnectorConfiguration withConnectorClass(Class<?> cls) {
            Preconditions.checkArgument(cls != null, "connectorClass can not be null");
            return withConnectorClass((ValueProvider<Class<?>>) ValueProvider.StaticValueProvider.of(cls));
        }

        public ConnectorConfiguration withConnectorClass(ValueProvider<Class<?>> valueProvider) {
            Preconditions.checkArgument(valueProvider != null, "connectorClass can not be null");
            return builder().setConnectorClass(valueProvider).build();
        }

        public ConnectorConfiguration withHostName(String str) {
            Preconditions.checkArgument(str != null, "hostName can not be null");
            return withHostName((ValueProvider<String>) ValueProvider.StaticValueProvider.of(str));
        }

        public ConnectorConfiguration withHostName(ValueProvider<String> valueProvider) {
            Preconditions.checkArgument(valueProvider != null, "hostName can not be null");
            return builder().setHostName(valueProvider).build();
        }

        public ConnectorConfiguration withPort(String str) {
            Preconditions.checkArgument(str != null, "port can not be null");
            return withPort((ValueProvider<String>) ValueProvider.StaticValueProvider.of(str));
        }

        public ConnectorConfiguration withPort(ValueProvider<String> valueProvider) {
            Preconditions.checkArgument(valueProvider != null, "port can not be null");
            return builder().setPort(valueProvider).build();
        }

        public ConnectorConfiguration withUsername(String str) {
            Preconditions.checkArgument(str != null, "username can not be null");
            return withUsername((ValueProvider<String>) ValueProvider.StaticValueProvider.of(str));
        }

        public ConnectorConfiguration withUsername(ValueProvider<String> valueProvider) {
            Preconditions.checkArgument(valueProvider != null, "username can not be null");
            return builder().setUsername(valueProvider).build();
        }

        public ConnectorConfiguration withPassword(String str) {
            Preconditions.checkArgument(str != null, "password can not be null");
            return withPassword((ValueProvider<String>) ValueProvider.StaticValueProvider.of(str));
        }

        public ConnectorConfiguration withPassword(ValueProvider<String> valueProvider) {
            Preconditions.checkArgument(valueProvider != null, "password can not be null");
            return builder().setPassword(valueProvider).build();
        }

        public ConnectorConfiguration withConnectionProperties(Map<String, String> map) {
            Preconditions.checkArgument(map != null, "connectionProperties can not be null");
            return withConnectionProperties((ValueProvider<Map<String, String>>) ValueProvider.StaticValueProvider.of(map));
        }

        public ConnectorConfiguration withConnectionProperties(ValueProvider<Map<String, String>> valueProvider) {
            Preconditions.checkArgument(valueProvider != null, "connectionProperties can not be null");
            return builder().setConnectionProperties(valueProvider).build();
        }

        public ConnectorConfiguration withConnectionProperty(String str, String str2) {
            Preconditions.checkArgument(str != null, "key can not be null");
            Preconditions.checkArgument(str2 != null, "value can not be null");
            Preconditions.checkArgument(getConnectionProperties().get() != null, "connectionProperties can not be null");
            ConnectorConfiguration build = builder().build();
            ((Map) build.getConnectionProperties().get()).putIfAbsent(str, str2);
            return build;
        }

        public ConnectorConfiguration withSourceConnector(SourceConnector sourceConnector) {
            Preconditions.checkArgument(sourceConnector != null, "sourceConnector can not be null");
            return withSourceConnector((ValueProvider<SourceConnector>) ValueProvider.StaticValueProvider.of(sourceConnector));
        }

        public ConnectorConfiguration withSourceConnector(ValueProvider<SourceConnector> valueProvider) {
            Preconditions.checkArgument(valueProvider != null, "sourceConnector can not be null");
            return builder().setSourceConnector(valueProvider).build();
        }

        public Map<String, String> getConfigurationMap() {
            HashMap hashMap = new HashMap();
            hashMap.computeIfAbsent("connector.class", str -> {
                return ((Class) getConnectorClass().get()).getCanonicalName();
            });
            hashMap.computeIfAbsent("database.hostname", str2 -> {
                return (String) getHostName().get();
            });
            hashMap.computeIfAbsent("database.port", str3 -> {
                return (String) getPort().get();
            });
            hashMap.computeIfAbsent("database.user", str4 -> {
                return (String) getUsername().get();
            });
            hashMap.computeIfAbsent("database.password", str5 -> {
                return (String) getPassword().get();
            });
            for (Map.Entry entry : ((Map) getConnectionProperties().get()).entrySet()) {
                hashMap.computeIfAbsent((String) entry.getKey(), str6 -> {
                    return (String) entry.getValue();
                });
            }
            hashMap.computeIfAbsent("database.history", str7 -> {
                return KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName();
            });
            DebeziumIO.LOG.debug("---------------- Connector configuration: {}", Joiner.on('\n').withKeyValueSeparator(" -> ").join(hashMap));
            return hashMap;
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/io/debezium/DebeziumIO$Read.class */
    public static abstract class Read<T> extends PTransform<PBegin, PCollection<T>> {
        private static final long serialVersionUID = 1;

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/io/debezium/DebeziumIO$Read$Builder.class */
        public static abstract class Builder<T> {
            abstract Builder<T> setConnectorConfiguration(ConnectorConfiguration connectorConfiguration);

            abstract Builder<T> setCoder(Coder<T> coder);

            abstract Builder<T> setFormatFunction(SourceRecordMapper<T> sourceRecordMapper);

            abstract Builder<T> setMaxNumberOfRecords(Integer num);

            abstract Builder<T> setMaxTimeToRun(Long l);

            abstract Read<T> build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ConnectorConfiguration getConnectorConfiguration();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SourceRecordMapper<T> getFormatFunction();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Integer getMaxNumberOfRecords();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Long getMaxTimeToRun();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Coder<T> getCoder();

        abstract Builder<T> toBuilder();

        public Read<T> withConnectorConfiguration(ConnectorConfiguration connectorConfiguration) {
            Preconditions.checkArgument(connectorConfiguration != null, "config can not be null");
            return toBuilder().setConnectorConfiguration(connectorConfiguration).build();
        }

        public Read<T> withFormatFunction(SourceRecordMapper<T> sourceRecordMapper) {
            Preconditions.checkArgument(sourceRecordMapper != null, "mapperFn can not be null");
            return toBuilder().setFormatFunction(sourceRecordMapper).build();
        }

        public Read<T> withCoder(Coder<T> coder) {
            Preconditions.checkArgument(coder != null, "coder can not be null");
            return toBuilder().setCoder(coder).build();
        }

        public Read<T> withMaxNumberOfRecords(Integer num) {
            return toBuilder().setMaxNumberOfRecords(num).build();
        }

        public Read<T> withMaxTimeToRun(Long l) {
            return toBuilder().setMaxTimeToRun(l).build();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public Schema getRecordSchema() {
            KafkaSourceConsumerFn kafkaSourceConsumerFn = new KafkaSourceConsumerFn((Class) getConnectorConfiguration().getConnectorClass().get(), getFormatFunction(), getMaxNumberOfRecords());
            kafkaSourceConsumerFn.register(new KafkaSourceConsumerFn.OffsetTracker(new KafkaSourceConsumerFn.OffsetHolder(null, null, 0)));
            Maps.newHashMap(getConnectorConfiguration().getConfigurationMap()).put("snapshot.mode", "schema_only");
            SourceRecord oneRecord = kafkaSourceConsumerFn.getOneRecord(getConnectorConfiguration().getConfigurationMap());
            kafkaSourceConsumerFn.reset();
            return Schema.builder().addFields(KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(oneRecord.valueSchema()).getFields()).setOptions(Schema.Options.builder().setOption("primaryKeyColumns", Schema.FieldType.array(Schema.FieldType.STRING), (oneRecord.keySchema() != null ? KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(oneRecord.keySchema()) : Schema.builder().build()).getFields().stream().map((v0) -> {
                return v0.getName();
            }).collect(Collectors.toList()))).build();
        }

        public PCollection<T> expand(PBegin pBegin) {
            return pBegin.apply(Create.of(Lists.newArrayList(new Map[]{getConnectorConfiguration().getConfigurationMap()})).withCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))).apply(ParDo.of(new KafkaSourceConsumerFn((Class) getConnectorConfiguration().getConnectorClass().get(), getFormatFunction(), getMaxNumberOfRecords(), getMaxTimeToRun()))).setCoder(getCoder());
        }
    }

    public static <T> Read<T> read() {
        return new AutoValue_DebeziumIO_Read.Builder().build();
    }

    public static Read<String> readAsJson() {
        return new AutoValue_DebeziumIO_Read.Builder().setFormatFunction(new SourceRecordJson.SourceRecordJsonMapper()).setCoder(StringUtf8Coder.of()).build();
    }

    private DebeziumIO() {
    }
}
