package org.apache.paimon.flink.source.operator;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.MultiTablesCompactorUtil;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.system.BucketsTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/operator/MultiTablesCompactorSourceFunction.class */
/**
 * Base class for source functions that work on several tables of one catalog at once.
 *
 * <p>On {@link #open(Configuration)} the catalog is loaded and scanned; every table that passes
 * the database / including / excluding patterns is wrapped in a {@link BucketsTable} and gets a
 * {@link StreamTableScan}, both registered under the table's {@link Identifier}. Subclasses may
 * call {@link #updateTableMap()} again to pick up tables that are not registered yet.
 *
 * <p>This class does not implement {@code run}; concrete subclasses are expected to provide it
 * and to honour the {@link #isRunning} flag that {@link #cancel()} clears.
 */
public abstract class MultiTablesCompactorSourceFunction extends RichSourceFunction<Tuple2<Split, String>> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MultiTablesCompactorSourceFunction.class);

    /** Creates the {@link Catalog} in {@link #open(Configuration)}. */
    protected final Catalog.Loader catalogLoader;

    /** Passed to {@link MultiTablesCompactorUtil#shouldCompactTable} as the including pattern. */
    protected final Pattern includingPattern;

    /** Passed to {@link MultiTablesCompactorUtil#shouldCompactTable} as the excluding pattern. */
    protected final Pattern excludingPattern;

    /** A database is scanned only if its name fully matches this pattern. */
    protected final Pattern databasePattern;

    /** Forwarded to {@link BucketsTable} and to the compact options of every registered table. */
    protected final boolean isStreaming;

    /** Not read by this class; available to subclasses. */
    protected final long monitorInterval;

    /** Catalog instance created in {@link #open(Configuration)}. */
    protected transient Catalog catalog;

    /** Registered tables, keyed by identifier. */
    protected transient Map<Identifier, BucketsTable> tablesMap;

    /** Stream scan of every registered table, keyed by identifier. */
    protected transient Map<Identifier, StreamTableScan> scansMap;

    /** Cleared by {@link #cancel()}. */
    protected volatile boolean isRunning = true;

    /** Not assigned by this class; {@link #cancel()} tolerates it being {@code null}. */
    protected transient SourceFunction.SourceContext<Tuple2<Split, String>> ctx;

    /**
     * Stores the configuration; no catalog access happens here.
     *
     * @param catalogLoader loader used to create the catalog in {@link #open(Configuration)}
     * @param includingPattern including pattern handed to the table filter
     * @param excludingPattern excluding pattern handed to the table filter
     * @param databasePattern pattern a database name must fully match to be scanned
     * @param isStreaming streaming flag forwarded to the created {@link BucketsTable}s
     * @param monitorInterval interval value kept for subclasses
     */
    public MultiTablesCompactorSourceFunction(
            Catalog.Loader catalogLoader,
            Pattern includingPattern,
            Pattern excludingPattern,
            Pattern databasePattern,
            boolean isStreaming,
            long monitorInterval) {
        this.catalogLoader = catalogLoader;
        this.includingPattern = includingPattern;
        this.excludingPattern = excludingPattern;
        this.databasePattern = databasePattern;
        this.isStreaming = isStreaming;
        this.monitorInterval = monitorInterval;
    }

    /**
     * Initialises the transient state: creates empty table / scan maps, loads the catalog and
     * performs the first table discovery.
     *
     * @param configuration not used by this implementation
     * @throws Exception if loading the catalog or discovering tables fails
     */
    public void open(Configuration configuration) throws Exception {
        this.tablesMap = new HashMap<>();
        this.scansMap = new HashMap<>();
        this.catalog = this.catalogLoader.load();
        updateTableMap();
    }

    /**
     * Clears {@link #isRunning}. When a source context is available the flag is cleared while
     * holding its checkpoint lock; otherwise it is cleared directly.
     */
    public void cancel() {
        if (this.ctx == null) {
            this.isRunning = false;
            return;
        }
        synchronized (this.ctx.getCheckpointLock()) {
            this.isRunning = false;
        }
    }

    /**
     * Scans the catalog and registers every matching table that is not registered yet.
     *
     * <p>A table is considered when its database name fully matches {@link #databasePattern} and
     * {@link MultiTablesCompactorUtil#shouldCompactTable} accepts it. Already registered tables
     * are left untouched.
     *
     * @throws Catalog.DatabaseNotExistException declared by the catalog listing calls
     * @throws Catalog.TableNotExistException declared by the catalog table lookup
     */
    public void updateTableMap() throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
        for (String databaseName : this.catalog.listDatabases()) {
            if (!this.databasePattern.matcher(databaseName).matches()) {
                continue;
            }
            for (String tableName : this.catalog.listTables(databaseName)) {
                Identifier identifier = Identifier.create(databaseName, tableName);
                boolean wanted =
                        MultiTablesCompactorUtil.shouldCompactTable(
                                identifier, this.includingPattern, this.excludingPattern);
                if (!wanted || this.tablesMap.containsKey(identifier)) {
                    continue;
                }
                registerTable(identifier);
            }
        }
    }

    /**
     * Loads one table and, if it is supported, adds it to {@link #tablesMap} and
     * {@link #scansMap}. Unsupported tables are only logged and never registered.
     */
    private void registerTable(Identifier identifier) throws Catalog.TableNotExistException {
        Table table = this.catalog.getTable(identifier);
        if (!(table instanceof FileStoreTable)) {
            LOG.error(
                    "Only FileStoreTable supports compact action. The table type is '{}'.",
                    table.getClass().getName());
            return;
        }

        FileStoreTable fileStoreTable = (FileStoreTable) table;
        if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
            LOG.info(
                    "the bucket mode of {} is unware. "
                            + "currently, the table with unware bucket mode is not support in combined mode.",
                    identifier.getFullName());
            return;
        }

        BucketsTable bucketsTable =
                new BucketsTable(fileStoreTable, this.isStreaming, identifier.getDatabaseName())
                        .copy(MultiTablesCompactorUtil.compactOptions(this.isStreaming));
        this.tablesMap.put(identifier, bucketsTable);
        this.scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan());
    }
}
