package org.apache.seatunnel.connectors.cdc.base.source;

import com.google.common.collect.Sets;
import io.debezium.relational.TableId;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.HybridSplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;

/* loaded from: input_file:org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.class */
/**
 * Base class for sources that read captured tables through a {@link DataSourceDialect}.
 *
 * <p>The class wires together the pieces a concrete source supplies through the abstract factory
 * methods (source config factory, dialect, deserialization schema, offset factory) and uses them to
 * build the {@link SourceReader} and the {@link SourceSplitEnumerator}, both for a fresh start and
 * for a restore from a checkpointed {@link PendingSplitsState}.
 *
 * @param <T> record type produced by the {@link DebeziumDeserializationSchema}
 * @param <C> concrete {@link SourceConfig} type created by the config factory
 */
public abstract class IncrementalSource<T, C extends SourceConfig> implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState>, SupportCoordinate {
    protected ReadonlyConfig readonlyConfig;
    protected SourceConfig.Factory<C> configFactory;
    protected OffsetFactory offsetFactory;
    protected DataSourceDialect<C> dataSourceDialect;
    protected StartupConfig startupConfig;
    protected int incrementalParallelism;
    protected StopConfig stopConfig;
    protected List<CatalogTable> catalogTables;
    protected StopMode stopMode;
    protected DebeziumDeserializationSchema<T> deserializationSchema;
    protected SeaTunnelDataType<SeaTunnelRow> dataType;

    /**
     * No-argument constructor. It initializes nothing, so every field keeps its default value
     * until it is assigned elsewhere.
     */
    public IncrementalSource() {
    }

