package com.github.kuliginstepan.outbox.relational.autoconfigure;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.kuliginstepan.outbox.core.OutboxEntity;
import com.github.kuliginstepan.outbox.core.OutboxMethodIdentifier;
import com.github.kuliginstepan.outbox.core.ReactiveOutboxRepository;
import com.github.kuliginstepan.outbox.relational.autoconfigure.ExtendedRelationalOutboxProperties;
import java.time.Instant;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/github/kuliginstepan/outbox/relational/autoconfigure/R2dbcOutboxRepository.class */
public class R2dbcOutboxRepository implements ReactiveOutboxRepository {
    private static final String INSERT_QUERY = "INSERT INTO %TABLE_NAME% (ID, PUBLICATION_DATE, OUTBOX_METHOD_ID, DATA_TYPE, DATA) VALUES (:id, :publicationDate, :outboxMethodId, :dataType, :data)";
    private static final String MARK_COMPLETED_QUERY = "UPDATE %TABLE_NAME% SET COMPLETION_DATE=:completionDate WHERE ID=:id AND COMPLETION_DATE IS NULL";
    private static final String DELETE_COMPLETED_QUERY = "DELETE FROM %TABLE_NAME% WHERE COMPLETION_DATE IS NOT NULL AND PUBLICATION_DATE <= :publicationDate";
    private static final String SELECT_UNCOMPLETED_QUERY = "SELECT * FROM %TABLE_NAME% WHERE COMPLETION_DATE IS NULL AND PUBLICATION_DATE <= :publicationDate";
    private final DatabaseClient client;
    private final ObjectMapper mapper;
    private final String insertQuery;
    private final String markCompletedQuery;
    private final String deleteCompletedQuery;
    private final String selectUncompletedQuery;

    public R2dbcOutboxRepository(DatabaseClient databaseClient, ObjectMapper objectMapper, ExtendedRelationalOutboxProperties.RelationalOutboxProperties relationalOutboxProperties) {
        this.client = databaseClient;
        this.mapper = objectMapper;
        this.insertQuery = getQuery(INSERT_QUERY, relationalOutboxProperties.getTable());
        this.markCompletedQuery = getQuery(MARK_COMPLETED_QUERY, relationalOutboxProperties.getTable());
        this.deleteCompletedQuery = getQuery(DELETE_COMPLETED_QUERY, relationalOutboxProperties.getTable());
        this.selectUncompletedQuery = getQuery(SELECT_UNCOMPLETED_QUERY, relationalOutboxProperties.getTable());
    }

    public Mono<Void> save(OutboxEntity outboxEntity) {
        return this.client.sql(this.insertQuery).bind("id", outboxEntity.getId()).bind("outboxMethodId", outboxEntity.getMethodIdentifier().getValue()).bind("publicationDate", Long.valueOf(outboxEntity.getPublicationDate().toEpochMilli())).bind("data", serialize(outboxEntity.getData())).bind("dataType", Arrays.stream(outboxEntity.getData()).map((v0) -> {
            return v0.getClass();
        }).map(ClassUtils::getQualifiedName).collect(Collectors.joining(","))).then();
    }

    public Mono<Void> markCompleted(OutboxEntity outboxEntity) {
        return this.client.sql(this.markCompletedQuery).bind("completionDate", Long.valueOf(Instant.now().toEpochMilli())).bind("id", outboxEntity.getId()).then();
    }

    public Mono<Void> deleteCompletedEntities(Instant instant) {
        return this.client.sql(this.deleteCompletedQuery).bind("publicationDate", Long.valueOf(instant.toEpochMilli())).then();
    }

    public Flux<OutboxEntity> findUncompletedEntities(Instant instant) {
        return this.client.sql(this.selectUncompletedQuery).bind("publicationDate", Long.valueOf(instant.toEpochMilli())).map(row -> {
            return new OutboxEntity() { // from class: com.github.kuliginstepan.outbox.relational.autoconfigure.R2dbcOutboxRepository.1
                public String getId() {
                    return (String) row.get("ID", String.class);
                }

                public Instant getPublicationDate() {
                    return Instant.ofEpochMilli(((Long) row.get("PUBLICATION_DATE", Long.class)).longValue());
                }

                public OutboxMethodIdentifier getMethodIdentifier() {
                    return OutboxMethodIdentifier.of((String) row.get("OUTBOX_METHOD_ID", String.class));
                }

                public Object[] getData() {
                    Class[] clsArr = (Class[]) Arrays.stream(((String) row.get("DATA_TYPE", String.class)).split(",")).map(str -> {
                        return ClassUtils.resolveClassName(str, (ClassLoader) null);
                    }).toArray(i -> {
                        return new Class[i];
                    });
                    Object[] objArr = (Object[]) R2dbcOutboxRepository.this.deserialize((String) row.get("DATA", String.class), Object[].class);
                    for (int i2 = 0; i2 < objArr.length; i2++) {
                        objArr[i2] = R2dbcOutboxRepository.this.mapper.convertValue(objArr[i2], clsArr[i2]);
                    }
                    return objArr;
                }
            };
        }).all();
    }

    private String serialize(Object obj) {
        try {
            return this.mapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> T deserialize(String str, Class<T> cls) {
        try {
            return (T) this.mapper.readValue(str, cls);
        } catch (JsonProcessingException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    private String getQuery(String str, String str2) {
        return StringUtils.replace(str, "%TABLE_NAME%", str2);
    }
}
