package org.apache.flume.channel.jdbc.impl;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import javax.sql.DataSource;
import org.apache.flume.channel.jdbc.ConfigurationConstants;
import org.apache.flume.channel.jdbc.JdbcChannelException;
import org.apache.flume.channel.jdbc.impl.PersistableEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/flume-jdbc-channel-1.9.0.jar:org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.class */
public class DerbySchemaHandler implements SchemaHandler {
    private static final String QUREY_SYSCHEMA_FLUME = "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = 'FLUME'";
    private static final String SCHEMA_FLUME = "FLUME";
    private static final String TABLE_FL_EVENT_NAME = "FL_EVENT";
    private static final String TABLE_FL_EVENT = "FLUME.FL_EVENT";
    private static final String COLUMN_FLE_ID = "FLE_ID";
    private static final String COLUMN_FLE_PAYLOAD = "FLE_PAYLOAD";
    private static final String INDEX_FLE_CHANNEL_NAME = "IDX_FLE_CHANNEL";
    private static final String INDEX_FLE_CHANNEL = "FLUME.IDX_FLE_CHANNEL";
    private static final String TABLE_FL_PLSPILL_NAME = "FL_PLSPILL";
    private static final String TABLE_FL_PLSPILL = "FLUME.FL_PLSPILL";
    private static final String COLUMN_FLP_EVENT = "FLP_EVENT";
    private static final String COLUMN_FLP_SPILL = "FLP_SPILL";
    private static final String INDEX_FLP_EVENT_NAME = "IDX_FLP_EVENT";
    private static final String INDEX_FLP_EVENT = "FLUME.IDX_FLP_EVENT";
    private static final String TABLE_FL_HEADER_NAME = "FL_HEADER";
    private static final String TABLE_FL_HEADER = "FLUME.FL_HEADER";
    private static final String COLUMN_FLH_ID = "FLH_ID";
    private static final String COLUMN_FLH_EVENT = "FLH_EVENT";
    private static final String COLUMN_FLH_NAME = "FLH_NAME";
    private static final String INDEX_FLH_EVENT_NAME = "IDX_FLH_EVENT";
    private static final String INDEX_FLH_EVENT = "FLUME.IDX_FLH_EVENT";
    private static final String TABLE_FL_NMSPILL_NAME = "FL_NMSPILL";
    private static final String TABLE_FL_NMSPILL = "FLUME.FL_NMSPILL";
    private static final String COLUMN_FLN_HEADER = "FLN_HEADER";
    private static final String COLUMN_FLN_SPILL = "FLN_SPILL";
    private static final String INDEX_FLN_HEADER_NAME = "IDX_FLN_HEADER";
    private static final String INDEX_FLN_HEADER = "FLUME.IDX_FLN_HEADER";
    private static final String TABLE_FL_VLSPILL_NAME = "FL_VLSPILL";
    private static final String TABLE_FL_VLSPILL = "FLUME.FL_VLSPILL";
    private static final String COLUMN_FLV_HEADER = "FLV_HEADER";
    private static final String COLUMN_FLV_SPILL = "FLV_SPILL";
    private static final String INDEX_FLV_HEADER_NAME = "IDX_FLV_HEADER";
    private static final String INDEX_FLV_HEADER = "FLUME.IDX_FLV_HEADER";
    public static final String QUERY_CREATE_SCHEMA_FLUME = "CREATE SCHEMA FLUME";
    public static final String QUERY_CREATE_INDEX_FLE_CHANNEL = "CREATE INDEX FLUME.IDX_FLE_CHANNEL ON FLUME.FL_EVENT (FLE_CHANNEL)";
    public static final String QUERY_CREATE_INDEX_FLP_EVENT = "CREATE INDEX FLUME.IDX_FLP_EVENT ON FLUME.FL_PLSPILL (FLP_EVENT)";
    public static final String QUERY_CREATE_INDEX_FLH_EVENT = "CREATE INDEX FLUME.IDX_FLH_EVENT ON FLUME.FL_HEADER (FLH_EVENT)";
    public static final String QUERY_CREATE_INDEX_FLN_HEADER = "CREATE INDEX FLUME.IDX_FLN_HEADER ON FLUME.FL_NMSPILL (FLN_HEADER)";
    public static final String QUERY_CREATE_INDEX_FLV_HEADER = "CREATE INDEX FLUME.IDX_FLV_HEADER ON FLUME.FL_VLSPILL (FLV_HEADER)";
    public static final String COLUMN_LOOKUP_QUERY = "SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = (SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = ? ))";
    public static final String QUERY_CHANNEL_SIZE = "SELECT COUNT(*) FROM FLUME.FL_EVENT";
    public static final String STMT_INSERT_EVENT_BASE = "INSERT INTO FLUME.FL_EVENT (FLE_PAYLOAD, FLE_CHANNEL, FLE_SPILL) VALUES ( ?, ?, ?)";
    public static final String STMT_INSERT_EVENT_SPILL = "INSERT INTO FLUME.FL_PLSPILL (FLP_EVENT, FLP_SPILL) VALUES ( ?, ?)";
    public static final String STMT_INSERT_HEADER_BASE = "INSERT INTO FLUME.FL_HEADER (FLH_EVENT, FLH_NAME, FLH_VALUE, FLH_NMSPILL, FLH_VLSPILL) VALUES ( ?, ?, ?, ?, ?)";
    public static final String STMT_INSERT_HEADER_NAME_SPILL = "INSERT INTO FLUME.FL_NMSPILL (FLN_HEADER, FLN_SPILL) VALUES ( ?, ?)";
    public static final String STMT_INSERT_HEADER_VALUE_SPILL = "INSERT INTO FLUME.FL_VLSPILL (FLV_HEADER, FLV_SPILL) VALUES ( ?, ?)";
    public static final String STMT_FETCH_PAYLOAD_BASE = "SELECT FLE_ID, FLE_PAYLOAD, FLE_SPILL FROM FLUME.FL_EVENT WHERE FLE_ID = (SELECT MIN(FLE_ID) FROM FLUME.FL_EVENT WHERE FLE_CHANNEL = ?)";
    public static final String STMT_FETCH_PAYLOAD_SPILL = "SELECT FLP_SPILL FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?";
    public static final String STMT_FETCH_HEADER_BASE = "SELECT FLH_ID, FLH_NAME, FLH_VALUE, FLH_NMSPILL, FLH_VLSPILL FROM FLUME.FL_HEADER WHERE FLH_EVENT = ?";
    public static final String STMT_FETCH_HEADER_NAME_SPILL = "SELECT FLN_SPILL FROM FLUME.FL_NMSPILL WHERE FLN_HEADER = ?";
    public static final String STMT_FETCH_HEADER_VALUE_SPILL = "SELECT FLV_SPILL FROM FLUME.FL_VLSPILL WHERE FLV_HEADER = ?";
    public static final String STMT_DELETE_HEADER_VALUE_SPILL = "DELETE FROM FLUME.FL_VLSPILL WHERE FLV_HEADER = ?";
    public static final String STMT_DELETE_HEADER_NAME_SPILL = "DELETE FROM FLUME.FL_NMSPILL WHERE FLN_HEADER = ?";
    public static final String STMT_DELETE_EVENT_SPILL = "DELETE FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?";
    public static final String STMT_DELETE_HEADER_BASE = "DELETE FROM FLUME.FL_HEADER WHERE FLH_EVENT = ?";
    public static final String STMT_DELETE_EVENT_BASE = "DELETE FROM FLUME.FL_EVENT WHERE FLE_ID = ?";
    private final DataSource dataSource;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) DerbySchemaHandler.class);
    private static final String COLUMN_FLE_CHANNEL = "FLE_CHANNEL";
    private static final String COLUMN_FLE_SPILL = "FLE_SPILL";
    public static final String QUERY_CREATE_TABLE_FL_EVENT = "CREATE TABLE FLUME.FL_EVENT ( FLE_ID BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, FLE_PAYLOAD VARCHAR(" + ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD + ") FOR BIT DATA, " + COLUMN_FLE_CHANNEL + " VARCHAR(" + ConfigurationConstants.CHANNEL_NAME_MAX_LENGTH + "), " + COLUMN_FLE_SPILL + " BOOLEAN)";
    public static final String QUERY_CREATE_TABLE_FL_PLSPILL_FMT = "CREATE TABLE FLUME.FL_PLSPILL ( FLP_EVENT BIGINT, FLP_SPILL BLOB{0})";
    public static final String QUERY_CREATE_TABLE_FL_PLSPILL_FK = MessageFormat.format(QUERY_CREATE_TABLE_FL_PLSPILL_FMT, ", FOREIGN KEY (FLP_EVENT) REFERENCES FLUME.FL_EVENT (FLE_ID)");
    public static final String QUERY_CREATE_TABLE_FL_PLSPILL_NOFK = MessageFormat.format(QUERY_CREATE_TABLE_FL_PLSPILL_FMT, "");
    private static final String COLUMN_FLH_VALUE = "FLH_VALUE";
    private static final String COLUMN_FLH_NMSPILL = "FLH_NMSPILL";
    private static final String COLUMN_FLH_VLSPILL = "FLH_VLSPILL";
    public static final String QUERY_CREATE_TABLE_FL_HEADER_FMT = "CREATE TABLE FLUME.FL_HEADER ( FLH_ID BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, FLH_EVENT BIGINT, FLH_NAME VARCHAR(" + ConfigurationConstants.HEADER_NAME_LENGTH_THRESHOLD + "), " + COLUMN_FLH_VALUE + " VARCHAR(" + ConfigurationConstants.HEADER_VALUE_LENGTH_THRESHOLD + "), " + COLUMN_FLH_NMSPILL + " BOOLEAN, " + COLUMN_FLH_VLSPILL + " BOOLEAN{0})";
    public static final String QUERY_CREATE_TABLE_FL_HEADER_FK = MessageFormat.format(QUERY_CREATE_TABLE_FL_HEADER_FMT, ", FOREIGN KEY (FLH_EVENT) REFERENCES FLUME.FL_EVENT (FLE_ID)");
    public static final String QUERY_CREATE_TABLE_FL_HEADER_NOFK = MessageFormat.format(QUERY_CREATE_TABLE_FL_HEADER_FMT, "");
    public static final String QUERY_CREATE_TABLE_FL_NMSPILL_FMT = "CREATE TABLE FLUME.FL_NMSPILL ( FLN_HEADER BIGINT, FLN_SPILL VARCHAR(" + ConfigurationConstants.HEADER_NAME_SPILL_MAX_LENGTH + "){0})";
    public static final String QUERY_CREATE_TABLE_FL_NMSPILL_FK = MessageFormat.format(QUERY_CREATE_TABLE_FL_NMSPILL_FMT, ", FOREIGN KEY (FLN_HEADER) REFERENCES FLUME.FL_HEADER (FLH_ID)");
    public static final String QUERY_CREATE_TABLE_FL_NMSPILL_NOFK = MessageFormat.format(QUERY_CREATE_TABLE_FL_NMSPILL_FMT, "");
    public static final String QUERY_CREATE_TABLE_FL_VLSPILL_FMT = "CREATE TABLE FLUME.FL_VLSPILL ( FLV_HEADER BIGINT, FLV_SPILL VARCHAR(" + ConfigurationConstants.HEADER_VALUE_SPILL_MAX_LENGTH + "){0})";
    public static final String QUERY_CREATE_TABLE_FL_VLSPILL_FK = MessageFormat.format(QUERY_CREATE_TABLE_FL_VLSPILL_FMT, ", FOREIGN KEY (FLV_HEADER) REFERENCES FLUME.FL_HEADER (FLH_ID)");
    public static final String QUERY_CREATE_TABLE_FL_VLSPILL_NOFK = MessageFormat.format(QUERY_CREATE_TABLE_FL_VLSPILL_FMT, "");

    /* JADX INFO: Access modifiers changed from: protected */
    public DerbySchemaHandler(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override // org.apache.flume.channel.jdbc.impl.SchemaHandler
    public boolean schemaExists() {
        Connection connection = null;
        Statement statement = null;
        try {
            try {
                Connection connection2 = this.dataSource.getConnection();
                Statement createStatement = connection2.createStatement();
                ResultSet executeQuery = createStatement.executeQuery(QUREY_SYSCHEMA_FLUME);
                if (!executeQuery.next()) {
                    LOGGER.warn("Schema for FLUME does not exist");
                    if (createStatement != null) {
                        try {
                            createStatement.close();
                        } catch (SQLException e) {
                            LOGGER.error("Unable to close schema lookup stmt", (Throwable) e);
                        }
                    }
                    if (connection2 != null) {
                        try {
                            connection2.close();
                        } catch (SQLException e2) {
                            LOGGER.error("Unable to close connection", (Throwable) e2);
                        }
                    }
                    return false;
                }
                LOGGER.debug("Flume schema ID: " + executeQuery.getString(1));
                connection2.commit();
                if (createStatement != null) {
                    try {
                        createStatement.close();
                    } catch (SQLException e3) {
                        LOGGER.error("Unable to close schema lookup stmt", (Throwable) e3);
                    }
                }
                if (connection2 == null) {
                    return true;
                }
                try {
                    connection2.close();
                    return true;
                } catch (SQLException e4) {
                    LOGGER.error("Unable to close connection", (Throwable) e4);
                    return true;
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    try {
                        statement.close();
                    } catch (SQLException e5) {
                        LOGGER.error("Unable to close schema lookup stmt", (Throwable) e5);
                    }
                }
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (SQLException e6) {
                        LOGGER.error("Unable to close connection", (Throwable) e6);
                    }
                }
                throw th;
            }
        } catch (SQLException e7) {
            try {
                connection.rollback();
            } catch (SQLException e8) {
                LOGGER.error("Unable to rollback transaction", (Throwable) e8);
            }
            throw new JdbcChannelException("Unable to query schema", e7);
        }
    }

    @Override // org.apache.flume.channel.jdbc.impl.SchemaHandler
    public void createSchemaObjects(boolean z, boolean z2) {
        runQuery(QUERY_CREATE_SCHEMA_FLUME);
        runQuery(QUERY_CREATE_TABLE_FL_EVENT);
        if (z) {
            runQuery(QUERY_CREATE_TABLE_FL_PLSPILL_FK);
            runQuery(QUERY_CREATE_TABLE_FL_HEADER_FK);
            runQuery(QUERY_CREATE_TABLE_FL_NMSPILL_FK);
            runQuery(QUERY_CREATE_TABLE_FL_VLSPILL_FK);
        } else {
            runQuery(QUERY_CREATE_TABLE_FL_PLSPILL_NOFK);
            runQuery(QUERY_CREATE_TABLE_FL_HEADER_NOFK);
            runQuery(QUERY_CREATE_TABLE_FL_NMSPILL_NOFK);
            runQuery(QUERY_CREATE_TABLE_FL_VLSPILL_NOFK);
        }
        if (z2) {
            runQuery(QUERY_CREATE_INDEX_FLE_CHANNEL);
            runQuery(QUERY_CREATE_INDEX_FLH_EVENT);
            runQuery(QUERY_CREATE_INDEX_FLP_EVENT);
            runQuery(QUERY_CREATE_INDEX_FLN_HEADER);
            runQuery(QUERY_CREATE_INDEX_FLV_HEADER);
        }
    }

    @Override // org.apache.flume.channel.jdbc.impl.SchemaHandler
    public void validateSchema() {
        verifyTableStructure(SCHEMA_FLUME, TABLE_FL_EVENT_NAME, COLUMN_FLE_ID, COLUMN_FLE_PAYLOAD, COLUMN_FLE_CHANNEL, COLUMN_FLE_SPILL);
        verifyTableStructure(SCHEMA_FLUME, TABLE_FL_PLSPILL_NAME, COLUMN_FLP_EVENT, COLUMN_FLP_SPILL);
        verifyTableStructure(SCHEMA_FLUME, TABLE_FL_HEADER_NAME, COLUMN_FLH_ID, COLUMN_FLH_EVENT, COLUMN_FLH_NAME, COLUMN_FLH_VALUE, COLUMN_FLH_NMSPILL, COLUMN_FLH_VLSPILL);
        verifyTableStructure(SCHEMA_FLUME, TABLE_FL_NMSPILL_NAME, COLUMN_FLN_HEADER, COLUMN_FLN_SPILL);
        verifyTableStructure(SCHEMA_FLUME, TABLE_FL_VLSPILL_NAME, COLUMN_FLV_HEADER, COLUMN_FLV_SPILL);
    }

    private void verifyTableStructure(String str, String str2, String... strArr) {
        HashSet hashSet = new HashSet();
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            try {
                connection = this.dataSource.getConnection();
                preparedStatement = connection.prepareStatement(COLUMN_LOOKUP_QUERY);
                preparedStatement.setString(1, str2);
                preparedStatement.setString(2, str);
                ResultSet executeQuery = preparedStatement.executeQuery();
                while (executeQuery.next()) {
                    hashSet.add(executeQuery.getString(1));
                }
                connection.commit();
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e) {
                        LOGGER.error("Unable to close statement", (Throwable) e);
                    }
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (SQLException e2) {
                            LOGGER.error("Unable to close connection", (Throwable) e2);
                        }
                    }
                }
                HashSet hashSet2 = new HashSet();
                hashSet2.addAll(hashSet);
                StringBuilder sb = new StringBuilder("{");
                boolean z = true;
                for (String str3 : strArr) {
                    hashSet2.remove(str3);
                    if (z) {
                        z = false;
                    } else {
                        sb.append(", ");
                    }
                    sb.append(str3);
                }
                sb.append("}");
                String sb2 = sb.toString();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Table " + str + "." + str2 + " expected columns: " + sb2 + ", actual columns: " + hashSet);
                }
                if (hashSet.size() != strArr.length || hashSet2.size() != 0) {
                    throw new JdbcChannelException("Expected table " + str + "." + str2 + " to have columns: " + sb2 + ". Instead found columns: " + hashSet);
                }
            } catch (Throwable th) {
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e3) {
                        LOGGER.error("Unable to close statement", (Throwable) e3);
                    }
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (SQLException e4) {
                            LOGGER.error("Unable to close connection", (Throwable) e4);
                        }
                    }
                }
                throw th;
            }
        } catch (SQLException e5) {
            try {
                connection.rollback();
            } catch (SQLException e6) {
                LOGGER.error("Unable to rollback transaction", (Throwable) e6);
            }
            throw new JdbcChannelException("Unable to run query: SELECT COLUMNNAME from SYS.SYSCOLUMNS where REFERENCEID = (SELECT TABLEID FROM SYS.SYSTABLES WHERE TABLENAME = ? AND SCHEMAID = (SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = ? )): 1=" + str2 + ", 2=" + str, e5);
        }
    }

    private void runQuery(String str) {
        Connection connection = null;
        Statement statement = null;
        try {
            try {
                connection = this.dataSource.getConnection();
                statement = connection.createStatement();
                if (statement.execute(str)) {
                    int i = 0;
                    while (statement.getResultSet().next()) {
                        i++;
                    }
                    LOGGER.info("QUERY(" + str + ") produced unused resultset with " + i + " rows");
                } else {
                    LOGGER.info("QUERY(" + str + ") Update count: " + statement.getUpdateCount());
                }
                connection.commit();
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e) {
                        LOGGER.error("Unable to close statement", (Throwable) e);
                    }
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (SQLException e2) {
                            LOGGER.error("Unable to close connection", (Throwable) e2);
                        }
                    }
                }
            } catch (SQLException e3) {
                try {
                    connection.rollback();
                } catch (SQLException e4) {
                    LOGGER.error("Unable to rollback transaction", (Throwable) e4);
                }
                throw new JdbcChannelException("Unable to run query: " + str, e3);
            }
        } catch (Throwable th) {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e5) {
                    LOGGER.error("Unable to close statement", (Throwable) e5);
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e6) {
                        LOGGER.error("Unable to close connection", (Throwable) e6);
                    }
                }
            }
            throw th;
        }
    }

    @Override // org.apache.flume.channel.jdbc.impl.SchemaHandler
    public void storeEvent(PersistableEvent persistableEvent, Connection connection) {
        byte[] basePayload = persistableEvent.getBasePayload();
        byte[] spillPayload = persistableEvent.getSpillPayload();
        boolean z = spillPayload != null;
        String channelName = persistableEvent.getChannelName();
        LOGGER.debug("Preparing insert event: " + persistableEvent);
        PreparedStatement preparedStatement = null;
        PreparedStatement preparedStatement2 = null;
        PreparedStatement preparedStatement3 = null;
        PreparedStatement preparedStatement4 = null;
        PreparedStatement preparedStatement5 = null;
        try {
            try {
                PreparedStatement prepareStatement = connection.prepareStatement(STMT_INSERT_EVENT_BASE, 1);
                prepareStatement.setBytes(1, basePayload);
                prepareStatement.setString(2, channelName);
                prepareStatement.setBoolean(3, z);
                int executeUpdate = prepareStatement.executeUpdate();
                if (executeUpdate != 1) {
                    throw new JdbcChannelException("Invalid update count on base event insert: " + executeUpdate);
                }
                ResultSet generatedKeys = prepareStatement.getGeneratedKeys();
                if (!generatedKeys.next()) {
                    throw new JdbcChannelException("Unable to retrieive inserted event-id");
                }
                long j = generatedKeys.getLong(1);
                persistableEvent.setEventId(j);
                if (z) {
                    preparedStatement2 = connection.prepareStatement(STMT_INSERT_EVENT_SPILL);
                    preparedStatement2.setLong(1, j);
                    preparedStatement2.setBinaryStream(2, (InputStream) new ByteArrayInputStream(spillPayload), spillPayload.length);
                    int executeUpdate2 = preparedStatement2.executeUpdate();
                    if (executeUpdate2 != 1) {
                        throw new JdbcChannelException("Invalid update count on spill event insert: " + executeUpdate2);
                    }
                }
                List<PersistableEvent.HeaderEntry> headerEntries = persistableEvent.getHeaderEntries();
                if (headerEntries != null && headerEntries.size() > 0) {
                    ArrayList<PersistableEvent.HeaderEntry> arrayList = new ArrayList();
                    ArrayList<PersistableEvent.HeaderEntry> arrayList2 = new ArrayList();
                    preparedStatement3 = connection.prepareStatement(STMT_INSERT_HEADER_BASE, 1);
                    for (PersistableEvent.HeaderEntry headerEntry : headerEntries) {
                        PersistableEvent.SpillableString name = headerEntry.getName();
                        PersistableEvent.SpillableString value = headerEntry.getValue();
                        preparedStatement3.setLong(1, j);
                        preparedStatement3.setString(2, name.getBase());
                        preparedStatement3.setString(3, value.getBase());
                        preparedStatement3.setBoolean(4, name.hasSpill());
                        preparedStatement3.setBoolean(5, value.hasSpill());
                        int executeUpdate3 = preparedStatement3.executeUpdate();
                        if (executeUpdate3 != 1) {
                            throw new JdbcChannelException("Unexpected update header count: " + executeUpdate3);
                        }
                        ResultSet generatedKeys2 = preparedStatement3.getGeneratedKeys();
                        if (!generatedKeys2.next()) {
                            throw new JdbcChannelException("Unable to retrieve inserted header id");
                        }
                        headerEntry.setId(generatedKeys2.getLong(1));
                        if (name.hasSpill()) {
                            arrayList.add(headerEntry);
                        }
                        if (value.hasSpill()) {
                            arrayList2.add(headerEntry);
                        }
                    }
                    if (arrayList.size() > 0) {
                        LOGGER.debug("Number of headers with name spill: " + arrayList.size());
                        preparedStatement4 = connection.prepareStatement(STMT_INSERT_HEADER_NAME_SPILL);
                        for (PersistableEvent.HeaderEntry headerEntry2 : arrayList) {
                            String spill = headerEntry2.getName().getSpill();
                            preparedStatement4.setLong(1, headerEntry2.getId());
                            preparedStatement4.setString(2, spill);
                            preparedStatement4.addBatch();
                        }
                        int[] executeBatch = preparedStatement4.executeBatch();
                        if (executeBatch.length != arrayList.size()) {
                            throw new JdbcChannelException("Unexpected update count for header name spills: expected " + arrayList.size() + ", found " + executeBatch.length);
                        }
                        for (int i = 0; i < executeBatch.length; i++) {
                            if (executeBatch[i] != 1) {
                                throw new JdbcChannelException("Unexpected update count for header name spill at position " + i + ", value: " + executeBatch[i]);
                            }
                        }
                    }
                    if (arrayList2.size() > 0) {
                        LOGGER.debug("Number of headers with value spill: " + arrayList2.size());
                        preparedStatement5 = connection.prepareStatement(STMT_INSERT_HEADER_VALUE_SPILL);
                        for (PersistableEvent.HeaderEntry headerEntry3 : arrayList2) {
                            String spill2 = headerEntry3.getValue().getSpill();
                            preparedStatement5.setLong(1, headerEntry3.getId());
                            preparedStatement5.setString(2, spill2);
                            preparedStatement5.addBatch();
                        }
                        int[] executeBatch2 = preparedStatement5.executeBatch();
                        if (executeBatch2.length != arrayList2.size()) {
                            throw new JdbcChannelException("Unexpected update count for header value spills: expected " + arrayList2.size() + ", found " + executeBatch2.length);
                        }
                        for (int i2 = 0; i2 < executeBatch2.length; i2++) {
                            if (executeBatch2[i2] != 1) {
                                throw new JdbcChannelException("Unexpected update count for header value spill at position " + i2 + ", value: " + executeBatch2[i2]);
                            }
                        }
                    }
                }
                if (prepareStatement != null) {
                    try {
                        prepareStatement.close();
                    } catch (SQLException e) {
                        LOGGER.error("Unable to close base event statement", (Throwable) e);
                    }
                }
                if (preparedStatement2 != null) {
                    try {
                        preparedStatement2.close();
                    } catch (SQLException e2) {
                        LOGGER.error("Unable to close spill event statement", (Throwable) e2);
                    }
                }
                if (preparedStatement3 != null) {
                    try {
                        preparedStatement3.close();
                    } catch (SQLException e3) {
                        LOGGER.error("Unable to close base header statement", (Throwable) e3);
                    }
                }
                if (preparedStatement4 != null) {
                    try {
                        preparedStatement4.close();
                    } catch (SQLException e4) {
                        LOGGER.error("Unable to close header name spill statement", (Throwable) e4);
                    }
                }
                if (preparedStatement5 != null) {
                    try {
                        preparedStatement5.close();
                    } catch (SQLException e5) {
                        LOGGER.error("Unable to close header value spill statement", (Throwable) e5);
                    }
                }
                LOGGER.debug("Event persisted: " + persistableEvent);
            } catch (Throwable th) {
                if (0 != 0) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e6) {
                        LOGGER.error("Unable to close base event statement", (Throwable) e6);
                    }
                }
                if (0 != 0) {
                    try {
                        preparedStatement2.close();
                    } catch (SQLException e7) {
                        LOGGER.error("Unable to close spill event statement", (Throwable) e7);
                    }
                }
                if (0 != 0) {
                    try {
                        preparedStatement3.close();
                    } catch (SQLException e8) {
                        LOGGER.error("Unable to close base header statement", (Throwable) e8);
                    }
                }
                if (0 != 0) {
                    try {
                        preparedStatement4.close();
                    } catch (SQLException e9) {
                        LOGGER.error("Unable to close header name spill statement", (Throwable) e9);
                    }
                }
                if (0 != 0) {
                    try {
                        preparedStatement5.close();
                    } catch (SQLException e10) {
                        LOGGER.error("Unable to close header value spill statement", (Throwable) e10);
                    }
                }
                throw th;
            }
        } catch (SQLException e11) {
            throw new JdbcChannelException("Unable to persist event: " + persistableEvent, e11);
        }
    }

    @Override // org.apache.flume.channel.jdbc.impl.SchemaHandler
    public PersistableEvent fetchAndDeleteEvent(String str, Connection connection) {
        PreparedStatement preparedStatement = null;
        PreparedStatement preparedStatement2 = null;
        InputStream inputStream = null;
        PreparedStatement preparedStatement3 = null;
        PreparedStatement preparedStatement4 = null;
        PreparedStatement preparedStatement5 = null;
        PreparedStatement preparedStatement6 = null;
        PreparedStatement preparedStatement7 = null;
        PreparedStatement preparedStatement8 = null;
        PreparedStatement preparedStatement9 = null;
        PreparedStatement preparedStatement10 = null;
        try {
            try {
                try {
                    PreparedStatement prepareStatement = connection.prepareStatement(STMT_FETCH_PAYLOAD_BASE);
                    prepareStatement.setString(1, str);
                    ResultSet executeQuery = prepareStatement.executeQuery();
                    if (!executeQuery.next()) {
                        LOGGER.debug("No events found for channel: " + str);
                        if (0 != 0) {
                            try {
                                inputStream.close();
                            } catch (IOException e) {
                                LOGGER.error("Unable to close payload spill stream", (Throwable) e);
                            }
                        }
                        if (prepareStatement != null) {
                            try {
                                prepareStatement.close();
                            } catch (SQLException e2) {
                                LOGGER.error("Unable to close base event fetch statement", (Throwable) e2);
                            }
                        }
                        if (0 != 0) {
                            try {
                                preparedStatement2.close();
                            } catch (SQLException e3) {
                                LOGGER.error("Unable to close spill event fetch statment", (Throwable) e3);
                            }
                        }
                        if (0 != 0) {
                            try {
                                preparedStatement6.close();
                            } catch (SQLException e4) {
                                LOGGER.error("Unable to close event spill delete statement", (Throwable) e4);
                            }
                        }
                        if (0 != 0) {
                            try {
                                preparedStatement3.close();
                            } catch (SQLException e5) {
                                LOGGER.error("Unable to close base header fetch statement", (Throwable) e5);
                            }
                        }
                        if (0 != 0) {
                            try {
                                preparedStatement4.close();
                            } catch (SQLException e6) {
                                LOGGER.error("Unable to close name spill fetch statement", (Throwable) e6);
                            }
                        }
                        if (0 != 0) {
                            try {
                                preparedStatement5.close();
                            } catch (SQLException e7) {
                                LOGGER.error("Unable to close value spill fetch statement", (Throwable) e7);
                            }
                        }
                        if (0 != 0) {
                            try {
                                preparedStatement7.close();
                            } catch (SQLException e8) {
                                LOGGER.error("Unable to close value spill delete statement", (Throwable) e8);
                            }
                        }
                        if (0 != 0) {
                            try {
                                preparedStatement8.close();
                            } catch (SQLException e9) {
                                LOGGER.error("Unable to close value spill delete statement", (Throwable) e9);
                            }
                        }
                        if (0 != 0) {
                            try {
                                preparedStatement9.close();
                            } catch (SQLException e10) {
                                LOGGER.error("Unable to close base header delete statement", (Throwable) e10);
                            }
                        }
                        if (0 != 0) {
                            try {
                                preparedStatement10.close();
                            } catch (SQLException e11) {
                                LOGGER.error("Unable to close base event delete statement", (Throwable) e11);
                            }
                        }
                        return null;
                    }
                    long j = executeQuery.getLong(1);
                    PersistableEvent.Builder builder = new PersistableEvent.Builder(str, j);
                    builder.setBasePayload(executeQuery.getBytes(2));
                    if (executeQuery.getBoolean(3)) {
                        preparedStatement2 = connection.prepareStatement(STMT_FETCH_PAYLOAD_SPILL);
                        preparedStatement2.setLong(1, j);
                        ResultSet executeQuery2 = preparedStatement2.executeQuery();
                        if (!executeQuery2.next()) {
                            throw new JdbcChannelException("Payload spill expected but not found for event: " + j);
                        }
                        inputStream = executeQuery2.getBlob(1).getBinaryStream();
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                        byte[] bArr = new byte[1024];
                        while (true) {
                            int read = inputStream.read(bArr);
                            if (read == -1) {
                                break;
                            }
                            byteArrayOutputStream.write(bArr, 0, read);
                        }
                        builder.setSpillPayload(byteArrayOutputStream.toByteArray());
                        preparedStatement6 = connection.prepareStatement(STMT_DELETE_EVENT_SPILL);
                        preparedStatement6.setLong(1, j);
                        int executeUpdate = preparedStatement6.executeUpdate();
                        if (executeUpdate != 1) {
                            throw new JdbcChannelException("Unexpected row count for spill delete: " + executeUpdate);
                        }
                    }
                    if (executeQuery.next()) {
                        throw new JdbcChannelException("More than expected events retrieved");
                    }
                    ArrayList arrayList = null;
                    ArrayList arrayList2 = null;
                    PreparedStatement prepareStatement2 = connection.prepareStatement(STMT_FETCH_HEADER_BASE);
                    prepareStatement2.setLong(1, j);
                    int i = 0;
                    ResultSet executeQuery3 = prepareStatement2.executeQuery();
                    while (executeQuery3.next()) {
                        i++;
                        long j2 = executeQuery3.getLong(1);
                        String string = executeQuery3.getString(2);
                        String string2 = executeQuery3.getString(3);
                        boolean z = executeQuery3.getBoolean(4);
                        boolean z2 = executeQuery3.getBoolean(5);
                        builder.setHeader(j2, string, string2);
                        if (z) {
                            if (arrayList == null) {
                                arrayList = new ArrayList();
                            }
                            arrayList.add(Long.valueOf(j2));
                        }
                        if (z2) {
                            if (arrayList2 == null) {
                                arrayList2 = new ArrayList();
                            }
                            arrayList2.add(Long.valueOf(j2));
                        }
                    }
                    if (arrayList != null) {
                        preparedStatement4 = connection.prepareStatement(STMT_FETCH_HEADER_NAME_SPILL);
                        preparedStatement7 = connection.prepareStatement(STMT_DELETE_HEADER_NAME_SPILL);
                        Iterator it = arrayList.iterator();
                        while (it.hasNext()) {
                            long longValue = ((Long) it.next()).longValue();
                            preparedStatement4.setLong(1, longValue);
                            ResultSet executeQuery4 = preparedStatement4.executeQuery();
                            if (!executeQuery4.next()) {
                                throw new JdbcChannelException("Name spill was set for header " + longValue + " but was not found");
                            }
                            builder.setHeaderNameSpill(longValue, executeQuery4.getString(1));
                            preparedStatement7.setLong(1, longValue);
                            preparedStatement7.addBatch();
                        }
                        int[] executeBatch = preparedStatement7.executeBatch();
                        if (executeBatch.length != arrayList.size()) {
                            throw new JdbcChannelException("Unexpected number of header name spill deletes: expected " + arrayList.size() + ", found: " + executeBatch.length);
                        }
                        for (int i2 : executeBatch) {
                            if (i2 != 1) {
                                throw new JdbcChannelException("Unexpected number of deleted rows for header name spill deletes: " + i2);
                            }
                        }
                    }
                    if (arrayList2 != null) {
                        preparedStatement5 = connection.prepareStatement(STMT_FETCH_HEADER_VALUE_SPILL);
                        preparedStatement8 = connection.prepareStatement(STMT_DELETE_HEADER_VALUE_SPILL);
                        Iterator it2 = arrayList2.iterator();
                        while (it2.hasNext()) {
                            long longValue2 = ((Long) it2.next()).longValue();
                            preparedStatement5.setLong(1, longValue2);
                            ResultSet executeQuery5 = preparedStatement5.executeQuery();
                            if (!executeQuery5.next()) {
                                throw new JdbcChannelException("Value spill was set for header " + longValue2 + " but was not found");
                            }
                            builder.setHeaderValueSpill(longValue2, executeQuery5.getString(1));
                            preparedStatement8.setLong(1, longValue2);
                            preparedStatement8.addBatch();
                        }
                        int[] executeBatch2 = preparedStatement8.executeBatch();
                        if (executeBatch2.length != arrayList2.size()) {
                            throw new JdbcChannelException("Unexpected number of header value spill deletes: expected " + arrayList2.size() + ", found: " + executeBatch2.length);
                        }
                        for (int i3 : executeBatch2) {
                            if (i3 != 1) {
                                throw new JdbcChannelException("Unexpected number of deleted rows for header value spill deletes: " + i3);
                            }
                        }
                    }
                    if (i > 0) {
                        preparedStatement9 = connection.prepareStatement(STMT_DELETE_HEADER_BASE);
                        preparedStatement9.setLong(1, j);
                        int executeUpdate2 = preparedStatement9.executeUpdate();
                        if (executeUpdate2 != i) {
                            throw new JdbcChannelException("Unexpected base header delete count: expected: " + i + ", found: " + executeUpdate2);
                        }
                    }
                    PreparedStatement prepareStatement3 = connection.prepareStatement(STMT_DELETE_EVENT_BASE);
                    prepareStatement3.setLong(1, j);
                    int executeUpdate3 = prepareStatement3.executeUpdate();
                    if (executeUpdate3 != 1) {
                        throw new JdbcChannelException("Unexpected row count for delete of event-id: " + j + ", count: " + executeUpdate3);
                    }
                    if (inputStream != null) {
                        try {
                            inputStream.close();
                        } catch (IOException e12) {
                            LOGGER.error("Unable to close payload spill stream", (Throwable) e12);
                        }
                    }
                    if (prepareStatement != null) {
                        try {
                            prepareStatement.close();
                        } catch (SQLException e13) {
                            LOGGER.error("Unable to close base event fetch statement", (Throwable) e13);
                        }
                    }
                    if (preparedStatement2 != null) {
                        try {
                            preparedStatement2.close();
                        } catch (SQLException e14) {
                            LOGGER.error("Unable to close spill event fetch statment", (Throwable) e14);
                        }
                    }
                    if (preparedStatement6 != null) {
                        try {
                            preparedStatement6.close();
                        } catch (SQLException e15) {
                            LOGGER.error("Unable to close event spill delete statement", (Throwable) e15);
                        }
                    }
                    if (prepareStatement2 != null) {
                        try {
                            prepareStatement2.close();
                        } catch (SQLException e16) {
                            LOGGER.error("Unable to close base header fetch statement", (Throwable) e16);
                        }
                    }
                    if (preparedStatement4 != null) {
                        try {
                            preparedStatement4.close();
                        } catch (SQLException e17) {
                            LOGGER.error("Unable to close name spill fetch statement", (Throwable) e17);
                        }
                    }
                    if (preparedStatement5 != null) {
                        try {
                            preparedStatement5.close();
                        } catch (SQLException e18) {
                            LOGGER.error("Unable to close value spill fetch statement", (Throwable) e18);
                        }
                    }
                    if (preparedStatement7 != null) {
                        try {
                            preparedStatement7.close();
                        } catch (SQLException e19) {
                            LOGGER.error("Unable to close value spill delete statement", (Throwable) e19);
                        }
                    }
                    if (preparedStatement8 != null) {
                        try {
                            preparedStatement8.close();
                        } catch (SQLException e20) {
                            LOGGER.error("Unable to close value spill delete statement", (Throwable) e20);
                        }
                    }
                    if (preparedStatement9 != null) {
                        try {
                            preparedStatement9.close();
                        } catch (SQLException e21) {
                            LOGGER.error("Unable to close base header delete statement", (Throwable) e21);
                        }
                    }
                    if (prepareStatement3 != null) {
                        try {
                            prepareStatement3.close();
                        } catch (SQLException e22) {
                            LOGGER.error("Unable to close base event delete statement", (Throwable) e22);
                        }
                    }
                    return builder.build();
                } catch (Throwable th) {
                    if (0 != 0) {
                        try {
                            inputStream.close();
                        } catch (IOException e23) {
                            LOGGER.error("Unable to close payload spill stream", (Throwable) e23);
                        }
                    }
                    if (0 != 0) {
                        try {
                            preparedStatement.close();
                        } catch (SQLException e24) {
                            LOGGER.error("Unable to close base event fetch statement", (Throwable) e24);
                        }
                    }
                    if (0 != 0) {
                        try {
                            preparedStatement2.close();
                        } catch (SQLException e25) {
                            LOGGER.error("Unable to close spill event fetch statment", (Throwable) e25);
                        }
                    }
                    if (0 != 0) {
                        try {
                            preparedStatement6.close();
                        } catch (SQLException e26) {
                            LOGGER.error("Unable to close event spill delete statement", (Throwable) e26);
                        }
                    }
                    if (0 != 0) {
                        try {
                            preparedStatement3.close();
                        } catch (SQLException e27) {
                            LOGGER.error("Unable to close base header fetch statement", (Throwable) e27);
                        }
                    }
                    if (0 != 0) {
                        try {
                            preparedStatement4.close();
                        } catch (SQLException e28) {
                            LOGGER.error("Unable to close name spill fetch statement", (Throwable) e28);
                        }
                    }
                    if (0 != 0) {
                        try {
                            preparedStatement5.close();
                        } catch (SQLException e29) {
                            LOGGER.error("Unable to close value spill fetch statement", (Throwable) e29);
                        }
                    }
                    if (0 != 0) {
                        try {
                            preparedStatement7.close();
                        } catch (SQLException e30) {
                            LOGGER.error("Unable to close value spill delete statement", (Throwable) e30);
                        }
                    }
                    if (0 != 0) {
                        try {
                            preparedStatement8.close();
                        } catch (SQLException e31) {
                            LOGGER.error("Unable to close value spill delete statement", (Throwable) e31);
                        }
                    }
                    if (0 != 0) {
                        try {
                            preparedStatement9.close();
                        } catch (SQLException e32) {
                            LOGGER.error("Unable to close base header delete statement", (Throwable) e32);
                        }
                    }
                    if (0 != 0) {
                        try {
                            preparedStatement10.close();
                        } catch (SQLException e33) {
                            LOGGER.error("Unable to close base event delete statement", (Throwable) e33);
                        }
                    }
                    throw th;
                }
            } catch (SQLException e34) {
                throw new JdbcChannelException("Unable to retrieve event", e34);
            }
        } catch (IOException e35) {
            throw new JdbcChannelException("Unable to read data", e35);
        }
    }

    @Override // org.apache.flume.channel.jdbc.impl.SchemaHandler
    public long getChannelSize(Connection connection) {
        Statement statement = null;
        try {
            try {
                Statement createStatement = connection.createStatement();
                createStatement.execute(QUERY_CHANNEL_SIZE);
                ResultSet resultSet = createStatement.getResultSet();
                if (!resultSet.next()) {
                    throw new JdbcChannelException("Failed to determine channel size: Query (SELECT COUNT(*) FROM FLUME.FL_EVENT) did not produce any results");
                }
                long j = resultSet.getLong(1);
                connection.commit();
                if (createStatement != null) {
                    try {
                        createStatement.close();
                    } catch (SQLException e) {
                        LOGGER.error("Unable to close statement", (Throwable) e);
                    }
                }
                return j;
            } catch (SQLException e2) {
                try {
                    connection.rollback();
                } catch (SQLException e3) {
                    LOGGER.error("Unable to rollback transaction", (Throwable) e3);
                }
                throw new JdbcChannelException("Unable to run query: SELECT COUNT(*) FROM FLUME.FL_EVENT", e2);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    statement.close();
                } catch (SQLException e4) {
                    LOGGER.error("Unable to close statement", (Throwable) e4);
                }
            }
            throw th;
        }
    }
}
