package org.apache.flink.cdc.connectors.postgres.source.reader;

import java.util.List;
import java.util.function.Supplier;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.events.LatestFinishedSplitsNumberEvent;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderWithCommit;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitAckEvent;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitEvent;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Postgres source reader that can hold back offset commits.
 *
 * <p>Offset commits are always allowed when
 * {@code sourceConfig.isScanNewlyAddedTableEnabled()} returns {@code false}. Otherwise they are
 * gated by an internal flag, which is updated from incoming {@link OffsetCommitEvent}s and is
 * switched on whenever the stream split's finished-splits size is updated.
 */
@Experimental
public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresSourceReader.class);

    /** Commit flag; only consulted when scanning of newly added tables is enabled. */
    private volatile boolean isCommitOffset = false;

    /** Creates the reader; every argument is forwarded unchanged to the parent constructor. */
    public PostgresSourceReader(FutureCompletingBlockingQueue futureCompletingBlockingQueue, Supplier supplier, RecordEmitter recordEmitter, Configuration configuration, IncrementalSourceReaderContext incrementalSourceReaderContext, SourceConfig sourceConfig, SourceSplitSerializer sourceSplitSerializer, DataSourceDialect dataSourceDialect) {
        super(futureCompletingBlockingQueue, supplier, recordEmitter, configuration, incrementalSourceReaderContext, sourceConfig, sourceSplitSerializer, dataSourceDialect);
    }

    /**
     * Handles {@link OffsetCommitEvent} locally and delegates every other event to the parent.
     *
     * <p>For an {@link OffsetCommitEvent} the commit flag is taken from the event and an
     * {@link OffsetCommitAckEvent} is sent back to the coordinator.
     *
     * @param sourceEvent the event received by this reader
     */
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof OffsetCommitEvent) {
            OffsetCommitEvent commitEvent = (OffsetCommitEvent) sourceEvent;
            this.isCommitOffset = commitEvent.isCommitOffset();
            this.context.sendSourceEventToCoordinator(new OffsetCommitAckEvent());
            return;
        }
        super.handleSourceEvents(sourceEvent);
    }

    /**
     * Runs the parent update and then switches the commit flag on.
     *
     * @param latestFinishedSplitsNumberEvent the event passed through to the parent
     */
    protected void updateStreamSplitFinishedSplitsSize(LatestFinishedSplitsNumberEvent latestFinishedSplitsNumberEvent) {
        super.updateStreamSplitFinishedSplitsSize(latestFinishedSplitsNumberEvent);
        this.isCommitOffset = true;
    }

    /**
     * Snapshots the reader state through the parent implementation.
     *
     * <p>While offset commits are held back, the entry recorded for this checkpoint is removed
     * from {@code lastCheckpointOffsets} after the parent snapshot has been taken.
     *
     * @param j the checkpoint id
     * @return the splits returned by the parent snapshot
     */
    public List<SourceSplitBase> snapshotState(long j) {
        final List<SourceSplitBase> splits = super.snapshotState(j);
        if (shouldCommitOffset()) {
            return splits;
        }
        // Box once and reuse the same key for the log argument and the removal.
        final Long checkpointId = Long.valueOf(j);
        LOG.debug("Close offset commit of checkpoint {}", checkpointId);
        this.lastCheckpointOffsets.remove(checkpointId);
        return splits;
    }

    /**
     * Forwards the checkpoint-complete notification to the parent only while offset commits are
     * allowed; otherwise the notification is dropped.
     *
     * @param j the checkpoint id
     * @throws Exception if the parent implementation fails
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        if (!shouldCommitOffset()) {
            return;
        }
        super.notifyCheckpointComplete(j);
    }

    /**
     * Tells whether offsets may be committed right now.
     *
     * @return {@code true} when scanning of newly added tables is disabled, otherwise the
     *     current value of the commit flag
     */
    private boolean shouldCommitOffset() {
        if (!this.sourceConfig.isScanNewlyAddedTableEnabled()) {
            return true;
        }
        return this.isCommitOffset;
    }
}
