/*
 * Decompiled with CFR 0.152.
 */
package de.bytefish.pgbulkinsert.pgsql.processor;

import de.bytefish.pgbulkinsert.IPgBulkInsert;
import de.bytefish.pgbulkinsert.functional.Func1;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.postgresql.PGConnection;

/**
 * Buffers entities and writes them in bulk through an {@link IPgBulkInsert} client.
 *
 * <p>A batch is written when the number of buffered entities reaches {@code bulkSize}, when the
 * optional periodic flush fires, or when {@link #close()} is called with entities still buffered.
 *
 * <p>Thread-safety: {@link #add(Object)}, {@link #close()} and the periodic flush all synchronize
 * on this instance, so the buffer is only ever touched while holding this object's monitor. The
 * bulk write itself runs while that monitor is held.
 *
 * @param <TEntity> type of the entities being buffered and written
 */
public class BulkProcessor<TEntity>
implements AutoCloseable {
    private final ScheduledThreadPoolExecutor scheduler;
    private final ScheduledFuture<?> scheduledFuture;
    private volatile boolean closed = false;
    private final IPgBulkInsert<TEntity> client;
    private final Func1<PGConnection> connectionFactory;
    private final Duration flushInterval;
    private final int bulkSize;
    // Guarded by this object's monitor.
    private List<TEntity> batchedEntities;

    /**
     * Creates a processor that flushes only when {@code bulkSize} is reached or on close.
     *
     * @param client            bulk insert client used to persist each batch
     * @param connectionFactory invoked once per batch to obtain the connection to write with
     * @param bulkSize          number of buffered entities that triggers a write
     */
    public BulkProcessor(IPgBulkInsert<TEntity> client, Func1<PGConnection> connectionFactory, int bulkSize) {
        this(client, connectionFactory, bulkSize, null);
    }

    /**
     * Creates a processor that additionally flushes buffered entities periodically.
     *
     * @param client            bulk insert client used to persist each batch
     * @param connectionFactory invoked once per batch to obtain the connection to write with
     * @param bulkSize          number of buffered entities that triggers a write
     * @param flushInterval     delay between periodic flushes; {@code null} disables them
     */
    public BulkProcessor(IPgBulkInsert<TEntity> client, Func1<PGConnection> connectionFactory, int bulkSize, Duration flushInterval) {
        this.client = client;
        this.connectionFactory = connectionFactory;
        this.bulkSize = bulkSize;
        this.flushInterval = flushInterval;
        this.batchedEntities = new ArrayList<>();
        if (flushInterval != null) {
            // Instantiate directly instead of down-casting the result of
            // Executors.newScheduledThreadPool, whose concrete type is not part of its contract.
            this.scheduler = new ScheduledThreadPoolExecutor(1);
            this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            long intervalMillis = flushInterval.toMillis();
            // Note: ScheduledThreadPoolExecutor suppresses all further runs of a periodic task
            // once one run throws, so a failed write ends periodic flushing.
            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        } else {
            this.scheduler = null;
            this.scheduledFuture = null;
        }
    }

    /**
     * Buffers an entity and writes the whole batch once {@code bulkSize} entities are buffered.
     *
     * @param entity entity to buffer
     * @return this processor, to allow chained calls
     * @throws RuntimeException if a triggered bulk write fails (wraps the original cause)
     */
    public synchronized BulkProcessor<TEntity> add(TEntity entity) {
        this.batchedEntities.add(entity);
        this.executeIfNecessary();
        return this;
    }

    /**
     * Stops the periodic flush (if any) and writes the entities that are still buffered.
     * Calling this method again after the first call has no effect.
     *
     * <p>Synchronized so that the final flush cannot race with a concurrent {@link #add(Object)}
     * or with a periodic flush that is still running; both operate on the same buffer.
     *
     * @throws RuntimeException if the final bulk write fails (wraps the original cause)
     */
    @Override
    public synchronized void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (this.scheduledFuture != null) {
            // Neither call blocks: cancel(false) does not interrupt or wait for a running flush,
            // and shutdown() only prevents further scheduling.
            BulkProcessor.cancel(this.scheduledFuture);
            this.scheduler.shutdown();
        }
        if (!this.batchedEntities.isEmpty()) {
            this.execute();
        }
    }

    /** Writes the buffered batch once it holds at least {@code bulkSize} entities. */
    private void executeIfNecessary() {
        if (this.batchedEntities.size() >= this.bulkSize) {
            this.execute();
        }
    }

    /**
     * Detaches the current buffer, installs a fresh one and writes the detached batch.
     * Callers must hold this object's monitor.
     *
     * <p>The buffer is replaced before writing, so a batch whose write fails is not re-buffered.
     */
    private void execute() {
        List<TEntity> entities = this.batchedEntities;
        this.batchedEntities = new ArrayList<>();
        this.write(entities);
    }

    /**
     * Obtains a connection from the factory and hands the batch to the bulk insert client.
     *
     * @param entities batch to persist
     * @throws RuntimeException wrapping any exception raised by the factory or the client
     */
    private void write(List<TEntity> entities) {
        try {
            // NOTE(review): the connection is not closed here; confirm whether the factory or
            // the client owns its lifecycle.
            PGConnection connection = this.connectionFactory.invoke();
            this.client.saveAll(connection, entities.stream());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Cancels the given future without interrupting it if it is already running.
     *
     * @param future future to cancel; may be {@code null}
     * @return the result of {@link Future#cancel(boolean)}, or {@code false} for {@code null}
     */
    public static boolean cancel(Future<?> future) {
        if (future != null) {
            return future.cancel(false);
        }
        return false;
    }

    /** Periodic task that writes whatever is buffered, unless the processor is already closed. */
    class Flush
    implements Runnable {
        Flush() {
        }

        @Override
        public void run() {
            synchronized (BulkProcessor.this) {
                if (BulkProcessor.this.closed || BulkProcessor.this.batchedEntities.isEmpty()) {
                    return;
                }
                BulkProcessor.this.execute();
            }
        }
    }
}

