/*
 * Decompiled with CFR 0.152.
 */
package io.kestra.plugin.debezium;

import io.debezium.data.Envelope;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.debezium.AbstractDebeziumRealtimeTrigger;
import io.kestra.plugin.debezium.AbstractDebeziumTask;
import io.kestra.plugin.debezium.MapConverter;
import io.kestra.plugin.debezium.models.Envelope;
import io.kestra.plugin.debezium.models.Message;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.source.SourceRecord;
import reactor.core.publisher.FluxSink;

public class ChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
    private final AbstractDebeziumTask abstractDebeziumTask;
    private final RunContext runContext;
    private final AtomicInteger count;
    private final AtomicBoolean snapshot;
    private ZonedDateTime lastRecord;
    private final Map<String, Pair<File, OutputStream>> records = new HashMap<String, Pair<File, OutputStream>>();
    private final Map<String, AtomicInteger> recordsCount = new ConcurrentHashMap<String, AtomicInteger>();

    public ChangeConsumer(AbstractDebeziumTask abstractDebeziumTask, RunContext runContext, AtomicInteger count, AtomicBoolean snapshot, ZonedDateTime lastRecord) {
        this.abstractDebeziumTask = abstractDebeziumTask;
        this.runContext = runContext;
        this.count = count;
        this.snapshot = snapshot;
        this.lastRecord = lastRecord;
    }

    @Override
    public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) {
        this.lastRecord = ZonedDateTime.now();
        for (ChangeEvent<SourceRecord, SourceRecord> r : records) {
            SourceRecord record = r.value();
            if (record.sourceOffset().containsKey("snapshot") && record.sourceOffset().get("snapshot").equals(Boolean.TRUE)) {
                this.snapshot.compareAndSet(false, true);
            } else {
                this.snapshot.compareAndSet(true, false);
            }
            Pair<Message, Message> message = MapConverter.convert(record);
            Map<String, Object> result = this.handle(message);
            if (result != null) {
                this.write(result, message.getValue().getSource());
            }
            committer.markProcessed(r);
        }
        committer.markBatchFinished();
    }

    public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer, FluxSink<AbstractDebeziumRealtimeTrigger.StreamOutput> sink) {
        this.lastRecord = ZonedDateTime.now();
        try {
            for (ChangeEvent<SourceRecord, SourceRecord> r : records) {
                SourceRecord record = r.value();
                Pair<Message, Message> message = MapConverter.convert(record);
                Map<String, Object> result = this.handle(message);
                if (result != null) {
                    this.emit(result, message.getValue().getSource(), sink);
                }
                committer.markProcessed(r);
            }
            committer.markBatchFinished();
        }
        catch (Exception exception) {
            sink.error((Throwable)exception);
        }
    }

    private Map<String, Object> handle(Pair<Message, Message> message) {
        if (this.isFilter(message)) {
            return null;
        }
        switch (this.abstractDebeziumTask.getFormat()) {
            case RAW: {
                return this.handleFormatRaw(message);
            }
            case INLINE: {
                return this.handleFormatInline(message);
            }
            case WRAP: {
                return this.handleFormatWrap(message);
            }
        }
        throw new IllegalArgumentException("Invalid Format '" + String.valueOf((Object)this.abstractDebeziumTask.getFormat()));
    }

    private void emit(Map<String, Object> result, Message.Source source, FluxSink<AbstractDebeziumRealtimeTrigger.StreamOutput> sink) {
        String stream = switch (this.abstractDebeziumTask.getSplitTable()) {
            default -> throw new MatchException(null, null);
            case AbstractDebeziumTask.SplitTable.OFF -> "data";
            case AbstractDebeziumTask.SplitTable.TABLE -> source.getDb() + "." + source.getTable();
            case AbstractDebeziumTask.SplitTable.DATABASE -> source.getDb();
        };
        AbstractDebeziumRealtimeTrigger.StreamOutput output = AbstractDebeziumRealtimeTrigger.StreamOutput.builder().stream(stream).data(result).build();
        sink.next((Object)output);
    }

    private void write(Map<String, Object> result, Message.Source source) throws IOException {
        Object stream = switch (this.abstractDebeziumTask.getSplitTable()) {
            case AbstractDebeziumTask.SplitTable.OFF -> "data";
            case AbstractDebeziumTask.SplitTable.TABLE -> source.getDb() + "." + source.getTable();
            case AbstractDebeziumTask.SplitTable.DATABASE -> source.getDb();
            default -> throw new IllegalArgumentException("Invalid SplitTable '" + String.valueOf((Object)this.abstractDebeziumTask.getSplitTable()));
        };
        if (!this.records.containsKey(stream)) {
            Path tempFile = this.runContext.tempFile((String)stream);
            this.records.put((String)stream, Pair.of(tempFile.toFile(), new FileOutputStream(tempFile.toFile())));
        }
        this.recordsCount.computeIfAbsent((String)stream, k -> new AtomicInteger()).incrementAndGet();
        int saved = this.count.incrementAndGet();
        if (saved > 0 && saved % 5000 == 0) {
            this.runContext.logger().debug("Received {} records: {}", (Object)this.count, (Object)this.recordsCount);
        }
        FileSerde.write((OutputStream)this.records.get(stream).getRight(), result);
    }

    private boolean isFilter(Pair<Message, Message> message) {
        if (!(message.getValue() instanceof Envelope) && this.abstractDebeziumTask.getIgnoreDdl().booleanValue()) {
            return true;
        }
        if (message.getValue() == null && this.abstractDebeziumTask.getDeleted() == AbstractDebeziumTask.Deleted.DROP) {
            return true;
        }
        return !(message.getValue() instanceof Envelope) && this.abstractDebeziumTask.getFormat() != AbstractDebeziumTask.Format.RAW;
    }

    private Map<String, Object> handleFormatRaw(Pair<Message, Message> message) {
        LinkedHashMap<String, Object> result = new LinkedHashMap<String, Object>();
        result.put("key", message.getKey());
        result.put("value", message.getValue());
        this.addDeleted(result, message);
        return result;
    }

    private Map<String, Object> handleFormatInline(Pair<Message, Message> message) {
        Envelope value = (Envelope)message.getValue();
        Map<String, Object> result = this.formatInlineWithoutAdditional(value);
        this.addDeleted(result, message);
        this.addKey(result, message);
        this.addMetadata(result, value);
        return result;
    }

    private Map<String, Object> handleFormatWrap(Pair<Message, Message> message) {
        Envelope value = (Envelope)message.getValue();
        LinkedHashMap<String, Object> result = new LinkedHashMap<String, Object>();
        result.put("record", this.formatInlineWithoutAdditional(value));
        this.addDeleted(result, message);
        this.addKey(result, message);
        this.addMetadata(result, value);
        return result;
    }

    private Map<String, Object> formatInlineWithoutAdditional(Envelope value) {
        LinkedHashMap<String, Object> result = new LinkedHashMap<String, Object>();
        if (value.getOperation() == Envelope.Operation.DELETE) {
            result.putAll(value.getBefore());
        } else {
            result.putAll(value.getAfter());
        }
        return result;
    }

    private void addDeleted(Map<String, Object> result, Pair<Message, Message> message) {
        if (this.abstractDebeziumTask.getDeleted() == AbstractDebeziumTask.Deleted.ADD_FIELD && message.getValue() instanceof Envelope) {
            Envelope.Operation operation = ((Envelope)message.getValue()).getOperation();
            result.put(this.abstractDebeziumTask.getDeletedFieldName(), operation == Envelope.Operation.DELETE || operation == Envelope.Operation.TRUNCATE);
        }
    }

    private void addKey(Map<String, Object> result, Pair<Message, Message> message) {
        if (this.abstractDebeziumTask.getKey() == AbstractDebeziumTask.Key.ADD_FIELD && message.getKey() != null) {
            result.putAll(JacksonMapper.toMap((Object)message.getKey()));
        }
    }

    private void addMetadata(Map<String, Object> result, Envelope envelope) {
        if (this.abstractDebeziumTask.getMetadata() == AbstractDebeziumTask.Metadata.ADD_FIELD) {
            HashMap<String, Object> metadata = new HashMap<String, Object>();
            if (envelope.getProperties() != null) {
                metadata.putAll(envelope.getProperties());
            }
            if (envelope.getOperation() != null) {
                metadata.put("operation", (Object)envelope.getOperation());
            }
            if (envelope.getTransaction() != null) {
                metadata.put("transaction", envelope.getTransaction());
            }
            if (envelope.getSource() != null) {
                metadata.put("source", envelope.getSource());
            }
            if (envelope.getTimestamp() != null) {
                metadata.put("timestamp", envelope.getTimestamp());
            }
            result.put(this.abstractDebeziumTask.getMetadataFieldName(), metadata);
        }
    }

    @Override
    public boolean supportsTombstoneEvents() {
        return DebeziumEngine.ChangeConsumer.super.supportsTombstoneEvents();
    }

    @Generated
    public Map<String, Pair<File, OutputStream>> getRecords() {
        return this.records;
    }

    @Generated
    public Map<String, AtomicInteger> getRecordsCount() {
        return this.recordsCount;
    }
}

