package com.microsoft.semantickernel.connectors.memory.jdbc;

import com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnectorException;
import com.microsoft.semantickernel.memory.MemoryException;
import java.io.Closeable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/microsoft/semantickernel/connectors/memory/jdbc/JDBCConnector.class */
public class JDBCConnector implements SQLConnector, Closeable {
    protected final Connection connection;
    protected static final String COLLECTIONS_TABLE_NAME = "SKCollectionTable";
    protected static final String TABLE_NAME = "SKMemoryTable";
    protected static final String INDEX_NAME = "SKMemoryIndex";

    /* loaded from: input_file:com/microsoft/semantickernel/connectors/memory/jdbc/JDBCConnector$BatchOperation.class */
    protected enum BatchOperation {
        SELECT,
        DELETE
    }

    public JDBCConnector(Connection connection) {
        this.connection = connection;
    }

    protected static String formatDatetime(@Nullable ZonedDateTime zonedDateTime) {
        return zonedDateTime == null ? "" : zonedDateTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    }

    protected static ZonedDateTime parseDatetime(@Nullable String str) {
        if (str == null || str.isEmpty()) {
            return null;
        }
        return ZonedDateTime.parse(str, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    }

    protected static String DEFAULT_COLLECTIONS_TABLE_NAME() {
        return COLLECTIONS_TABLE_NAME;
    }

    protected static String DEFAULT_TABLE_NAME() {
        return TABLE_NAME;
    }

    protected static String DEFAULT_INDEX_NAME() {
        return INDEX_NAME;
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<Void> createTableAsync() {
        return Mono.fromRunnable(() -> {
            try {
                Statement createStatement = this.connection.createStatement();
                try {
                    createStatement.addBatch("CREATE TABLE IF NOT EXISTS SKCollectionTable (id TEXT PRIMARY KEY )");
                    createStatement.addBatch("CREATE TABLE IF NOT EXISTS SKMemoryTable (collection TEXT NOT NULL, key TEXT NOT NULL, metadata TEXT, embedding TEXT, timestamp TEXT, PRIMARY KEY (collection, key), FOREIGN KEY (collection) REFERENCES SKCollectionTable(id) )");
                    createStatement.addBatch("CREATE INDEX IF NOT EXISTS SKMemoryIndex ON SKMemoryTable(collection)");
                    createStatement.executeBatch();
                    if (createStatement != null) {
                        createStatement.close();
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"CREATE TABLE\" failed", e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<Void> createCollectionAsync(String str) {
        return Mono.fromRunnable(() -> {
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement("INSERT INTO SKCollectionTable (id) VALUES (?)");
                try {
                    prepareStatement.setString(1, str);
                    prepareStatement.executeUpdate();
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"INSERT INTO\" failed", e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<String> upsertAsync(String str, String str2, String str3, String str4, ZonedDateTime zonedDateTime) {
        return Mono.fromRunnable(() -> {
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement("INSERT OR REPLACE INTO SKMemoryTable (collection, key, metadata, embedding, timestamp) VALUES (?, ?, ?, ?, ?)");
                try {
                    prepareStatement.setString(1, str);
                    prepareStatement.setString(2, str2);
                    prepareStatement.setString(3, str3 != null ? str3 : "");
                    prepareStatement.setString(4, str4 != null ? str4 : "");
                    prepareStatement.setString(5, formatDatetime(zonedDateTime));
                    prepareStatement.executeUpdate();
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"INSERT OR REPLACE INTO\" failed", e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).thenReturn(str2);
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<Collection<String>> upsertBatchAsync(String str, Collection<DatabaseEntry> collection) {
        ArrayList arrayList = new ArrayList();
        return Mono.fromRunnable(() -> {
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement("INSERT OR REPLACE INTO SKMemoryTable (collection, key, metadata, embedding, timestamp) VALUES (?, ?, ?, ?, ?)");
                try {
                    Iterator it = collection.iterator();
                    while (it.hasNext()) {
                        DatabaseEntry databaseEntry = (DatabaseEntry) it.next();
                        prepareStatement.setString(1, str);
                        prepareStatement.setString(2, databaseEntry.getKey());
                        prepareStatement.setString(3, databaseEntry.getMetadata() != null ? databaseEntry.getMetadata() : "");
                        prepareStatement.setString(4, databaseEntry.getEmbedding() != null ? databaseEntry.getEmbedding() : "");
                        prepareStatement.setString(5, formatDatetime(databaseEntry.getTimestamp()));
                        prepareStatement.addBatch();
                        arrayList.add(databaseEntry.getKey());
                    }
                    prepareStatement.executeBatch();
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"INSERT OR REPLACE INTO\" failed", e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).thenReturn(arrayList);
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<Boolean> doesCollectionExistsAsync(String str) {
        return Mono.fromCallable(() -> {
            return Boolean.valueOf(doesCollectionExists(str));
        }).subscribeOn(Schedulers.boundedElastic());
    }

    private boolean doesCollectionExists(String str) throws SQLException {
        PreparedStatement prepareStatement = this.connection.prepareStatement("SELECT id FROM SKCollectionTable WHERE id = ?");
        try {
            prepareStatement.setString(1, str);
            boolean next = prepareStatement.executeQuery().next();
            if (prepareStatement != null) {
                prepareStatement.close();
            }
            return next;
        } catch (Throwable th) {
            if (prepareStatement != null) {
                try {
                    prepareStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<List<String>> getCollectionsAsync() {
        return Mono.defer(() -> {
            ArrayList arrayList = new ArrayList();
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement("SELECT id FROM SKCollectionTable");
                try {
                    ResultSet executeQuery = prepareStatement.executeQuery();
                    while (executeQuery.next()) {
                        arrayList.add(executeQuery.getString("id"));
                    }
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                    return Mono.just(arrayList);
                } finally {
                }
            } catch (SQLException e) {
                return Mono.error(new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"SELECT\" failed", e));
            }
        }).subscribeOn(Schedulers.boundedElastic());
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<List<DatabaseEntry>> readAllAsync(String str) {
        return Mono.defer(() -> {
            ArrayList arrayList = new ArrayList();
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement("SELECT * FROM SKMemoryTable WHERE collection = ?");
                try {
                    prepareStatement.setString(1, str);
                    ResultSet executeQuery = prepareStatement.executeQuery();
                    while (executeQuery.next()) {
                        arrayList.add(new DatabaseEntry(executeQuery.getString("key"), executeQuery.getString("metadata"), executeQuery.getString("embedding"), parseDatetime(executeQuery.getString("timestamp"))));
                    }
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                    return Mono.just(arrayList);
                } finally {
                }
            } catch (SQLException e) {
                return Mono.error(new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"SELECT * FROM\" failed", e));
            }
        }).subscribeOn(Schedulers.boundedElastic());
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<DatabaseEntry> readAsync(String str, String str2) {
        return Mono.defer(() -> {
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement("SELECT * FROM SKMemoryTable WHERE collection = ? AND key = ?");
                try {
                    prepareStatement.setString(1, str);
                    prepareStatement.setString(2, (str2 == null || str2.isEmpty()) ? null : str2);
                    ResultSet executeQuery = prepareStatement.executeQuery();
                    if (!executeQuery.next()) {
                        if (prepareStatement != null) {
                            prepareStatement.close();
                        }
                        return Mono.empty();
                    }
                    Mono just = Mono.just(new DatabaseEntry(str2, executeQuery.getString("metadata"), executeQuery.getString("embedding"), parseDatetime(executeQuery.getString("timestamp"))));
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                    return just;
                } finally {
                }
            } catch (SQLException e) {
                return Mono.error(new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"SELECT * FROM\" failed", e));
            }
        }).subscribeOn(Schedulers.boundedElastic());
    }

    protected String batchQuery(BatchOperation batchOperation, Collection<String> collection) {
        String str;
        switch (batchOperation) {
            case SELECT:
                str = "SELECT * FROM SKMemoryTable WHERE collection = ?";
                break;
            case DELETE:
                str = "DELETE FROM SKMemoryTable WHERE collection = ?";
                break;
            default:
                throw new IllegalArgumentException("Invalid batch operation");
        }
        StringBuilder sb = new StringBuilder(str);
        sb.append(" AND key IN (");
        for (int i = 0; i < collection.size(); i++) {
            sb.append("?");
            if (i < collection.size() - 1) {
                sb.append(",");
            }
        }
        sb.append(")");
        return sb.toString();
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<Collection<DatabaseEntry>> readBatchAsync(String str, Collection<String> collection) {
        return Mono.defer(() -> {
            ArrayList arrayList = new ArrayList();
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement(batchQuery(BatchOperation.SELECT, collection));
                try {
                    prepareStatement.setString(1, str);
                    int i = 2;
                    Iterator it = collection.iterator();
                    while (it.hasNext()) {
                        int i2 = i;
                        i++;
                        prepareStatement.setString(i2, (String) it.next());
                    }
                    ResultSet executeQuery = prepareStatement.executeQuery();
                    while (executeQuery.next()) {
                        arrayList.add(new DatabaseEntry(executeQuery.getString("key"), executeQuery.getString("metadata"), executeQuery.getString("embedding"), parseDatetime(executeQuery.getString("timestamp"))));
                    }
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                    return Mono.just(arrayList);
                } finally {
                }
            } catch (SQLException e) {
                return Mono.error(new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"SELECT * FROM\" failed", e));
            }
        }).subscribeOn(Schedulers.boundedElastic());
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<Void> deleteCollectionAsync(String str) {
        return Mono.fromRunnable(() -> {
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement("DELETE FROM SKMemoryTable WHERE collection = ?");
                try {
                    PreparedStatement prepareStatement2 = this.connection.prepareStatement("DELETE FROM SKCollectionTable WHERE id = ?");
                    try {
                        prepareStatement.setString(1, str);
                        prepareStatement.executeUpdate();
                        prepareStatement2.setString(1, str);
                        if (prepareStatement2.executeUpdate() == 0) {
                            throw new MemoryException(MemoryException.ErrorCodes.ATTEMPTED_TO_ACCESS_NONEXISTENT_COLLECTION);
                        }
                        if (prepareStatement2 != null) {
                            prepareStatement2.close();
                        }
                        if (prepareStatement != null) {
                            prepareStatement.close();
                        }
                    } catch (Throwable th) {
                        if (prepareStatement2 != null) {
                            try {
                                prepareStatement2.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"DELETE FROM\" failed", e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<Void> deleteAsync(String str, String str2) {
        return Mono.fromRunnable(() -> {
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement("DELETE FROM SKMemoryTable WHERE collection = ? AND key = ?");
                try {
                    prepareStatement.setString(1, str);
                    prepareStatement.setString(2, (str2 == null || str2.isEmpty()) ? null : str2);
                    prepareStatement.executeUpdate();
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"DELETE FROM\" failed", e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<Void> deleteBatchAsync(String str, Collection<String> collection) {
        return Mono.fromRunnable(() -> {
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement(batchQuery(BatchOperation.DELETE, collection));
                try {
                    prepareStatement.setString(1, str);
                    int i = 2;
                    Iterator it = collection.iterator();
                    while (it.hasNext()) {
                        int i2 = i;
                        i++;
                        prepareStatement.setString(i2, (String) it.next());
                    }
                    prepareStatement.executeUpdate();
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"DELETE FROM\" failed", e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    @Override // com.microsoft.semantickernel.connectors.memory.jdbc.SQLConnector
    public Mono<Void> deleteEmptyAsync(String str) {
        return Mono.fromRunnable(() -> {
            try {
                PreparedStatement prepareStatement = this.connection.prepareStatement("DELETE FROM SKMemoryTable WHERE collection = ? AND key is NULL");
                try {
                    prepareStatement.setString(1, str);
                    prepareStatement.executeUpdate();
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "\"DELETE FROM\" failed", e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.connection.close();
        } catch (SQLException e) {
            throw new SQLConnectorException(SQLConnectorException.ErrorCodes.SQL_ERROR, "Database access error while closing connection", e);
        }
    }
}
