package org.apache.seatunnel.connectors.cdc.base.source;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceFactory;
import org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceState;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/cdc/base/source/BaseChangeStreamTableSourceFactory.class */
public abstract class BaseChangeStreamTableSourceFactory implements ChangeStreamTableSourceFactory {
    private static final Logger log = LoggerFactory.getLogger(BaseChangeStreamTableSourceFactory.class);

    public <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext tableSourceFactoryContext) {
        return restoreSource(tableSourceFactoryContext, Collections.emptyList());
    }

    public <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource<T, SplitT, StateT> restoreSource(TableSourceFactoryContext tableSourceFactoryContext, ChangeStreamTableSourceState<StateT, SplitT> changeStreamTableSourceState) {
        return restoreSource(tableSourceFactoryContext, getRestoreTableStruct(changeStreamTableSourceState));
    }

    public abstract <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource<T, SplitT, StateT> restoreSource(TableSourceFactoryContext tableSourceFactoryContext, List<CatalogTable> list);

    protected <SplitT extends SourceSplit, StateT extends Serializable> List<CatalogTable> getRestoreTableStruct(ChangeStreamTableSourceState<StateT, SplitT> changeStreamTableSourceState) {
        List list = (List) changeStreamTableSourceState.getSplits().stream().flatMap((v0) -> {
            return v0.stream();
        }).filter(sourceSplit -> {
            return sourceSplit != null;
        }).map(sourceSplit2 -> {
            return (SourceSplitBase) SourceSplitBase.class.cast(sourceSplit2);
        }).filter(sourceSplitBase -> {
            return sourceSplitBase.isIncrementalSplit();
        }).map(sourceSplitBase2 -> {
            return sourceSplitBase2.asIncrementalSplit();
        }).collect(Collectors.toList());
        if (list.size() > 1) {
            throw new UnsupportedOperationException("Multiple incremental splits are not supported");
        }
        if (list.size() == 1) {
            IncrementalSplit incrementalSplit = (IncrementalSplit) list.get(0);
            if (incrementalSplit.getCheckpointTables() != null) {
                List<CatalogTable> checkpointTables = incrementalSplit.getCheckpointTables();
                log.info("Restore source using checkpoint tables: {}", checkpointTables);
                return checkpointTables;
            }
            if (incrementalSplit.getCheckpointDataType() != null) {
                List<CatalogTable> convertDataTypeToCatalogTables = CatalogTableUtil.convertDataTypeToCatalogTables(incrementalSplit.getCheckpointDataType(), "default.default");
                log.info("Restore source using checkpoint tables: {}", convertDataTypeToCatalogTables);
                return convertDataTypeToCatalogTables;
            }
        }
        log.info("Restore source using checkpoint tables is empty");
        return Collections.emptyList();
    }

    protected List<CatalogTable> mergeTableStruct(List<CatalogTable> list, List<CatalogTable> list2) {
        if (list2.isEmpty()) {
            return list;
        }
        Map map = (Map) list2.stream().collect(Collectors.toMap((v0) -> {
            return v0.getTablePath();
        }, catalogTable -> {
            return catalogTable;
        }));
        List<CatalogTable> list3 = (List) list.stream().map(catalogTable2 -> {
            return (CatalogTable) map.getOrDefault(catalogTable2.getTablePath(), catalogTable2);
        }).collect(Collectors.toList());
        log.info("Merge db table struct with checkpoint table struct: {}", list3);
        return list3;
    }
}
