package org.oracle.okafka.clients.admin.internals;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.oracle.okafka.clients.ClientRequest;
import org.oracle.okafka.clients.ClientResponse;
import org.oracle.okafka.clients.admin.AdminClientConfig;
import org.oracle.okafka.common.Node;
import org.oracle.okafka.common.errors.ConnectionException;
import org.oracle.okafka.common.errors.InvalidLoginCredentialsException;
import org.oracle.okafka.common.network.AQClient;
import org.oracle.okafka.common.protocol.ApiKeys;
import org.oracle.okafka.common.requests.CreateTopicsRequest;
import org.oracle.okafka.common.requests.CreateTopicsResponse;
import org.oracle.okafka.common.requests.DeleteTopicsRequest;
import org.oracle.okafka.common.requests.DeleteTopicsResponse;
import org.oracle.okafka.common.utils.ConnectionUtils;
import org.oracle.okafka.common.utils.CreateTopics;
import org.oracle.okafka.common.utils.LogContext;
import org.oracle.okafka.common.utils.Time;

/* loaded from: input_file:org/oracle/okafka/clients/admin/internals/AQKafkaAdmin.class */
public class AQKafkaAdmin extends AQClient {
    private final AdminClientConfig configs;
    private final Time time;
    private final Map<Node, Connection> connections;

    public AQKafkaAdmin(LogContext logContext, AdminClientConfig adminClientConfig, Time time) {
        super(logContext.logger(AQKafkaAdmin.class), adminClientConfig);
        this.configs = adminClientConfig;
        this.time = time;
        this.connections = new HashMap();
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public ClientResponse send(ClientRequest clientRequest) {
        return parseRequest(clientRequest, clientRequest.apiKey());
    }

    private ClientResponse parseRequest(ClientRequest clientRequest, ApiKeys apiKeys) {
        if (apiKeys == ApiKeys.CREATE_TOPICS) {
            return createTopics(clientRequest);
        }
        if (apiKeys == ApiKeys.DELETE_TOPICS) {
            return deleteTopics(clientRequest);
        }
        if (apiKeys == ApiKeys.METADATA) {
            return getMetadata(clientRequest);
        }
        return null;
    }

    private ClientResponse createTopics(ClientRequest clientRequest) {
        Node destination = clientRequest.destination();
        Map<String, CreateTopicsRequest.TopicDetails> map = ((CreateTopicsRequest.Builder) clientRequest.requestBuilder()).build().topics();
        Connection connection = this.connections.get(destination);
        Map<String, Exception> hashMap = new HashMap();
        SQLException sQLException = null;
        try {
            hashMap = CreateTopics.createTopics(connection, map);
        } catch (SQLException e) {
            sQLException = e;
            this.log.trace("Unexcepted error occured with connection to node {}, closing the connection", clientRequest.destination());
            this.log.trace("Failed to create topics {}", map.keySet());
            try {
                connection.close();
                this.connections.remove(destination);
                this.log.trace("Connection with node {} is closed", clientRequest.destination());
            } catch (SQLException e2) {
                this.log.trace("Failed to close connection with node {}", clientRequest.destination());
            }
        }
        return createTopicsResponse(hashMap, sQLException, sQLException != null, clientRequest);
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public boolean isChannelReady(Node node) {
        return this.connections.containsKey(node);
    }

    private ClientResponse createTopicsResponse(Map<String, Exception> map, Exception exc, boolean z, ClientRequest clientRequest) {
        CreateTopicsResponse createTopicsResponse = new CreateTopicsResponse(map);
        if (exc != null) {
            createTopicsResponse.setResult(exc);
        }
        return new ClientResponse(clientRequest.makeHeader(), clientRequest.callback(), clientRequest.destination(), clientRequest.createdTimeMs(), this.time.milliseconds(), z, createTopicsResponse);
    }

    private ClientResponse deleteTopics(ClientRequest clientRequest) {
        Node destination = clientRequest.destination();
        Set<String> set = ((DeleteTopicsRequest.Builder) clientRequest.requestBuilder()).build().topics();
        Connection connection = this.connections.get(destination);
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet(set);
        try {
            CallableStatement prepareCall = connection.prepareCall("begin dbms_aqadm.drop_sharded_queue(queue_name=>?, force =>(case ? when 1 then true else false end)); end;");
            for (String str : set) {
                try {
                    prepareCall.setString(1, str);
                    prepareCall.setInt(2, 1);
                    prepareCall.execute();
                } catch (SQLException e) {
                    this.log.trace("Failed to delete topic : {}", str);
                    hashMap.put(str, e);
                }
                if (hashMap.get(str) == null) {
                    hashSet.remove(str);
                    this.log.trace("Deleted a topic : {}", str);
                    hashMap.put(str, null);
                }
            }
            try {
                prepareCall.close();
            } catch (SQLException e2) {
            }
            return deleteTopicsResponse(hashMap, null, false, clientRequest);
        } catch (SQLException e3) {
            this.log.trace("Unexcepted error occured with connection to node {}, closing the connection", destination);
            this.log.trace("Failed to delete topics : {}", hashSet);
            try {
                this.connections.remove(destination);
                connection.close();
                this.log.trace("Connection with node {} is closed", clientRequest.destination());
            } catch (SQLException e4) {
                this.log.trace("Failed to close connection with node {}", destination);
            }
            return deleteTopicsResponse(hashMap, e3, true, clientRequest);
        }
    }

    private ClientResponse deleteTopicsResponse(Map<String, SQLException> map, Exception exc, boolean z, ClientRequest clientRequest) {
        DeleteTopicsResponse deleteTopicsResponse = new DeleteTopicsResponse(map);
        if (exc != null) {
            deleteTopicsResponse.setResult(exc);
        }
        return new ClientResponse(clientRequest.makeHeader(), clientRequest.callback(), clientRequest.destination(), clientRequest.createdTimeMs(), this.time.milliseconds(), z, deleteTopicsResponse);
    }

    private ClientResponse getMetadata(ClientRequest clientRequest) {
        ClientResponse metadataNow = getMetadataNow(clientRequest, this.connections.get(clientRequest.destination()));
        if (metadataNow.wasDisconnected()) {
            this.connections.remove(clientRequest.destination());
        }
        return metadataNow;
    }

    private Connection getConnection(Node node) {
        try {
            this.connections.put(node, ConnectionUtils.createJDBCConnection(node, this.configs));
            return this.connections.get(node);
        } catch (SQLException e) {
            if (e.getErrorCode() == 1017) {
                throw new InvalidLoginCredentialsException(e);
            }
            throw new ConnectionException(e.getMessage());
        }
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public void connect(Node node) {
        getConnection(node);
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public void close() {
        Iterator<Map.Entry<Node, Connection>> it = this.connections.entrySet().iterator();
        while (it.hasNext()) {
            close(it.next().getKey());
        }
    }

    @Override // org.oracle.okafka.common.network.AQClient
    public void close(Node node) {
        if (this.connections.containsKey(node)) {
            try {
                this.connections.get(node).close();
                this.connections.remove(node);
                this.log.trace("Connection to node {} closed", node);
            } catch (SQLException e) {
            }
        }
    }
}
