package org.apache.shardingsphere.data.pipeline.postgresql.ingest;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLTimestampUtils;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.TestDecodingPlugin;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.PGReplicationStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.class */
public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PostgreSQLWALDumper.class);
    private final DumperConfiguration dumperConfig;
    private final AtomicReference<WALPosition> walPosition;
    private final PipelineChannel channel;
    private final WALEventConverter walEventConverter;
    private final PostgreSQLLogicalReplication logicalReplication;
    private final boolean decodeWithTX;
    private List<AbstractRowEvent> rowEvents = new LinkedList();

    public PostgreSQLWALDumper(DumperConfiguration dumperConfiguration, IngestPosition ingestPosition, PipelineChannel pipelineChannel, PipelineTableMetaDataLoader pipelineTableMetaDataLoader) {
        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfiguration.getDataSourceConfig().getClass()), () -> {
            return new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration");
        });
        this.dumperConfig = dumperConfiguration;
        this.walPosition = new AtomicReference<>((WALPosition) ingestPosition);
        this.channel = pipelineChannel;
        this.walEventConverter = new WALEventConverter(dumperConfiguration, pipelineTableMetaDataLoader);
        this.logicalReplication = new PostgreSQLLogicalReplication();
        this.decodeWithTX = dumperConfiguration.isDecodeWithTX();
    }

    protected void runBlocking() {
        try {
            AtomicInteger atomicInteger = new AtomicInteger();
            while (isRunning()) {
                try {
                    dump();
                    break;
                } catch (SQLException e) {
                    int incrementAndGet = atomicInteger.incrementAndGet();
                    log.error("Connect failed, reconnect times={}", Integer.valueOf(incrementAndGet), e);
                    if (isRunning()) {
                        Thread.sleep(5000L);
                    }
                    if (incrementAndGet >= 5) {
                        throw new IngestException(e);
                    }
                }
            }
        } catch (InterruptedException e2) {
            throw e2;
        }
    }

    private void dump() throws SQLException {
        try {
            Connection createConnection = this.logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) this.dumperConfig.getDataSourceConfig());
            try {
                PGReplicationStream createReplicationStream = this.logicalReplication.createReplicationStream(createConnection, PostgreSQLPositionInitializer.getUniqueSlotName(createConnection, this.dumperConfig.getJobId()), this.walPosition.get().getLogSequenceNumber());
                try {
                    TestDecodingPlugin testDecodingPlugin = new TestDecodingPlugin(new PostgreSQLTimestampUtils(((PgConnection) createConnection.unwrap(PgConnection.class)).getTimestampUtils()));
                    while (isRunning()) {
                        ByteBuffer readPending = createReplicationStream.readPending();
                        if (null == readPending) {
                            Thread.sleep(10L);
                        } else {
                            AbstractWALEvent decode = testDecodingPlugin.decode(readPending, new PostgreSQLLogSequenceNumber(createReplicationStream.getLastReceiveLSN()));
                            if (this.decodeWithTX) {
                                processEventWithTX(decode);
                            } else {
                                processEventIgnoreTX(decode);
                            }
                            this.walPosition.set(new WALPosition(decode.getLogSequenceNumber()));
                        }
                    }
                    if (createReplicationStream != null) {
                        createReplicationStream.close();
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } catch (Throwable th) {
                    if (createReplicationStream != null) {
                        try {
                            createReplicationStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (InterruptedException e) {
            throw e;
        }
    }

    private void processEventWithTX(AbstractWALEvent abstractWALEvent) {
        if (abstractWALEvent instanceof BeginTXEvent) {
            this.rowEvents = new ArrayList();
            return;
        }
        if (abstractWALEvent instanceof AbstractRowEvent) {
            this.rowEvents.add((AbstractRowEvent) abstractWALEvent);
            return;
        }
        if (abstractWALEvent instanceof CommitTXEvent) {
            LinkedList linkedList = new LinkedList();
            Iterator<AbstractRowEvent> it = this.rowEvents.iterator();
            while (it.hasNext()) {
                linkedList.add(this.walEventConverter.convert(it.next()));
            }
            linkedList.add(this.walEventConverter.convert(abstractWALEvent));
            this.channel.pushRecords(linkedList);
        }
    }

    private void processEventIgnoreTX(AbstractWALEvent abstractWALEvent) {
        if (abstractWALEvent instanceof BeginTXEvent) {
            return;
        }
        this.channel.pushRecords(Collections.singletonList(this.walEventConverter.convert(abstractWALEvent)));
    }

    protected void doStop() {
    }
}
