package org.apache.flink.cdc.connectors.postgres.source.enumerator;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
import org.apache.flink.cdc.connectors.base.source.assigner.SplitAssigner;
import org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitAckEvent;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitEvent;
import org.apache.flink.util.FlinkRuntimeException;

@Internal
/* loaded from: input_file:org/apache/flink/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.class */
public class PostgresSourceEnumerator extends IncrementalSourceEnumerator {
    private final PostgresDialect postgresDialect;
    private final PostgresSourceConfig sourceConfig;
    private volatile boolean receiveOffsetCommitAck;

    public PostgresSourceEnumerator(SplitEnumeratorContext<SourceSplitBase> splitEnumeratorContext, PostgresSourceConfig postgresSourceConfig, SplitAssigner splitAssigner, PostgresDialect postgresDialect, Boundedness boundedness) {
        super(splitEnumeratorContext, postgresSourceConfig, splitAssigner, boundedness);
        this.receiveOffsetCommitAck = false;
        this.postgresDialect = postgresDialect;
        this.sourceConfig = postgresSourceConfig;
    }

    public void start() {
        createSlotForGlobalStreamSplit();
        super.start();
    }

    protected void assignSplits() {
        if (!this.sourceConfig.isScanNewlyAddedTableEnabled() || this.streamSplitTaskId == null || this.receiveOffsetCommitAck || !AssignerStatus.isNewlyAddedAssigning(this.splitAssigner.getAssignerStatus())) {
            super.assignSplits();
        }
    }

    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        if (!(sourceEvent instanceof OffsetCommitAckEvent)) {
            super.handleSourceEvent(i, sourceEvent);
        } else {
            if (this.streamSplitTaskId == null || this.streamSplitTaskId.intValue() != i) {
                throw new RuntimeException("Receive SyncAssignStatusAck from wrong subtask");
            }
            this.receiveOffsetCommitAck = true;
        }
    }

    protected void syncWithReaders(int[] iArr, Throwable th) {
        super.syncWithReaders(iArr, th);
        if (this.receiveOffsetCommitAck || !this.sourceConfig.isScanNewlyAddedTableEnabled() || this.streamSplitTaskId == null) {
            return;
        }
        AssignerStatus assignerStatus = this.splitAssigner.getAssignerStatus();
        this.context.sendEventToSourceReader(this.streamSplitTaskId.intValue(), new OffsetCommitEvent((AssignerStatus.isNewlyAddedAssigning(assignerStatus) || AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) ? false : true));
    }

    private void createSlotForGlobalStreamSplit() {
        try {
            PostgresConnection openJdbcConnection = this.postgresDialect.openJdbcConnection();
            Throwable th = null;
            try {
                if (openJdbcConnection.getReplicationSlotState(this.postgresDialect.getSlotName(), this.postgresDialect.getPluginName()) != null) {
                    if (openJdbcConnection != null) {
                        if (0 == 0) {
                            openJdbcConnection.close();
                            return;
                        }
                        try {
                            openJdbcConnection.close();
                            return;
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                            return;
                        }
                    }
                    return;
                }
                PostgresReplicationConnection openPostgresReplicationConnection = this.postgresDialect.openPostgresReplicationConnection(openJdbcConnection);
                openPostgresReplicationConnection.createReplicationSlot();
                openPostgresReplicationConnection.close(false);
                if (openJdbcConnection != null) {
                    if (0 != 0) {
                        try {
                            openJdbcConnection.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        openJdbcConnection.close();
                    }
                }
                return;
            } finally {
            }
        } catch (Throwable th4) {
            throw new FlinkRuntimeException(String.format("Fail to get or create slot for global stream split, the slot name is %s. Due to: ", this.postgresDialect.getSlotName()), th4);
        }
        throw new FlinkRuntimeException(String.format("Fail to get or create slot for global stream split, the slot name is %s. Due to: ", this.postgresDialect.getSlotName()), th4);
    }
}
