/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.Converter;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.Protobuf;
import io.debezium.engine.format.SerializationFormat;
import io.debezium.engine.spi.OffsetCommitPolicy;
import java.io.IOException;
import java.time.Clock;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ConvertingEngineBuilder<R>
implements DebeziumEngine.Builder<R> {
    private static final String CONVERTER_PREFIX = "converter";
    private static final String KEY_CONVERTER_PREFIX = "key.converter";
    private static final String VALUE_CONVERTER_PREFIX = "value.converter";
    private static final String FIELD_CLASS = "class";
    private static final String TOPIC_NAME = "debezium";
    private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url";
    private final DebeziumEngine.Builder<SourceRecord> delegate = EmbeddedEngine.create();
    private final Class<? extends SerializationFormat<?>> formatKey;
    private final Class<? extends SerializationFormat<?>> formatValue;
    private Configuration config;
    private Function<SourceRecord, R> toFormat;
    private Function<R, SourceRecord> fromFormat;

    ConvertingEngineBuilder(ChangeEventFormat<?> format) {
        this.formatKey = null;
        this.formatValue = format.getValueFormat();
    }

    ConvertingEngineBuilder(KeyValueChangeEventFormat<?, ?> format) {
        this.formatKey = format.getKeyFormat();
        this.formatValue = format.getValueFormat();
    }

    @Override
    public DebeziumEngine.Builder<R> notifying(Consumer<R> consumer) {
        this.delegate.notifying((R record) -> consumer.accept(this.toFormat.apply((SourceRecord)record)));
        return this;
    }

    private boolean isFormat(Class<? extends SerializationFormat<?>> format1, Class<? extends SerializationFormat<?>> format2) {
        return format1 == format2;
    }

    @Override
    public DebeziumEngine.Builder<R> notifying(DebeziumEngine.ChangeConsumer<R> handler) {
        this.delegate.notifying((List<R> records, DebeziumEngine.RecordCommitter<R> committer) -> handler.handleBatch(records.stream().map(x -> this.toFormat.apply((SourceRecord)x)).collect(Collectors.toList()), new DebeziumEngine.RecordCommitter<R>(){

            @Override
            public void markProcessed(R record) throws InterruptedException {
                committer.markProcessed(ConvertingEngineBuilder.this.fromFormat.apply(record));
            }

            @Override
            public void markBatchFinished() throws InterruptedException {
                committer.markBatchFinished();
            }

            @Override
            public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
                committer.markProcessed(ConvertingEngineBuilder.this.fromFormat.apply(record), sourceOffsets);
            }

            @Override
            public DebeziumEngine.Offsets buildOffsets() {
                return committer.buildOffsets();
            }
        }));
        return this;
    }

    @Override
    public DebeziumEngine.Builder<R> using(Properties config) {
        this.config = Configuration.from(config);
        this.delegate.using(config);
        return this;
    }

    @Override
    public DebeziumEngine.Builder<R> using(ClassLoader classLoader) {
        this.delegate.using(classLoader);
        return this;
    }

    @Override
    public DebeziumEngine.Builder<R> using(Clock clock) {
        this.delegate.using(clock);
        return this;
    }

    @Override
    public DebeziumEngine.Builder<R> using(DebeziumEngine.CompletionCallback completionCallback) {
        this.delegate.using(completionCallback);
        return this;
    }

    @Override
    public DebeziumEngine.Builder<R> using(DebeziumEngine.ConnectorCallback connectorCallback) {
        this.delegate.using(connectorCallback);
        return this;
    }

    @Override
    public DebeziumEngine.Builder<R> using(OffsetCommitPolicy policy) {
        this.delegate.using(policy);
        return this;
    }

    @Override
    public DebeziumEngine<R> build() {
        final DebeziumEngine<SourceRecord> engine = this.delegate.build();
        if (this.formatValue == Connect.class) {
            this.toFormat = record -> new EmbeddedEngineChangeEvent<Object, SourceRecord>(null, (SourceRecord)record, (SourceRecord)record);
        } else {
            Converter keyConverter = this.createConverter(this.formatKey, true);
            Converter valueConverter = this.createConverter(this.formatValue, false);
            this.toFormat = record -> {
                byte[] key = keyConverter.fromConnectData(TOPIC_NAME, record.keySchema(), record.key());
                byte[] value = valueConverter.fromConnectData(TOPIC_NAME, record.valueSchema(), record.value());
                return this.isFormat(this.formatKey, Json.class) && this.isFormat(this.formatValue, Json.class) || this.isFormat(this.formatValue, CloudEvents.class) ? new EmbeddedEngineChangeEvent<String, String>(key != null ? new String(key) : null, value != null ? new String(value) : null, (SourceRecord)record) : new EmbeddedEngineChangeEvent<byte[], byte[]>(key, value, (SourceRecord)record);
            };
        }
        this.fromFormat = record -> ((EmbeddedEngineChangeEvent)record).sourceRecord();
        return new DebeziumEngine<R>(){

            @Override
            public void run() {
                engine.run();
            }

            @Override
            public void close() throws IOException {
                engine.close();
            }
        };
    }

    private Converter createConverter(Class<? extends SerializationFormat<?>> format, boolean key) {
        Configuration converterConfig = this.config.subset(key ? KEY_CONVERTER_PREFIX : VALUE_CONVERTER_PREFIX, true);
        Configuration commonConverterConfig = this.config.subset(CONVERTER_PREFIX, true);
        converterConfig = ((Configuration.Builder)commonConverterConfig.edit().with(converterConfig)).build();
        if (this.isFormat(format, Json.class)) {
            converterConfig = converterConfig.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG) ? converterConfig.edit().withDefault(FIELD_CLASS, "io.apicurio.registry.utils.converter.ExtJsonConverter").build() : converterConfig.edit().withDefault(FIELD_CLASS, "com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter").build();
        } else if (this.isFormat(format, CloudEvents.class)) {
            converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.debezium.converters.CloudEventsConverter").build();
        } else if (this.isFormat(format, Avro.class)) {
            converterConfig = converterConfig.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG) ? converterConfig.edit().withDefault(FIELD_CLASS, "io.apicurio.registry.utils.converter.AvroConverter").build() : converterConfig.edit().withDefault(FIELD_CLASS, "io.confluent.connect.avro.AvroConverter").build();
        } else if (this.isFormat(format, Protobuf.class)) {
            converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.confluent.connect.protobuf.ProtobufConverter").build();
        } else {
            throw new DebeziumException("Converter '" + format.getSimpleName() + "' is not supported");
        }
        Converter converter = converterConfig.getInstance(FIELD_CLASS, Converter.class);
        converter.configure(converterConfig.asMap(), key);
        return converter;
    }
}

