package net.andreaskluth.session.postgres;

import io.reactiverse.pgclient.PgPool;
import io.reactiverse.pgclient.PgRowSet;
import io.reactiverse.pgclient.Row;
import io.reactiverse.pgclient.Tuple;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import net.andreaskluth.session.postgres.serializer.SerializationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.session.ReactiveSessionRepository;
import org.springframework.session.Session;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.SynchronousSink;

/* loaded from: input_file:net/andreaskluth/session/postgres/ReactivePostgresSessionRepository.class */
public class ReactivePostgresSessionRepository implements ReactiveSessionRepository<PostgresSession> {
    /** Logger used for the debug messages emitted when sessions are inserted or updated. */
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactivePostgresSessionRepository.class);

    /** Initial value of {@link #sequenceName}. */
    private static final String SEQUENCE_DEFAULT_NAME = "ReactivePostgresSessionRepository";

    /** Connection pool every prepared statement is executed against. */
    private final PgPool pgPool;

    /** Converts the session attribute map to and from the bytes stored in the database. */
    private final SerializationStrategy serializationStrategy;

    /** Time source for session timestamps and for the expiry cleanup. */
    private final Clock clock;

    /** Whether returned publishers are instrumented with metrics; off until {@code withMetrics(true)}. */
    private boolean enableMetrics;

    /** Name given to instrumented publishers. */
    private String sequenceName = SEQUENCE_DEFAULT_NAME;

    /** Max inactive interval assigned to newly created sessions (30 minutes unless reconfigured). */
    private Duration defaultMaxInactiveInterval = Duration.ofMinutes(30);

    /* loaded from: input_file:net/andreaskluth/session/postgres/ReactivePostgresSessionRepository$MonoToVertxHandlerAdapter.class */
    /**
     * Bridges a Vert.x style callback to a Reactor {@link MonoSink}: a succeeded result is
     * forwarded through {@code success}, anything else through {@code error}.
     *
     * @param <T> type of the value carried by the asynchronous result
     */
    private static class MonoToVertxHandlerAdapter<T> implements Handler<AsyncResult<T>> {
        private final MonoSink<T> sink;

        /**
         * @param monoSink sink that receives the outcome of the asynchronous call
         * @throws NullPointerException if {@code monoSink} is {@code null}
         */
        private MonoToVertxHandlerAdapter(MonoSink<T> monoSink) {
            sink = Objects.requireNonNull(monoSink, "sink must not be null");
        }

        /**
         * Relays the outcome of the asynchronous operation to the wrapped sink.
         *
         * @param asyncResult the finished asynchronous result
         */
        public void handle(AsyncResult<T> asyncResult) {
            if (!asyncResult.succeeded()) {
                sink.error(asyncResult.cause());
                return;
            }
            sink.success(asyncResult.result());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:net/andreaskluth/session/postgres/ReactivePostgresSessionRepository$PostgresSession.class */
    /**
     * {@link Session} implementation persisted by the enclosing repository.
     *
     * <p>Besides the public session id, every instance carries an internal primary key that
     * stays the same when the session id is changed. The {@code isNew} and {@code changed}
     * flags record whether the session still has to be inserted and whether its attribute
     * data was modified since {@link #clearChangeFlags()} was last called.
     */
    public static class PostgresSession implements Session {
        private final UUID internalPrimaryKey;
        private final Map<String, Object> sessionData;
        private final Clock clock;
        private String sessionId;
        private boolean isNew;
        private boolean changed;
        private Instant lastAccessedTime;
        private Instant creationTime;
        private Duration maxInactiveInterval;

        /**
         * Creates a brand-new session with random identifiers and no attributes.
         *
         * @param clock time source used for the timestamps and for {@link #isExpired()}
         * @param duration maximum inactive interval of the new session
         */
        PostgresSession(Clock clock, Duration duration) {
            this.changed = true;
            this.clock = clock;
            this.internalPrimaryKey = UUID.randomUUID();
            this.sessionId = UUID.randomUUID().toString();
            this.sessionData = new HashMap<>();
            this.maxInactiveInterval = duration;
            this.creationTime = clock.instant();
            this.lastAccessedTime = clock.instant();
            this.isNew = true;
        }

        /**
         * Re-creates a session from already known state; the result is not flagged as new
         * but starts out flagged as changed.
         *
         * @param clock time source used for {@link #isExpired()}
         * @param uuid internal primary key
         * @param str session id
         * @param map attribute map; used as-is, not copied
         * @param instant creation time
         * @param instant2 last accessed time
         * @param duration maximum inactive interval
         */
        PostgresSession(Clock clock, UUID uuid, String str, Map<String, Object> map, Instant instant, Instant instant2, Duration duration) {
            this.changed = true;
            this.clock = clock;
            this.internalPrimaryKey = uuid;
            this.sessionId = str;
            this.sessionData = map;
            this.creationTime = instant;
            this.lastAccessedTime = instant2;
            this.maxInactiveInterval = duration;
            this.isNew = false;
        }

        /** @return the current session id */
        public String getId() {
            return this.sessionId;
        }

        /**
         * Replaces the session id with a new random UUID and flags the session as changed.
         *
         * @return the new session id
         */
        public String changeSessionId() {
            this.changed = true;
            this.sessionId = UUID.randomUUID().toString();
            return this.sessionId;
        }

        /**
         * @param str attribute name
         * @param <T> type the caller expects; the cast is unchecked
         * @return the attribute value, or {@code null} if there is none
         */
        @SuppressWarnings("unchecked")
        public <T> T getAttribute(String str) {
            return (T) this.sessionData.get(str);
        }

        /**
         * Returns the names of all attributes currently held by the session.
         *
         * <p>The result is a snapshot rather than a live view of the backing map: callers can
         * remove attributes while iterating over it without a
         * {@code ConcurrentModificationException}, and cannot drop attributes behind the back
         * of the change tracking.
         *
         * @return a new set holding the attribute names
         */
        public Set<String> getAttributeNames() {
            return new HashSet<>(this.sessionData.keySet());
        }

        /**
         * Stores an attribute and flags the session as changed.
         *
         * <p>A {@code null} value removes the attribute, as the {@link Session} contract
         * requires, instead of leaving a name that maps to {@code null}.
         *
         * @param str attribute name
         * @param obj attribute value, or {@code null} to remove the attribute
         */
        public void setAttribute(String str, Object obj) {
            if (obj == null) {
                removeAttribute(str);
                return;
            }
            this.changed = true;
            this.sessionData.put(str, obj);
        }

        /**
         * Removes an attribute and flags the session as changed.
         *
         * @param str attribute name
         */
        public void removeAttribute(String str) {
            this.changed = true;
            this.sessionData.remove(str);
        }

        /** @return the time the session was created */
        public Instant getCreationTime() {
            return this.creationTime;
        }

        /**
         * @param instant new last accessed time
         * @throws NullPointerException if {@code instant} is {@code null}
         */
        public void setLastAccessedTime(Instant instant) {
            Objects.requireNonNull(instant, "lastAccessedTime must not be null");
            this.lastAccessedTime = instant;
        }

        /** @return the time the session was last accessed */
        public Instant getLastAccessedTime() {
            return this.lastAccessedTime;
        }

        /**
         * @param duration new maximum inactive interval
         * @throws NullPointerException if {@code duration} is {@code null}
         */
        public void setMaxInactiveInterval(Duration duration) {
            Objects.requireNonNull(duration, "maxInactiveInterval must not be null");
            this.maxInactiveInterval = duration;
        }

        /** @return the maximum inactive interval */
        public Duration getMaxInactiveInterval() {
            return this.maxInactiveInterval;
        }

        /** @return whether the session is expired at the current instant of the session's clock */
        public boolean isExpired() {
            return isExpired(this.clock.instant());
        }

        /** Resets both the "new" and the "changed" flag. */
        void clearChangeFlags() {
            this.isNew = false;
            this.changed = false;
        }

        /** @return whether the session is currently flagged as changed */
        boolean isChanged() {
            return this.changed;
        }

        /**
         * @return last accessed time plus the maximum inactive interval
         */
        Instant getExpiryTime() {
            // NOTE(review): for a negative interval, which isExpired(Instant) treats as
            // "never expires", this yields a time before the last access. Confirm how the
            // value is used by whatever consumes it.
            return getLastAccessedTime().plus(getMaxInactiveInterval());
        }

        /**
         * Checks expiry against an explicit point in time.
         *
         * @param instant the instant to evaluate against
         * @return {@code false} when the maximum inactive interval is negative; otherwise
         *     {@code true} once {@code instant} minus the interval is at or after the last
         *     accessed time
         */
        boolean isExpired(Instant instant) {
            if (this.maxInactiveInterval.isNegative()) {
                return false;
            }
            return instant.minus(this.maxInactiveInterval).compareTo(this.lastAccessedTime) >= 0;
        }
    }

    /**
     * Creates a repository on top of the given collaborators.
     *
     * @param pgPool pool used to run the SQL statements
     * @param serializationStrategy strategy converting session attributes to and from bytes
     * @param clock time source for session timestamps
     * @throws NullPointerException if any argument is {@code null}
     */
    public ReactivePostgresSessionRepository(PgPool pgPool, SerializationStrategy serializationStrategy, Clock clock) {
        Objects.requireNonNull(pgPool, "pgPool must not be null");
        this.pgPool = pgPool;
        Objects.requireNonNull(serializationStrategy, "serializationStrategy must not be null");
        this.serializationStrategy = serializationStrategy;
        Objects.requireNonNull(clock, "clock must not be null");
        this.clock = clock;
    }

    /**
     * Switches metrics instrumentation of the returned publishers on or off.
     *
     * @param z {@code true} to instrument, {@code false} to return publishers untouched
     */
    public void withMetrics(boolean z) {
        enableMetrics = z;
    }

    /**
     * Overrides the name given to instrumented publishers.
     *
     * @param str the sequence name to use from now on
     */
    public void setSequenceName(String str) {
        sequenceName = str;
    }

    /**
     * Sets the max inactive interval handed to sessions created afterwards.
     *
     * @param i interval length in seconds
     */
    public void setDefaultMaxInactiveInterval(int i) {
        final Duration interval = Duration.ofSeconds(i);
        defaultMaxInactiveInterval = interval;
    }

    /**
     * Creates a new, not yet persisted session.
     *
     * <p>The session object is only built on subscription, using the clock and default max
     * inactive interval in effect at that moment.
     *
     * @return a publisher emitting the new session
     */
    public Mono<PostgresSession> createSession() {
        Mono<PostgresSession> freshSession =
            Mono.defer(() -> Mono.just(new PostgresSession(clock, defaultMaxInactiveInterval)));
        return freshSession.as(addMetricsIfEnabled("createSession"));
    }

    /**
     * Persists the session: inserted when it is still flagged as new, updated otherwise.
     * The decision is taken on subscription.
     *
     * @param postgresSession the session to store
     * @return a publisher completing once the statement has been handled
     * @throws NullPointerException if {@code postgresSession} is {@code null}
     */
    public Mono<Void> save(PostgresSession postgresSession) {
        Objects.requireNonNull(postgresSession, "postgresSession must not be null");
        Mono<Void> persisted = Mono.defer(() -> {
            if (postgresSession.isNew) {
                return insertSession(postgresSession);
            }
            return updateSession(postgresSession);
        });
        return persisted.as(addMetricsIfEnabled("save"));
    }

    /**
     * Inserts the session and verifies the outcome via {@code handleInsertOrUpdate}.
     *
     * @param postgresSession the new session
     * @return a publisher completing when the insert was verified, or failing otherwise
     */
    private Mono<Void> insertSession(PostgresSession postgresSession) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Insert new session with id: {}", postgresSession.sessionId);
        }
        Mono<PgRowSet> inserted = insertSessionCore(postgresSession);
        Mono<Object> verified =
            inserted.handle((rows, sink) -> handleInsertOrUpdate(postgresSession, rows, sink));
        return verified.then();
    }

    /**
     * Updates an already stored session and verifies the outcome via
     * {@code handleInsertOrUpdate}.
     *
     * @param postgresSession the session to update
     * @return a publisher completing when the update was verified, or failing otherwise
     */
    private Mono<Void> updateSession(PostgresSession postgresSession) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Update changed session with id: {} updates session data: {}", postgresSession.sessionId, postgresSession.changed);
        }
        Mono<PgRowSet> updated = updateSessionCore(postgresSession);
        Mono<Object> verified =
            updated.handle((rows, sink) -> handleInsertOrUpdate(postgresSession, rows, sink));
        return verified.then();
    }

    /**
     * Prepares the insert statement for the given session.
     *
     * @param postgresSession the session to insert
     * @return a publisher emitting the result of the insert
     */
    private Mono<PgRowSet> insertSessionCore(PostgresSession postgresSession) {
        Tuple insertParameters = buildParametersForInsert(postgresSession);
        return preparedQuery(ReactivePostgresSessionRepositoryQueries.INSERT_SQL, insertParameters);
    }

    /**
     * Prepares the update statement for the given session: the full update when the session
     * is flagged as changed, the reduced update (no session data parameter) otherwise.
     *
     * @param postgresSession the session to update
     * @return a publisher emitting the result of the update
     */
    private Mono<PgRowSet> updateSessionCore(PostgresSession postgresSession) {
        if (!postgresSession.isChanged()) {
            return preparedQuery(ReactivePostgresSessionRepositoryQueries.REDUCED_UPDATE_SQL, buildReducedParametersForUpdate(postgresSession));
        }
        return preparedQuery(ReactivePostgresSessionRepositoryQueries.UPDATE_SQL, buildParametersForUpdate(postgresSession));
    }

    /**
     * Checks that a write touched exactly one row. On success the session's change flags are
     * cleared and the sink is completed; otherwise the sink is failed with a
     * {@code ReactivePostgresSessionException}.
     *
     * @param postgresSession the session that was written
     * @param pgRowSet result of the insert or update
     * @param synchronousSink sink that receives completion or the error
     */
    private void handleInsertOrUpdate(PostgresSession postgresSession, PgRowSet pgRowSet, SynchronousSink<Object> synchronousSink) {
        final int affectedRows = pgRowSet.rowCount();
        if (affectedRows == 1) {
            postgresSession.clearChangeFlags();
            synchronousSink.complete();
            return;
        }
        String message = "SQLStatement did not return the expected row count of 1, did return " + affectedRows + " inserted/updated records.";
        synchronousSink.error(new ReactivePostgresSessionException(message));
    }

    /**
     * Looks up a session by its id. Emits nothing when no row is found or when the stored
     * session is already expired.
     *
     * @param str the session id
     * @return a publisher emitting the session, or completing empty
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public Mono<PostgresSession> findById(String str) {
        Objects.requireNonNull(str, "id must not be null");
        Mono<PostgresSession> liveSession =
            preparedQuery(ReactivePostgresSessionRepositoryQueries.SELECT_SQL, Tuple.of(str))
                .flatMap(rows -> Mono.justOrEmpty(mapRowSetToPostgresSession(rows)))
                .filter(candidate -> !candidate.isExpired());
        return liveSession.as(addMetricsIfEnabled("findById"));
    }

    /**
     * Converts the first row of a result into a session.
     *
     * @param pgRowSet result of the select
     * @return the mapped session, or {@code null} when the row count is below one
     */
    private PostgresSession mapRowSetToPostgresSession(PgRowSet pgRowSet) {
        if (pgRowSet.rowCount() < 1) {
            return null;
        }
        Row firstRow = pgRowSet.iterator().next();
        return mapRowToPostgresSession(firstRow);
    }

    /**
     * Builds a session from a single row. Both timestamp columns are read as epoch
     * milliseconds and the max inactive interval as whole seconds.
     *
     * @param row a row holding the session columns
     * @return the reconstructed session
     */
    private PostgresSession mapRowToPostgresSession(Row row) {
        // Columns are read in the same order the constructor expects its arguments.
        UUID primaryKey = row.getUUID("id");
        String sessionId = row.getString("session_id");
        Map<String, Object> attributes = byteBufferAsSessionData(row.getBuffer("session_data"));
        Instant createdAt = Instant.ofEpochMilli(row.getLong("creation_time"));
        Instant lastAccessedAt = Instant.ofEpochMilli(row.getLong("last_accessed_time"));
        Duration maxInactive = Duration.ofSeconds(row.getInteger("max_inactive_interval"));
        return new PostgresSession(clock, primaryKey, sessionId, attributes, createdAt, lastAccessedAt, maxInactive);
    }

    /**
     * Deletes the session with the given id. The statement is prepared on subscription.
     *
     * @param str the session id
     * @return a publisher completing once the delete has finished
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public Mono<Void> deleteById(String str) {
        Objects.requireNonNull(str, "id must not be null");
        Mono<Void> deleted = Mono.defer(() -> {
            Mono<PgRowSet> result = preparedQuery(ReactivePostgresSessionRepositoryQueries.DELETE_FROM_SESSION_SQL, Tuple.of(str));
            return result.then();
        });
        return deleted.as(addMetricsIfEnabled("deleteById"));
    }

    /**
     * Runs the expired-sessions delete statement, passing the clock's current epoch
     * milliseconds (read on subscription) as its only parameter.
     *
     * @return a publisher emitting the row count reported for the statement
     */
    public Mono<Integer> cleanupExpiredSessions() {
        Mono<Integer> removedCount = Mono.defer(() -> {
            Tuple now = Tuple.of(clock.millis());
            return preparedQuery(ReactivePostgresSessionRepositoryQueries.DELETE_EXPIRED_SESSIONS_SQL, now)
                .map(PgRowSet::rowCount);
        });
        return removedCount.as(addMetricsIfEnabled("cleanupExpiredSessions"));
    }

    /**
     * Creates a transformer that instruments a publisher when metrics are enabled and
     * returns it unchanged otherwise. The flag and the sequence name are read when the
     * transformer is applied, not when it is created.
     *
     * @param str value of the {@code method} tag
     * @param <T> element type of the publisher
     * @return the transformer
     */
    private <T> Function<Mono<T>, Mono<T>> addMetricsIfEnabled(String str) {
        return source -> {
            if (!enableMetrics) {
                return source;
            }
            return source.metrics().name(sequenceName).tag("method", str);
        };
    }

    /**
     * Wraps a prepared statement execution on the pool in a {@link Mono}; the pool is only
     * called once the publisher is subscribed to.
     *
     * @param str the SQL statement
     * @param tuple the statement parameters
     * @return a publisher emitting the result or signalling the failure
     */
    private Mono<PgRowSet> preparedQuery(String str, Tuple tuple) {
        return Mono.create(sink -> {
            Handler<AsyncResult<PgRowSet>> callback = new MonoToVertxHandlerAdapter<>(sink);
            pgPool.preparedQuery(str, tuple, callback);
        });
    }

    /**
     * Collects the insert parameters: primary key, session id, serialized attributes,
     * creation time, last accessed time, expiry time (all three as epoch milliseconds) and
     * the max inactive interval in seconds.
     *
     * @param postgresSession the session to insert
     * @return the parameter tuple
     */
    private Tuple buildParametersForInsert(PostgresSession postgresSession) {
        Buffer attributes = sessionDataAsByteBuffer(postgresSession.sessionData);
        long createdAtMillis = postgresSession.getCreationTime().toEpochMilli();
        long lastAccessedMillis = postgresSession.getLastAccessedTime().toEpochMilli();
        long expiresAtMillis = postgresSession.getExpiryTime().toEpochMilli();
        // Narrowing conversion: intervals beyond the int range are truncated.
        int maxInactiveSeconds = (int) postgresSession.getMaxInactiveInterval().getSeconds();
        return Tuple.of(
            postgresSession.internalPrimaryKey,
            postgresSession.getId(),
            attributes,
            createdAtMillis,
            lastAccessedMillis,
            expiresAtMillis,
            maxInactiveSeconds);
    }

    /**
     * Collects the parameters of the full update: primary key, session id, serialized
     * attributes, last accessed time and expiry time (epoch milliseconds) and the max
     * inactive interval in seconds.
     *
     * @param postgresSession the session to update
     * @return the parameter tuple
     */
    private Tuple buildParametersForUpdate(PostgresSession postgresSession) {
        Buffer attributes = sessionDataAsByteBuffer(postgresSession.sessionData);
        long lastAccessedMillis = postgresSession.getLastAccessedTime().toEpochMilli();
        long expiresAtMillis = postgresSession.getExpiryTime().toEpochMilli();
        // Narrowing conversion: intervals beyond the int range are truncated.
        int maxInactiveSeconds = (int) postgresSession.getMaxInactiveInterval().getSeconds();
        return Tuple.of(
            postgresSession.internalPrimaryKey,
            postgresSession.getId(),
            attributes,
            lastAccessedMillis,
            expiresAtMillis,
            maxInactiveSeconds);
    }

    /**
     * Collects the parameters of the reduced update, which carries no session data: primary
     * key, session id, last accessed time and expiry time (epoch milliseconds) and the max
     * inactive interval in seconds.
     *
     * @param postgresSession the session to update
     * @return the parameter tuple
     */
    private Tuple buildReducedParametersForUpdate(PostgresSession postgresSession) {
        long lastAccessedMillis = postgresSession.getLastAccessedTime().toEpochMilli();
        long expiresAtMillis = postgresSession.getExpiryTime().toEpochMilli();
        // Narrowing conversion: intervals beyond the int range are truncated.
        int maxInactiveSeconds = (int) postgresSession.getMaxInactiveInterval().getSeconds();
        return Tuple.of(
            postgresSession.internalPrimaryKey,
            postgresSession.getId(),
            lastAccessedMillis,
            expiresAtMillis,
            maxInactiveSeconds);
    }

    /**
     * Serializes the attribute map for storage.
     *
     * @param map the session attributes
     * @return a buffer with the serialized attributes, or {@code null} for an empty map
     */
    private Buffer sessionDataAsByteBuffer(Map<String, Object> map) {
        return map.isEmpty() ? null : Buffer.buffer(serializationStrategy.serialize(map));
    }

    /**
     * Restores the attribute map from its stored form.
     *
     * @param buffer the stored bytes, may be {@code null}
     * @return the deserialized attributes, or a new empty map when {@code buffer} is {@code null}
     */
    private Map<String, Object> byteBufferAsSessionData(Buffer buffer) {
        if (buffer == null) {
            return new HashMap<>();
        }
        return serializationStrategy.deserialize(buffer.getBytes());
    }
}