    /**
     * Creates a source from the given configuration.
     *
     * <p>Note: this constructor calls the overridable factory methods
     * ({@link #getStartupModeOption()}, {@link #createSourceConfigFactory(ReadonlyConfig)}, ...).
     * They therefore run before any field initializer or constructor body of the subclass, and
     * must only depend on the {@link ReadonlyConfig} that is passed to them.
     *
     * @param readonlyConfig connector options
     * @param seaTunnelDataType row type stored in {@link #dataType}
     * @param list catalog tables stored in {@link #catalogTables}
     */
    public IncrementalSource(ReadonlyConfig readonlyConfig, SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType, List<CatalogTable> list) {
        this.dataType = seaTunnelDataType;
        this.catalogTables = list;
        this.readonlyConfig = readonlyConfig;
        this.startupConfig = getStartupConfig(readonlyConfig);
        this.stopConfig = getStopConfig(readonlyConfig);
        this.stopMode = this.stopConfig.getStopMode();
        this.incrementalParallelism = (Integer) readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM);
        this.configFactory = createSourceConfigFactory(readonlyConfig);
        this.dataSourceDialect = createDataSourceDialect(readonlyConfig);
        this.deserializationSchema = createDebeziumDeserializationSchema(readonlyConfig);
        this.offsetFactory = createOffsetFactory(readonlyConfig);
    }

    /**
     * Builds the startup configuration from the startup mode option and the startup
     * offset-file / offset-position / timestamp options.
     *
     * @param readonlyConfig connector options
     * @return the startup configuration
     */
    protected StartupConfig getStartupConfig(ReadonlyConfig readonlyConfig) {
        return new StartupConfig(
                (StartupMode) readonlyConfig.get(getStartupModeOption()),
                (String) readonlyConfig.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE),
                (Long) readonlyConfig.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_POS),
                (Long) readonlyConfig.get(SourceOptions.STARTUP_TIMESTAMP));
    }

    /**
     * Builds the stop configuration from the stop mode option and the stop
     * offset-file / offset-position / timestamp options.
     */
    private StopConfig getStopConfig(ReadonlyConfig readonlyConfig) {
        return new StopConfig(
                (StopMode) readonlyConfig.get(getStopModeOption()),
                (String) readonlyConfig.get(SourceOptions.STOP_SPECIFIC_OFFSET_FILE),
                (Long) readonlyConfig.get(SourceOptions.STOP_SPECIFIC_OFFSET_POS),
                (Long) readonlyConfig.get(SourceOptions.STOP_TIMESTAMP));
    }

    /**
     * Returns the tables this source produces.
     *
     * @return a single {@code default.default} table using the compatible Debezium JSON row type
     *     when the configured format is {@link DeserializeFormat#COMPATIBLE_DEBEZIUM_JSON};
     *     otherwise the catalog tables given at construction
     */
    public List<CatalogTable> getProducedCatalogTables() {
        if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(this.readonlyConfig.get(JdbcSourceOptions.FORMAT))) {
            return Collections.singletonList(
                    CatalogTableUtil.getCatalogTable("default.default", CompatibleDebeziumJsonDeserializationSchema.DEBEZIUM_DATA_ROW_TYPE));
        }
        return this.catalogTables;
    }

    /** @return the option from which the startup mode is read */
    public abstract Option<StartupMode> getStartupModeOption();

    /** @return the option from which the stop mode is read */
    public abstract Option<StopMode> getStopModeOption();

    /** @return the factory used to create a {@code C} per reader subtask and for the enumerator */
    public abstract SourceConfig.Factory<C> createSourceConfigFactory(ReadonlyConfig readonlyConfig);

    /** @return the schema used to deserialize change records into {@code T} */
    public abstract DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(ReadonlyConfig readonlyConfig);

    /** @return the dialect used to discover tables and to read splits */
    public abstract DataSourceDialect<C> createDataSourceDialect(ReadonlyConfig readonlyConfig);

    /** @return the offset factory handed to the record emitter and the split assigners */
    public abstract OffsetFactory createOffsetFactory(ReadonlyConfig readonlyConfig);

    /**
     * @return {@link Boundedness#UNBOUNDED} when the stop mode is {@link StopMode#NEVER},
     *     {@link Boundedness#BOUNDED} for every other stop mode
     */
    public Boundedness getBoundedness() {
        if (this.stopMode == StopMode.NEVER) {
            return Boundedness.UNBOUNDED;
        }
        return Boundedness.BOUNDED;
    }

    /**
     * Creates the reader for one subtask. The source config is created for the subtask index
     * reported by {@code context}, and the reader is given a hand-over queue of capacity 2.
     *
     * @param context reader context of the subtask
     * @return a new {@link IncrementalSourceReader}
     * @throws Exception declared by the implemented interface
     */
    public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context context) throws Exception {
        C sourceConfig = this.configFactory.create2(context.getIndexOfSubtask());
        SchemaChangeResolver schemaChangeResolver = this.deserializationSchema.getSchemaChangeResolver();
        // Project reader types are used exactly as before; only the JDK queue is no longer raw.
        return new IncrementalSourceReader(
                this.dataSourceDialect,
                new LinkedBlockingQueue<>(2),
                () -> new IncrementalSourceSplitReader(context.getIndexOfSubtask(), this.dataSourceDialect, sourceConfig, schemaChangeResolver),
                createRecordEmitter(sourceConfig, context),
                new SourceReaderOptions(this.readonlyConfig),
                context,
                sourceConfig,
                this.deserializationSchema);
    }

    /**
     * Creates the record emitter used by the reader. This default implementation does not use
     * {@code sourceConfig}; the parameter is available to overriding subclasses.
     *
     * @param sourceConfig source config of the reader subtask
     * @param context reader context of the subtask
     * @return a new {@link IncrementalSourceRecordEmitter}
     */
    protected RecordEmitter<SourceRecords, T, SourceSplitStateBase> createRecordEmitter(SourceConfig sourceConfig, SourceReader.Context context) {
        return new IncrementalSourceRecordEmitter(this.deserializationSchema, this.offsetFactory, context);
    }

    /**
     * Creates a fresh enumerator. With startup mode {@link StartupMode#INITIAL} a
     * {@link HybridSplitAssigner} is used; every other startup mode uses an
     * {@link IncrementalSplitAssigner}.
     *
     * @param context enumerator context
     * @return a new {@link IncrementalSourceEnumerator}
     * @throws Exception declared by the implemented interface
     */
    public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(SourceSplitEnumerator.Context<SourceSplitBase> context) throws Exception {
        C sourceConfig = this.configFactory.create2(0);
        List<TableId> capturedTables = this.dataSourceDialect.discoverDataCollections(sourceConfig);
        SplitAssigner.Context assignerContext =
                new SplitAssigner.Context(sourceConfig, new HashSet<>(capturedTables), new HashMap<>(), new HashMap<>());

        SplitAssigner splitAssigner;
        if (sourceConfig.getStartupConfig().getStartupMode() == StartupMode.INITIAL) {
            try {
                splitAssigner = new HybridSplitAssigner(
                        assignerContext,
                        context.currentParallelism(),
                        this.incrementalParallelism,
                        capturedTables,
                        this.dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig),
                        this.dataSourceDialect,
                        this.offsetFactory);
            } catch (Exception e) {
                throw new RuntimeException("Failed to discover captured tables for enumerator", e);
            }
        } else {
            splitAssigner = new IncrementalSplitAssigner(assignerContext, this.incrementalParallelism, this.offsetFactory);
        }
        return new IncrementalSourceEnumerator(context, splitAssigner);
    }

    /**
     * Re-creates the enumerator from checkpointed state.
     *
     * <p>A {@link HybridPendingSplitsState} is first reconciled with the currently discovered
     * tables and then handed to a {@link HybridSplitAssigner}. An {@link IncrementalPhaseState}
     * leads to an {@link IncrementalSplitAssigner} with empty assigned-split and offset maps.
     *
     * <p>The erased {@code (Context, Serializable)} bridge of this method is intentionally not
     * declared in source: the compiler generates it for this override.
     *
     * @param context enumerator context
     * @param pendingSplitsState checkpointed enumerator state
     * @return a new {@link IncrementalSourceEnumerator}
     * @throws UnsupportedOperationException if the state is of neither supported type
     * @throws Exception declared by the implemented interface
     */
    public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> restoreEnumerator(SourceSplitEnumerator.Context<SourceSplitBase> context, PendingSplitsState pendingSplitsState) throws Exception {
        C sourceConfig = this.configFactory.create2(0);
        Set<TableId> capturedTables = new HashSet<>(this.dataSourceDialect.discoverDataCollections(sourceConfig));

        SplitAssigner splitAssigner;
        if (pendingSplitsState instanceof HybridPendingSplitsState) {
            HybridPendingSplitsState restoredState = restore(capturedTables, (HybridPendingSplitsState) pendingSplitsState);
            SnapshotPhaseState snapshotState = restoredState.getSnapshotPhaseState();
            SplitAssigner.Context assignerContext = new SplitAssigner.Context(
                    sourceConfig, capturedTables, snapshotState.getAssignedSplits(), snapshotState.getSplitCompletedOffsets());
            splitAssigner = new HybridSplitAssigner(
                    assignerContext,
                    context.currentParallelism(),
                    this.incrementalParallelism,
                    restoredState,
                    this.dataSourceDialect,
                    this.offsetFactory);
        } else if (pendingSplitsState instanceof IncrementalPhaseState) {
            SplitAssigner.Context assignerContext =
                    new SplitAssigner.Context(sourceConfig, capturedTables, new HashMap<>(), new HashMap<>());
            splitAssigner = new IncrementalSplitAssigner(assignerContext, this.incrementalParallelism, this.offsetFactory);
        } else {
            throw new UnsupportedOperationException("Unsupported restored PendingSplitsState: " + pendingSplitsState);
        }
        return new IncrementalSourceEnumerator(context, splitAssigner);
    }

    /**
     * Reconciles a checkpointed hybrid state with the tables that are captured now.
     *
     * <ul>
     *   <li>Tables that are captured now but unknown to the checkpoint are appended to the
     *       remaining tables.
     *   <li>Tables known to the checkpoint but no longer captured are removed from the remaining
     *       and already-processed tables; their splits are removed from the remaining splits, the
     *       assigned splits and the completed-offset map.
     * </ul>
     *
     * <p>The snapshot state inside {@code checkpointState} is modified in place.
     *
     * @param capturedTables tables discovered for the current configuration
     * @param checkpointState state restored from the checkpoint
     * @return {@code checkpointState} itself, or - when tables or splits remain although the
     *     checkpoint reports the assigner as completed - a new state built from the same
     *     collections with {@code false} in the fifth constructor argument (presumably the
     *     assigner-completed flag; confirm against {@link SnapshotPhaseState})
     */
    private HybridPendingSplitsState restore(Set<TableId> capturedTables, HybridPendingSplitsState checkpointState) {
        SnapshotPhaseState snapshotState = checkpointState.getSnapshotPhaseState();

        // Independent copy, so the two difference views below are not affected by the
        // modifications made to the state's own table lists.
        Set<TableId> checkpointedTables = new HashSet<>(snapshotState.getAlreadyProcessedTables());
        checkpointedTables.addAll(snapshotState.getRemainingTables());

        Set<TableId> addedTables = Sets.difference(capturedTables, checkpointedTables);
        Set<TableId> deletedTables = Sets.difference(checkpointedTables, capturedTables);

        snapshotState.getRemainingTables().addAll(addedTables);
        snapshotState.getRemainingTables().removeAll(deletedTables);
        snapshotState.getAlreadyProcessedTables().removeAll(deletedTables);

        Set<String> obsoleteSplitIds = new HashSet<>();
        for (Iterator<SnapshotSplit> splits = snapshotState.getRemainingSplits().iterator(); splits.hasNext(); ) {
            SnapshotSplit split = splits.next();
            if (deletedTables.contains(split.getTableId())) {
                splits.remove();
                obsoleteSplitIds.add(split.splitId());
            }
        }
        // Only collect the keys here; removing while iterating the entry set is not allowed.
        for (Map.Entry<String, SnapshotSplit> assigned : snapshotState.getAssignedSplits().entrySet()) {
            if (deletedTables.contains(assigned.getValue().getTableId())) {
                obsoleteSplitIds.add(assigned.getKey());
            }
        }
        for (String splitId : obsoleteSplitIds) {
            snapshotState.getAssignedSplits().remove(splitId);
            snapshotState.getSplitCompletedOffsets().remove(splitId);
        }

        boolean snapshotWorkLeft = !(snapshotState.getRemainingTables().isEmpty() && snapshotState.getRemainingSplits().isEmpty());
        if (snapshotWorkLeft && snapshotState.isAssignerCompleted()) {
            SnapshotPhaseState reopenedSnapshotState = new SnapshotPhaseState(
                    snapshotState.getAlreadyProcessedTables(),
                    snapshotState.getRemainingSplits(),
                    snapshotState.getAssignedSplits(),
                    snapshotState.getSplitCompletedOffsets(),
                    false,
                    snapshotState.getRemainingTables(),
                    snapshotState.isTableIdCaseSensitive(),
                    snapshotState.isRemainingTablesCheckpointed());
            return new HybridPendingSplitsState(reopenedSnapshotState, checkpointState.getIncrementalPhaseState());
        }
        return checkpointState;
    }
}
