package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit;

import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/MongodbSinkAggregatedCommitter.class */
/**
 * Aggregated committer for the MongoDB sink.
 *
 * <p>Each {@link DocumentBulk} carried by the incoming commit infos is written to the default
 * collection inside its own MongoDB transaction. Bulks whose transaction fails are logged and
 * returned to the caller from {@link #commit(List)}; bulks that were written successfully are
 * dropped from the result.
 *
 * <p>The instance keeps the most recently used {@link MongoClient} and {@link ClientSession} in
 * mutable fields without any synchronization, so it is not safe for concurrent use.
 */
public class MongodbSinkAggregatedCommitter implements SinkAggregatedCommitter<MongodbCommitInfo, MongodbAggregatedCommitInfo> {
    private static final Logger log = LoggerFactory.getLogger(MongodbSinkAggregatedCommitter.class);

    /** Pause between two checks for a still-active transaction while closing. */
    private static final long CLOSE_POLL_INTERVAL_MS = 5000;

    /** Upper bound on how long {@link #close()} waits for an active transaction. */
    private static final long TRANSACTION_TIMEOUT_MS = 60000;

    private final boolean enableUpsert;
    private final String[] upsertKeys;
    private final MongodbClientProvider collectionProvider;
    private ClientSession clientSession;
    private MongoClient client;

    /**
     * Creates a committer for the connection, database and collection described by the options.
     *
     * @param mongodbWriterOptions writer options supplying the connection string, database,
     *     collection, the upsert flag and the upsert key fields
     */
    public MongodbSinkAggregatedCommitter(MongodbWriterOptions mongodbWriterOptions) {
        this.enableUpsert = mongodbWriterOptions.isUpsertEnable();
        this.upsertKeys = mongodbWriterOptions.getPrimaryKey();
        this.collectionProvider = MongodbCollectionProvider.builder()
                .connectionString(mongodbWriterOptions.getConnectString())
                .database(mongodbWriterOptions.getDatabase())
                .collection(mongodbWriterOptions.getCollection())
                .build();
    }

    /**
     * Commits every document bulk contained in the given aggregated commit infos.
     *
     * @param list aggregated commit infos to commit
     * @return the aggregated commit infos that still contain bulks which failed to commit; empty
     *     when everything was written successfully
     */
    public List<MongodbAggregatedCommitInfo> commit(List<MongodbAggregatedCommitInfo> list) {
        return list.stream()
                .map(this::processAggregatedCommitInfo)
                .filter(remaining -> !remaining.getCommitInfos().isEmpty())
                .collect(Collectors.toList());
    }

    /**
     * Commits all commit infos of one aggregated commit info.
     *
     * @param mongodbAggregatedCommitInfo aggregated commit info to process
     * @return a new aggregated commit info holding only the commit infos with failed bulks
     */
    private MongodbAggregatedCommitInfo processAggregatedCommitInfo(MongodbAggregatedCommitInfo mongodbAggregatedCommitInfo) {
        List<MongodbCommitInfo> failedCommitInfos = mongodbAggregatedCommitInfo.getCommitInfos().stream()
                .flatMap(this::processCommitInfo)
                .filter(failedBulks -> !failedBulks.isEmpty())
                .map(MongodbCommitInfo::new)
                .collect(Collectors.toList());
        return new MongodbAggregatedCommitInfo(failedCommitInfos);
    }

    /**
     * Writes every non-empty bulk of one commit info in its own transaction.
     *
     * @param mongodbCommitInfo commit info whose document bulks should be written
     * @return a single-element stream holding the list of bulks that could not be committed
     */
    private Stream<List<DocumentBulk>> processCommitInfo(MongodbCommitInfo mongodbCommitInfo) {
        this.client = this.collectionProvider.getClient();
        // A fresh session is started per commit info. Release the previous one first, otherwise
        // every call would leak a session because close() only sees the last one.
        if (this.clientSession != null) {
            this.clientSession.close();
        }
        this.clientSession = this.client.startSession();

        final MongoCollection<BsonDocument> defaultCollection = this.collectionProvider.getDefaultCollection();
        final TransactionOptions transactionOptions = TransactionOptions.builder()
                .readPreference(ReadPreference.primary())
                .readConcern(ReadConcern.LOCAL)
                .writeConcern(WriteConcern.MAJORITY)
                .build();

        List<DocumentBulk> failedBulks = mongodbCommitInfo.getDocumentBulks().stream()
                .filter(bulk -> !bulk.getDocuments().isEmpty())
                // Keep only the bulks whose transaction failed so the caller can see them.
                .filter(bulk -> !commitBulk(defaultCollection, bulk, transactionOptions))
                .collect(Collectors.toList());
        return Stream.of(failedBulks);
    }

    /**
     * Runs one transaction that writes a single bulk through the current session.
     *
     * @param collection target collection
     * @param bulk documents to write
     * @param transactionOptions options applied to the transaction
     * @return {@code true} when the transaction completed, {@code false} when it threw
     */
    private boolean commitBulk(MongoCollection<BsonDocument> collection, DocumentBulk bulk, TransactionOptions transactionOptions) {
        try {
            CommittableTransaction transaction = this.enableUpsert
                    ? new CommittableUpsertTransaction(collection, bulk.getDocuments(), this.upsertKeys)
                    : new CommittableTransaction(collection, bulk.getDocuments());
            Integer insertedCount = (Integer) this.clientSession.withTransaction(transaction, transactionOptions);
            log.info("Inserted {} documents into collection {}.", insertedCount, collection.getNamespace());
            return true;
        } catch (Exception e) {
            // Deliberately broad: any failure marks the bulk as uncommitted instead of aborting
            // the remaining bulks.
            log.error("Failed to commit with Mongo transaction.", e);
            return false;
        }
    }

    /**
     * Wraps the given commit infos into one aggregated commit info.
     *
     * @param list commit infos to aggregate
     * @return aggregated commit info backed by the given list
     */
    public MongodbAggregatedCommitInfo combine(List<MongodbCommitInfo> list) {
        return new MongodbAggregatedCommitInfo(list);
    }

    /**
     * Intentionally a no-op: this committer performs no work on abort.
     *
     * @param list aggregated commit infos being aborted (ignored)
     */
    public void abort(List<MongodbAggregatedCommitInfo> list) {
    }

    /**
     * Releases the session and the client.
     *
     * <p>If the session still has an active transaction, waits up to
     * {@link #TRANSACTION_TIMEOUT_MS} milliseconds for it to finish before closing. Safe to call
     * when no commit was ever executed, and safe to call more than once.
     */
    public void close() {
        try {
            if (this.clientSession != null) {
                awaitActiveTransaction(this.clientSession);
                this.clientSession.close();
            }
        } finally {
            this.clientSession = null;
            // Close the client even when closing the session failed.
            if (this.client != null) {
                this.client.close();
                this.client = null;
            }
        }
    }

    /**
     * Blocks until the session has no active transaction, the timeout elapses, or the thread is
     * interrupted.
     *
     * @param session session to observe
     */
    private static void awaitActiveTransaction(ClientSession session) {
        final long deadline = System.currentTimeMillis() + TRANSACTION_TIMEOUT_MS;
        while (session.hasActiveTransaction() && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(CLOSE_POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting so shutdown can proceed.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting for the active Mongo transaction to finish.", e);
                return;
            }
        }
    }

    /* renamed from: combine, reason: collision with other method in class */
    // Synthetic bridge method as emitted by the decompiler; delegates to the typed combine(List).
    public /* bridge */ /* synthetic */ Object m191combine(List list) {
        return combine((List<MongodbCommitInfo>) list);
    }
}
