/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.sqs;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

@Named(value="sqs")
@Dependent
public class SqsChangeConsumer
extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    protected static final String PROP_PREFIX = "debezium.sink.sqs.";
    protected static final String PROP_REGION_NAME = "debezium.sink.sqs.region";
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsChangeConsumer.class);
    private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1L);
    private static final int DEFAULT_RETRIES = 5;
    private static final String PROP_ENDPOINT_NAME = "debezium.sink.sqs.endpoint";
    private static final String PROP_QUEUE_URL = "debezium.sink.sqs.queue.url";
    private static final String PROP_CREDENTIALS_PROFILE = "debezium.sink.sqs.credentials.profile";
    private static final String PROP_QUEUE_FIFO_MESSAGE_GROUP_ID = "debezium.sink.sqs.fifo.message.group.id";
    private String messageGroupId = null;
    private String queueUrl;
    private SqsClient client = null;

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        SqsClientBuilder builder = (SqsClientBuilder)SqsClient.builder().region(Region.of((String)((String)config.getValue(PROP_REGION_NAME, String.class))));
        config.getOptionalValue(PROP_ENDPOINT_NAME, String.class).ifPresent(endpoint -> {
            LOGGER.info("Queue Endpoint {}", endpoint);
            builder.endpointOverride(URI.create(endpoint));
        });
        config.getOptionalValue(PROP_CREDENTIALS_PROFILE, String.class).ifPresent(profile -> {
            LOGGER.info("Credentials profile {}", profile);
            builder.credentialsProvider((AwsCredentialsProvider)ProfileCredentialsProvider.create((String)profile));
        });
        this.client = (SqsClient)builder.build();
        this.queueUrl = (String)config.getValue(PROP_QUEUE_URL, String.class);
        LOGGER.info("Queue Url {}", (Object)this.queueUrl);
        if (this.queueUrl.endsWith(".fifo")) {
            this.messageGroupId = config.getOptionalValue(PROP_QUEUE_FIFO_MESSAGE_GROUP_ID, String.class).orElse("cdc-group");
        }
    }

    @PreDestroy
    void close() {
        try {
            this.client.close();
        }
        catch (Exception e) {
            LOGGER.warn("Exception while closing Sqs client", (Throwable)e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
            LOGGER.trace("Received event '{}'", record);
            int attempts = 0;
            while (!this.recordSent(record)) {
                if (++attempts >= 5) {
                    throw new DebeziumException("Exceeded maximum number of attempts to publish event " + String.valueOf(record));
                }
                Metronome.sleeper((Duration)RETRY_INTERVAL, (Clock)Clock.SYSTEM).pause();
            }
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
    }

    private boolean recordSent(ChangeEvent<Object, Object> event) {
        Object eventValue = event.value();
        if (eventValue == null) {
            eventValue = "";
        }
        LOGGER.info(event.toString());
        SendMessageRequest.Builder sendMessageRequestBuilder = SendMessageRequest.builder().queueUrl(this.queueUrl).messageBody(eventValue.toString());
        if (this.messageGroupId != null) {
            sendMessageRequestBuilder.messageGroupId(this.messageGroupId);
        }
        try {
            this.client.sendMessage((SendMessageRequest)sendMessageRequestBuilder.build());
            return true;
        }
        catch (SdkClientException exception) {
            LOGGER.error("Failed to send record to {}", (Object)event.destination(), (Object)exception);
            return false;
        }
    }
}

