package org.apache.hudi.connect.writers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/connect/writers/BufferedConnectWriter.class */
public class BufferedConnectWriter extends AbstractConnectWriter {
    private static final Logger LOG = LogManager.getLogger(BufferedConnectWriter.class);
    private final HoodieEngineContext context;
    private final HoodieJavaWriteClient writeClient;
    private final HoodieWriteConfig config;
    private ExternalSpillableMap<String, HoodieRecord<?>> bufferedRecords;

    public BufferedConnectWriter(HoodieEngineContext hoodieEngineContext, HoodieJavaWriteClient hoodieJavaWriteClient, String str, KafkaConnectConfigs kafkaConnectConfigs, HoodieWriteConfig hoodieWriteConfig, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
        super(kafkaConnectConfigs, keyGenerator, schemaProvider, str);
        this.context = hoodieEngineContext;
        this.writeClient = hoodieJavaWriteClient;
        this.config = hoodieWriteConfig;
        init();
    }

    private void init() {
        try {
            long maxMemoryPerPartitionMerge = IOUtils.getMaxMemoryPerPartitionMerge(this.context.getTaskContextSupplier(), this.config);
            LOG.info("MaxMemoryPerPartitionMerge => " + maxMemoryPerPartitionMerge);
            this.bufferedRecords = new ExternalSpillableMap<>(Long.valueOf(maxMemoryPerPartitionMerge), this.config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(new Schema.Parser().parse(this.config.getSchema())), this.config.getCommonConfig().getSpillableDiskMapType(), this.config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
        } catch (IOException e) {
            throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", e);
        }
    }

    @Override // org.apache.hudi.connect.writers.AbstractConnectWriter
    public void writeHudiRecord(HoodieRecord<?> hoodieRecord) {
        this.bufferedRecords.put((ExternalSpillableMap<String, HoodieRecord<?>>) hoodieRecord.getRecordKey(), (String) hoodieRecord);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.hudi.connect.writers.AbstractConnectWriter
    public List<WriteStatus> flushRecords() {
        try {
            LOG.info("Number of entries in MemoryBasedMap => " + this.bufferedRecords.getInMemoryMapNumEntries() + ", Total size in bytes of MemoryBasedMap => " + this.bufferedRecords.getCurrentInMemoryMapSize() + ", Number of entries in BitCaskDiskMap => " + this.bufferedRecords.getDiskBasedMapNumEntries() + ", Size of file spilled to disk => " + this.bufferedRecords.getSizeOfFileOnDiskInBytes());
            List arrayList = new ArrayList();
            boolean booleanValue = ((Boolean) Option.ofNullable(this.connectConfigs.getString(HoodieTableConfig.TYPE)).map(str -> {
                return Boolean.valueOf(str.equals(HoodieTableType.MERGE_ON_READ.name()));
            }).orElse(false)).booleanValue();
            if (!this.bufferedRecords.isEmpty()) {
                arrayList = booleanValue ? this.writeClient.upsertPreppedRecords((List) new LinkedList(this.bufferedRecords.values()), this.instantTime) : this.writeClient.bulkInsertPreppedRecords((List) new LinkedList(this.bufferedRecords.values()), this.instantTime, Option.empty());
            }
            this.bufferedRecords.close();
            LOG.info("Flushed hudi records and got writeStatuses: " + arrayList);
            return arrayList;
        } catch (Exception e) {
            throw new HoodieIOException("Write records failed", new IOException(e));
        }
    }
}
