package org.apache.tika.pipes.reporters.jdbc;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.PipesReporterBase;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.class */
public class JDBCPipesReporter extends PipesReporterBase implements Initializable {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCPipesReporter.class);
    private static final int DEFAULT_CACHE_SIZE = 100;
    private static final long DEFAULT_REPORT_WITHIN_MS = 10000;
    private static final int ARRAY_BLOCKING_QUEUE_SIZE = 1000;
    public static final String TABLE_NAME = "tika_status";
    private static final long MAX_WAIT_MILLIS = 120000;
    private String connectionString;
    CompletableFuture<Void> reportWorkerFuture;
    private long reportWithinMs = DEFAULT_REPORT_WITHIN_MS;
    private int cacheSize = DEFAULT_CACHE_SIZE;
    private Optional<String> postConnectionString = Optional.empty();
    private final ArrayBlockingQueue<KeyStatusPair> queue = new ArrayBlockingQueue<>(ARRAY_BLOCKING_QUEUE_SIZE);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter$KeyStatusPair.class */
    public static class KeyStatusPair {
        static KeyStatusPair END_SEMAPHORE = new KeyStatusPair(null, null);
        private final String emitKey;
        private final PipesResult.STATUS status;

        public KeyStatusPair(String str, PipesResult.STATUS status) {
            this.emitKey = str;
            this.status = status;
        }

        public String toString() {
            return "KeyStatusPair{emitKey='" + this.emitKey + "', status=" + this.status + '}';
        }
    }

    /* loaded from: input_file:org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter$ReportWorker.class */
    private static class ReportWorker implements Runnable {
        private static final int MAX_TRIES = 3;
        private final String connectionString;
        private final Optional<String> postConnectionString;
        private final ArrayBlockingQueue<KeyStatusPair> queue;
        private final int cacheSize;
        private final long reportWithinMs;
        List<KeyStatusPair> cache = new ArrayList();
        private Connection connection;
        private PreparedStatement insert;

        public ReportWorker(String str, Optional<String> optional, ArrayBlockingQueue<KeyStatusPair> arrayBlockingQueue, int i, long j) {
            this.connectionString = str;
            this.postConnectionString = optional;
            this.queue = arrayBlockingQueue;
            this.cacheSize = i;
            this.reportWithinMs = j;
        }

        public void init() throws TikaConfigException {
            try {
                createConnection();
                createTable();
                createPreparedStatement();
            } catch (SQLException e) {
                throw new TikaConfigException("Problem creating connection, etc", e);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            while (true) {
                try {
                    KeyStatusPair take = this.queue.take();
                    if (take == KeyStatusPair.END_SEMAPHORE) {
                        shutdownNow();
                        return;
                    }
                    this.cache.add(take);
                    long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                    if (this.cache.size() >= this.cacheSize || currentTimeMillis2 > this.reportWithinMs) {
                        try {
                            reportNow();
                            currentTimeMillis = System.currentTimeMillis();
                        } catch (InterruptedException e) {
                            return;
                        } catch (SQLException e2) {
                            throw new RuntimeException(e2);
                        }
                    }
                } catch (InterruptedException e3) {
                    return;
                }
            }
        }

        private void shutdownNow() {
            JDBCPipesReporter.LOG.trace("received end semaphore");
            try {
                reportNow();
                JDBCPipesReporter.LOG.trace("about to close");
                try {
                    this.insert.close();
                } catch (SQLException e) {
                    JDBCPipesReporter.LOG.warn("problem shutting down insert statement in reporter", e);
                }
                try {
                    this.connection.close();
                } catch (SQLException e2) {
                    JDBCPipesReporter.LOG.warn("problem shutting down connection in reporter", e2);
                }
                JDBCPipesReporter.LOG.trace("successfully closed resources");
            } catch (InterruptedException e3) {
            } catch (SQLException e4) {
                throw new RuntimeException(e4);
            }
        }

        private void reportNow() throws SQLException, InterruptedException {
            int i = 0;
            while (true) {
                i++;
                if (i >= MAX_TRIES) {
                    return;
                }
                try {
                    for (KeyStatusPair keyStatusPair : this.cache) {
                        this.insert.clearParameters();
                        this.insert.setString(1, keyStatusPair.emitKey);
                        this.insert.setString(2, keyStatusPair.status.name());
                        this.insert.addBatch();
                    }
                    this.insert.executeBatch();
                    JDBCPipesReporter.LOG.debug("writing {} " + this.cache.size());
                    this.cache.clear();
                    return;
                } catch (SQLException e) {
                    JDBCPipesReporter.LOG.warn("problem writing to the db. Will try to reconnect", e);
                    reconnect();
                }
            }
        }

        private void createTable() throws SQLException {
            Statement createStatement = this.connection.createStatement();
            try {
                createStatement.execute("drop table if exists tika_status");
                createStatement.execute("create table tika_status (path varchar(1024), status varchar(32))");
                if (createStatement != null) {
                    createStatement.close();
                }
            } catch (Throwable th) {
                if (createStatement != null) {
                    try {
                        createStatement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }

        private void reconnect() throws SQLException, InterruptedException {
            int i = 0;
            SQLException sQLException = null;
            while (true) {
                SQLException sQLException2 = sQLException;
                i++;
                if (i >= MAX_TRIES) {
                    throw sQLException2;
                }
                try {
                    tryClose();
                    createConnection();
                    createPreparedStatement();
                    JDBCPipesReporter.LOG.debug("success reconnecting after {} attempts", Integer.valueOf(i));
                    return;
                } catch (SQLException e) {
                    JDBCPipesReporter.LOG.warn("problem reconnecting", e);
                    Thread.sleep(30000L);
                    sQLException = e;
                }
            }
        }

        private void tryClose() {
            if (this.insert != null) {
                try {
                    this.insert.close();
                } catch (SQLException e) {
                    JDBCPipesReporter.LOG.warn("exception closing insert statement", this.insert);
                }
            }
            if (this.connection != null) {
                try {
                    this.connection.close();
                } catch (SQLException e2) {
                    JDBCPipesReporter.LOG.warn("exception closing connection", e2);
                }
            }
        }

        private void createConnection() throws SQLException {
            this.connection = DriverManager.getConnection(this.connectionString);
            if (this.postConnectionString.isPresent()) {
                Statement createStatement = this.connection.createStatement();
                try {
                    createStatement.execute(this.postConnectionString.get());
                    if (createStatement != null) {
                        createStatement.close();
                    }
                } catch (Throwable th) {
                    if (createStatement != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
        }

        private void createPreparedStatement() throws SQLException {
            this.insert = this.connection.prepareStatement("insert into tika_status (path, status) values (?,?)");
        }
    }

    public void initialize(Map<String, Param> map) throws TikaConfigException {
        super.initialize(map);
        if (StringUtils.isBlank(this.connectionString)) {
            throw new TikaConfigException("Must specify a connectionString");
        }
        ReportWorker reportWorker = new ReportWorker(this.connectionString, this.postConnectionString, this.queue, this.cacheSize, this.reportWithinMs);
        reportWorker.init();
        this.reportWorkerFuture = CompletableFuture.runAsync(reportWorker);
    }

    public void checkInitialization(InitializableProblemHandler initializableProblemHandler) throws TikaConfigException {
    }

    @Field
    public void setConnection(String str) {
        this.connectionString = str;
    }

    @Field
    public void setCacheSize(int i) {
        this.cacheSize = i;
    }

    @Field
    public void setReportWithinMs(long j) {
        this.reportWithinMs = j;
    }

    @Field
    public void setPostConnection(String str) {
        this.postConnectionString = Optional.of(str);
    }

    public void report(FetchEmitTuple fetchEmitTuple, PipesResult pipesResult, long j) {
        if (accept(pipesResult.getStatus())) {
            try {
                this.queue.offer(new KeyStatusPair(fetchEmitTuple.getEmitKey().getEmitKey(), pipesResult.getStatus()), MAX_WAIT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }
    }

    public void error(Throwable th) {
        LOG.error("reported error; all bets are off", th);
    }

    public void error(String str) {
        LOG.error("reported error; all bets are off: {}", str);
    }

    public void close() throws IOException {
        try {
            try {
                this.queue.offer(KeyStatusPair.END_SEMAPHORE, 60L, TimeUnit.SECONDS);
                try {
                    try {
                        this.reportWorkerFuture.get(60L, TimeUnit.SECONDS);
                        this.reportWorkerFuture.cancel(true);
                    } catch (ExecutionException e) {
                        LOG.error("problem closing", e);
                        throw new RuntimeException(e);
                    }
                } catch (InterruptedException e2) {
                    this.reportWorkerFuture.cancel(true);
                } catch (TimeoutException e3) {
                    LOG.error("timeout closing", e3);
                    this.reportWorkerFuture.cancel(true);
                }
            } catch (InterruptedException e4) {
            }
        } catch (Throwable th) {
            this.reportWorkerFuture.cancel(true);
            throw th;
        }
    }
}
