package org.apache.hudi.async;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
import org.apache.hudi.client.Compactor;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;

/* loaded from: input_file:org/apache/hudi/async/AsyncCompactService.class */
/**
 * Asynchronous service that drains a queue of pending compaction instants and runs each of them
 * through a {@link Compactor} on a dedicated thread pool.
 *
 * <p>Producers hand instants over via {@link #enqueuePendingCompaction(HoodieInstant)} and may
 * throttle themselves with {@link #waitTillPendingCompactionsReducesTo(int)}. Worker threads are
 * created by {@link #startService()}.
 */
public class AsyncCompactService extends AbstractAsyncService {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);

    /** Value assigned to the {@code spark.scheduler.pool} local property by every worker. */
    public static final String COMPACT_POOL_NAME = "hoodiecompact";

    /**
     * Upper bound, in seconds, for a single blocking wait: both the queue poll performed by workers
     * and each wait slice of {@link #waitTillPendingCompactionsReducesTo(int)}.
     */
    private static final long POLL_TIMEOUT_SECONDS = 10L;

    /** Number of worker threads (and compaction loops) started by {@link #startService()}. */
    private final int maxConcurrentCompaction;
    private transient Compactor compactor;
    private transient JavaSparkContext jssc;
    /** Instants waiting to be compacted. */
    private transient BlockingQueue<HoodieInstant> pendingCompactions;
    /** Guards {@link #consumed}. */
    private transient ReentrantLock queueLock;
    /** Signalled every time a worker removes an instant from {@link #pendingCompactions}. */
    private transient Condition consumed;

    /**
     * Creates the service, passing {@code false} as the third argument of
     * {@link #AsyncCompactService(JavaSparkContext, HoodieWriteClient, boolean)}.
     *
     * @param javaSparkContext Spark context used by the workers
     * @param hoodieWriteClient write client handed to the {@link Compactor}
     */
    public AsyncCompactService(JavaSparkContext javaSparkContext, HoodieWriteClient hoodieWriteClient) {
        this(javaSparkContext, hoodieWriteClient, false);
    }

    /**
     * Creates the service with a single compaction worker.
     *
     * @param javaSparkContext Spark context used by the workers
     * @param hoodieWriteClient write client handed to the {@link Compactor}
     * @param z forwarded unchanged to the superclass constructor; presumably the daemon-mode flag
     *     later read through {@code isRunInDaemonMode()} (confirm against AbstractAsyncService)
     */
    public AsyncCompactService(JavaSparkContext javaSparkContext, HoodieWriteClient hoodieWriteClient, boolean z) {
        super(z);
        this.pendingCompactions = new LinkedBlockingQueue<>();
        this.queueLock = new ReentrantLock();
        this.consumed = this.queueLock.newCondition();
        this.jssc = javaSparkContext;
        this.compactor = new Compactor(hoodieWriteClient, javaSparkContext);
        this.maxConcurrentCompaction = 1;
    }

    /**
     * Adds an instant to the queue of pending compactions.
     *
     * @param hoodieInstant instant to compact
     */
    public void enqueuePendingCompaction(HoodieInstant hoodieInstant) {
        this.pendingCompactions.add(hoodieInstant);
    }

    /**
     * Blocks the caller until the service reports shutdown or the number of queued instants is at
     * most {@code i}.
     *
     * @param i largest queue size at which the caller is released
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitTillPendingCompactionsReducesTo(int i) throws InterruptedException {
        // Acquire before the try block so the finally never unlocks a lock that was not obtained.
        this.queueLock.lock();
        try {
            while (!isShutdown() && this.pendingCompactions.size() > i) {
                // Timed wait: nothing in this class signals the condition on shutdown, so the loop
                // condition has to be re-evaluated periodically to avoid blocking forever.
                this.consumed.await(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
        } finally {
            this.queueLock.unlock();
        }
    }

    /**
     * Waits up to {@link #POLL_TIMEOUT_SECONDS} seconds for the next queued instant and, when one
     * was obtained, wakes the threads blocked in {@link #waitTillPendingCompactionsReducesTo(int)}.
     *
     * @return the next instant, or {@code null} if none became available within the timeout
     * @throws InterruptedException if the calling thread is interrupted while polling
     */
    private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
        LOG.info("Compactor waiting for next instant for compaction upto " + POLL_TIMEOUT_SECONDS + " seconds");
        HoodieInstant instant = this.pendingCompactions.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        if (instant != null) {
            this.queueLock.lock();
            try {
                // Waiters may use different thresholds, so wake all of them and let each re-check.
                this.consumed.signalAll();
            } finally {
                this.queueLock.unlock();
            }
        }
        return instant;
    }

    /**
     * Starts {@link #maxConcurrentCompaction} compaction loops on a fixed-size thread pool.
     *
     * @return a pair of the future that completes once every loop has finished and the executor
     *     running the loops
     */
    @Override // org.apache.hudi.async.AbstractAsyncService
    protected Pair<CompletableFuture, ExecutorService> startService() {
        ExecutorService executor = Executors.newFixedThreadPool(this.maxConcurrentCompaction, runnable -> {
            Thread thread = new Thread(runnable, "async_compact_thread");
            thread.setDaemon(isRunInDaemonMode());
            return thread;
        });
        CompletableFuture[] workers = IntStream.range(0, this.maxConcurrentCompaction)
            .mapToObj(i -> CompletableFuture.supplyAsync(this::runCompactionLoop, executor))
            .toArray(CompletableFuture[]::new);
        return Pair.of(CompletableFuture.allOf(workers), executor);
    }

    /**
     * Body of one worker: fetches and compacts instants until shutdown is requested or the thread
     * is interrupted.
     *
     * @return always {@code true}
     * @throws HoodieIOException if a compaction fails with an {@link IOException}
     */
    private boolean runCompactionLoop() {
        try {
            LOG.info("Setting Spark Pool name for compaction to " + COMPACT_POOL_NAME);
            this.jssc.setLocalProperty("spark.scheduler.pool", COMPACT_POOL_NAME);
            while (!isShutdownRequested()) {
                HoodieInstant instant = fetchNextCompactionInstant();
                if (instant != null) {
                    LOG.info("Starting Compaction for instant " + instant);
                    this.compactor.compact(instant);
                    LOG.info("Finished Compaction for instant " + instant);
                }
            }
            LOG.info("Compactor shutting down properly!!");
        } catch (IOException e) {
            LOG.error("Compactor executor failed", e);
            throw new HoodieIOException(e.getMessage(), e);
        } catch (InterruptedException e) {
            LOG.warn("Compactor executor thread got interrupted exception. Stopping", e);
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
        return true;
    }

    /**
     * Hook that always returns {@code false} in this class; it is not consulted anywhere within
     * this class.
     *
     * @return {@code false}
     */
    protected boolean shouldStopCompactor() {
        return false;
    }
}
