package org.apache.hudi.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink function that buffers incoming {@link HoodieRecord}s in memory and writes them with a
 * {@link HoodieFlinkWriteClient}.
 *
 * <p>Records are grouped into {@link DataBucket}s keyed by partition path and file id. Buckets are
 * written out in three situations:
 * <ul>
 *   <li>a bucket is written when its estimated size exceeds {@link FlinkOptions#WRITE_BATCH_SIZE} (MB);</li>
 *   <li>the largest bucket is written when the estimated size of all buffered records exceeds the
 *       task budget derived from {@link FlinkOptions#WRITE_TASK_MAX_SIZE};</li>
 *   <li>all buckets are written on every checkpoint and at end of input.</li>
 * </ul>
 * Each write is reported to the coordinator as a {@link WriteMetadataEvent}.
 *
 * <p>Before writing, the function polls for a pending instant while it is in the "confirming"
 * state. A {@link CommitAckEvent} from the coordinator clears that state.
 *
 * @param <K> key type
 * @param <I> input type; every element is cast to {@link HoodieRecord}
 * @param <O> output type; this function never emits anything
 */
public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O> implements CheckpointedFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);

    /** Buffered records, keyed by bucket id (generated from partition path and file id). */
    private transient Map<String, DataBucket> buckets;
    /** Job configuration. */
    private final Configuration config;
    /** Index of this parallel subtask. */
    private int taskID;
    private transient HoodieFlinkWriteClient writeClient;
    /** Write operation (insert / upsert / insert overwrite ...) selected from {@link FlinkOptions#OPERATION}. */
    private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
    /** Instant used by the latest full flush; initialized to the last pending instant. */
    private volatile String currentInstant;
    private transient OperatorEventGateway eventGateway;
    /** Commit action type derived from the configured operation and table type. */
    private transient String actionType;
    /** Tracks the estimated total size of all buffered records. */
    private transient TotalSizeTracer tracer;
    /** While true, writes poll for a pending instant first; cleared by a CommitAckEvent or once one is found. */
    private volatile boolean confirming = false;
    /** Operator state holding the write metadata captured at the last checkpoint. */
    private transient ListState<WriteMetadataEvent> writeMetadataState;
    /** Write statuses accumulated since the last checkpoint. */
    private List<WriteStatus> writeStatuses;

    /**
     * Estimates the in-memory size of one bucket.
     *
     * <p>The first record after construction or {@link #reset()} is always measured. Later records
     * reuse the last measured size, except for a random sample of about one in {@link #DENOMINATOR}
     * records, which are re-measured.
     */
    public static class BufferSizeDetector {
        /** On average one in this many records has its size re-measured. */
        private static final int DENOMINATOR = 100;
        private final double batchSizeBytes;
        private final Random random = new Random(47);
        private long lastRecordSize = -1;
        private long totalSize = 0;

        /**
         * @param batchSizeMb bucket size threshold in MB
         */
        BufferSizeDetector(double batchSizeMb) {
            this.batchSizeBytes = batchSizeMb * 1024.0d * 1024.0d;
        }

        /**
         * Accounts for one more record.
         *
         * @param record the buffered record whose size is (possibly) measured
         * @return true if the accumulated size now exceeds the batch size
         */
        boolean detect(Object record) {
            if (this.lastRecordSize == -1 || sampling()) {
                this.lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
            }
            this.totalSize += this.lastRecordSize;
            return this.totalSize > this.batchSizeBytes;
        }

        /** Returns true for roughly one call in {@link #DENOMINATOR}. */
        boolean sampling() {
            return this.random.nextInt(DENOMINATOR) == 1;
        }

        /** Clears the accumulated size so the next record is measured again. */
        void reset() {
            this.lastRecordSize = -1L;
            this.totalSize = 0L;
        }
    }

    /** Records buffered for a single (partition path, file id) pair. */
    public static class DataBucket {
        private final List<DataItem> records;
        private final BufferSizeDetector detector;
        private final String partitionPath;
        private final String fileID;

        /**
         * @param batchSize    bucket size threshold in MB
         * @param hoodieRecord first record of the bucket; supplies the partition path and file id
         */
        private DataBucket(Double batchSize, HoodieRecord<?> hoodieRecord) {
            this.records = new ArrayList<>();
            this.detector = new BufferSizeDetector(batchSize.doubleValue());
            this.partitionPath = hoodieRecord.getPartitionPath();
            this.fileID = hoodieRecord.getCurrentLocation().getFileId();
        }

        /**
         * Converts the buffered items back into records.
         *
         * @return a new, mutable list; callers {@code set} and {@code clear} it
         */
        public List<HoodieRecord> writeBuffer() {
            List<HoodieRecord> buffer = new ArrayList<>(this.records.size());
            for (DataItem item : this.records) {
                buffer.add(item.toHoodieRecord(this.partitionPath));
            }
            return buffer;
        }

        /**
         * Replaces the first record of {@code list} with a copy whose location carries this
         * bucket's file id. Items are rebuilt with a {@code null} file id, so only the head record
         * holds it. Presumably the write client reads the target file from the head record; confirm.
         *
         * @param list non-empty records about to be written
         */
        public void preWrite(List<HoodieRecord> list) {
            HoodieRecord first = list.get(0);
            HoodieRecord stamped = new HoodieRecord(first.getKey(), first.getData(), first.getOperation());
            stamped.setCurrentLocation(new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), this.fileID));
            list.set(0, stamped);
        }

        /** Drops all buffered items and resets the size estimate. */
        public void reset() {
            this.records.clear();
            this.detector.reset();
        }
    }

    /** Compact form of a buffered record; the partition path is kept once per bucket instead. */
    public static class DataItem {
        private final String key;
        private final String instant;
        private final HoodieRecordPayload<?> data;
        private final HoodieOperation operation;

        private DataItem(String key, String instant, HoodieRecordPayload<?> data, HoodieOperation operation) {
            this.key = key;
            this.instant = instant;
            this.data = data;
            this.operation = operation;
        }

        /** Captures the key, location instant, payload and operation of {@code hoodieRecord}. */
        public static DataItem fromHoodieRecord(HoodieRecord<?> hoodieRecord) {
            return new DataItem(hoodieRecord.getRecordKey(), hoodieRecord.getCurrentLocation().getInstantTime(), hoodieRecord.getData(), hoodieRecord.getOperation());
        }

        /**
         * Rebuilds a record under {@code partitionPath}. The location keeps the instant but has a
         * {@code null} file id.
         */
        public HoodieRecord<?> toHoodieRecord(String partitionPath) {
            HoodieRecord<?> hoodieRecord = new HoodieRecord<>(new HoodieKey(this.key, partitionPath), this.data, this.operation);
            hoodieRecord.setCurrentLocation(new HoodieRecordLocation(this.instant, null));
            return hoodieRecord;
        }
    }

    /** Tracks the estimated size of all buffered records against the task memory budget. */
    public static class TotalSizeTracer {
        /** Memory reserved for the merge reader, in MB. */
        private static final int MERGE_READER_MEMORY_MB = 100;
        private long bufferSize = 0;
        private final double maxBufferSize;

        /**
         * Computes the budget (in bytes) as the task max size, minus the merge reader reservation,
         * minus the configured merge max memory.
         *
         * @throws org.apache.hudi.exception.HoodieException (via ValidationUtils) if the budget is not positive
         */
        TotalSizeTracer(Configuration configuration) {
            this.maxBufferSize = (configuration.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE)
                    - MERGE_READER_MEMORY_MB
                    - configuration.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY)) * 1024.0d * 1024.0d;
            ValidationUtils.checkState(this.maxBufferSize > 0.0d, String.format("'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)", FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key()));
        }

        /**
         * Adds {@code recordSize} bytes.
         *
         * @return true if the total now exceeds the budget
         */
        boolean trace(long recordSize) {
            this.bufferSize += recordSize;
            return this.bufferSize > this.maxBufferSize;
        }

        /** Subtracts the size of data that has been written out. */
        void countDown(long size) {
            this.bufferSize -= size;
        }

        public void reset() {
            this.bufferSize = 0L;
        }
    }

    public StreamWriteFunction(Configuration configuration) {
        this.config = configuration;
    }

    /** Creates the size tracer, the empty buffer and the write function for the configured operation. */
    public void open(Configuration configuration) throws IOException {
        this.tracer = new TotalSizeTracer(this.config);
        initBuffer();
        initWriteFunction();
    }

    /**
     * Creates the write client and the "write-metadata-state" list state.
     *
     * <p>On restore, a stored metadata event whose instant is still pending is re-sent to the
     * coordinator. If no such event exists, or this is a fresh start, an empty bootstrap event is
     * sent instead. The function then enters the confirming state.
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.taskID = getRuntimeContext().getIndexOfThisSubtask();
        this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
        this.actionType = CommitUtils.getCommitActionType(WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)), HoodieTableType.valueOf(this.config.getString(FlinkOptions.TABLE_TYPE)));
        this.writeStatuses = new ArrayList<>();
        this.writeMetadataState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor<>("write-metadata-state", TypeInformation.of(WriteMetadataEvent.class)));
        this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
        if (functionInitializationContext.isRestored()) {
            restoreWriteMetadata();
        } else {
            sendBootstrapEvent();
        }
        this.confirming = true;
    }

    /** Writes out every buffered record, then stores the accumulated write metadata in operator state. */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        flushRemaining(false);
        reloadWriteMetaState();
    }

    /** Buffers the element, which must be a {@link HoodieRecord}; nothing is emitted. */
    public void processElement(I i, KeyedProcessFunction<K, I, O>.Context context, Collector<O> collector) {
        bufferRecord((HoodieRecord) i);
    }

    public void close() {
        if (this.writeClient != null) {
            this.writeClient.cleanHandlesGracefully();
            this.writeClient.close();
        }
    }

    /** Writes out every buffered record as the final batch of the input. */
    public void endInput() {
        flushRemaining(true);
        this.writeClient.cleanHandles();
        this.writeStatuses.clear();
    }

    /** Returns a snapshot of the buffered records, keyed by bucket id. */
    @VisibleForTesting
    public Map<String, List<HoodieRecord>> getDataBuffer() {
        Map<String, List<HoodieRecord>> snapshot = new HashMap<>();
        this.buckets.forEach((bucketId, bucket) -> snapshot.put(bucketId, bucket.writeBuffer()));
        return snapshot;
    }

    @VisibleForTesting
    public HoodieFlinkWriteClient getWriteClient() {
        return this.writeClient;
    }

    @VisibleForTesting
    public boolean isConfirming() {
        return this.confirming;
    }

    public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
        this.eventGateway = operatorEventGateway;
    }

    private void initBuffer() {
        // Insertion order matters: ties for the largest bucket go to the earliest-created one.
        this.buckets = new LinkedHashMap<>();
    }

    /**
     * Selects the write-client call for the configured operation. The lambdas read
     * {@code writeClient} lazily, so this may run before the client exists.
     *
     * @throws RuntimeException for an unsupported operation
     */
    private void initWriteFunction() {
        String operation = (String) this.config.get(FlinkOptions.OPERATION);
        switch (WriteOperationType.fromValue(operation)) {
            case INSERT:
                this.writeFunction = (records, instant) -> this.writeClient.insert(records, instant);
                return;
            case UPSERT:
                this.writeFunction = (records, instant) -> this.writeClient.upsert(records, instant);
                return;
            case INSERT_OVERWRITE:
                this.writeFunction = (records, instant) -> this.writeClient.insertOverwrite(records, instant);
                return;
            case INSERT_OVERWRITE_TABLE:
                this.writeFunction = (records, instant) -> this.writeClient.insertOverwriteTable(records, instant);
                return;
            default:
                throw new RuntimeException("Unsupported write operation : " + operation);
        }
    }

    /**
     * Re-sends stored metadata events that belong to the still-pending instant. Falls back to an
     * empty bootstrap event if none match.
     */
    private void restoreWriteMetadata() throws Exception {
        String lastPendingInstant = this.writeClient.getLastPendingInstant(this.actionType);
        boolean sent = false;
        for (WriteMetadataEvent event : this.writeMetadataState.get()) {
            if (Objects.equals(lastPendingInstant, event.getInstantTime())) {
                this.eventGateway.sendEventToCoordinator(event);
                LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", this.taskID);
                sent = true;
            }
        }
        if (!sent) {
            sendBootstrapEvent();
        }
    }

    private void sendBootstrapEvent() {
        this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(this.taskID));
        LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", this.taskID);
    }

    /**
     * Replaces the operator state with a single bootstrap event. The event carries the statuses
     * written since the last checkpoint. Clears the accumulated statuses afterwards.
     */
    private void reloadWriteMetaState() throws Exception {
        this.writeMetadataState.clear();
        this.writeMetadataState.add(WriteMetadataEvent.builder().taskID(this.taskID).instantTime(this.currentInstant).writeStatus(new ArrayList<>(this.writeStatuses)).bootstrap(true).build());
        this.writeStatuses.clear();
    }

    /**
     * Handles a coordinator event, which must be a {@link CommitAckEvent}, and leaves the
     * confirming state.
     */
    public void handleOperatorEvent(OperatorEvent operatorEvent) {
        ValidationUtils.checkArgument(operatorEvent instanceof CommitAckEvent, "The write function can only handle CommitAckEvent");
        this.confirming = false;
    }

    private String getBucketID(HoodieRecord<?> hoodieRecord) {
        return StreamerUtil.generateBucketKey(hoodieRecord.getPartitionPath(), hoodieRecord.getCurrentLocation().getFileId());
    }

    /**
     * Buffers one record, flushing a bucket when a size limit is hit.
     *
     * <p>If the record's own bucket exceeds the batch size, that bucket is flushed. Otherwise, if
     * the total buffer exceeds the task budget, the largest bucket is flushed.
     */
    private void bufferRecord(HoodieRecord<?> hoodieRecord) {
        final DataBucket bucket = this.buckets.computeIfAbsent(getBucketID(hoodieRecord),
                bucketId -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), hoodieRecord));
        final DataItem item = DataItem.fromHoodieRecord(hoodieRecord);
        // Add the item BEFORE any flush, so that the bucket's records and its size accounting
        // (detector and tracer) always describe the same data. Adding it afterwards had two effects.
        // A freshly created bucket could be flushed while still empty, which fails the non-empty
        // check in flushBucket. And the item's size was lost from the accounting once the bucket
        // was reset.
        bucket.records.add(item);
        final boolean bucketFull = bucket.detector.detect(item);
        final boolean bufferFull = this.tracer.trace(bucket.detector.lastRecordSize);
        if (bucketFull) {
            if (flushBucket(bucket)) {
                this.tracer.countDown(bucket.detector.totalSize);
                bucket.reset();
            }
        } else if (bufferFull) {
            // Collections.max keeps the first maximal bucket in insertion order, matching a stable descending sort.
            DataBucket largest = Collections.max(this.buckets.values(),
                    (b1, b2) -> Long.compare(b1.detector.totalSize, b2.detector.totalSize));
            if (flushBucket(largest)) {
                this.tracer.countDown(largest.detector.totalSize);
                largest.reset();
            } else {
                LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
            }
        }
    }

    /** Returns true if any bucket holds at least one record. */
    private boolean hasData() {
        return this.buckets.values().stream().anyMatch(bucket -> !bucket.records.isEmpty());
    }

    /**
     * Returns the instant to write under, or {@code null} if none is pending.
     *
     * <p>While confirming, polls via {@link TimeWait}, bounded by
     * {@link FlinkOptions#WRITE_COMMIT_ACK_TIMEOUT}. Polling continues while no instant is pending,
     * or while {@code hasData} is true and the pending instant equals {@link #currentInstant}.
     * Presumably TimeWait fails once the timeout elapses; confirm.
     *
     * @param hasData whether the instant already used by the last full flush must be avoided
     */
    private String instantToWrite(boolean hasData) {
        String instant = this.writeClient.getLastPendingInstant(this.actionType);
        TimeWait timeWait = TimeWait.builder().timeout(this.config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)).action("instant initialize").build();
        while (this.confirming) {
            boolean mustWait = instant == null || (hasData && instant.equals(this.currentInstant));
            if (!mustWait) {
                this.confirming = false;
                break;
            }
            timeWait.waitFor();
            instant = this.writeClient.getLastPendingInstant(this.actionType);
        }
        return instant;
    }

    /**
     * Writes the records of {@code bucket} under {@code instant}, applying de-duplication first when
     * {@link FlinkOptions#INSERT_DROP_DUPS} is enabled. The caller must ensure the bucket is
     * non-empty and is responsible for resetting it.
     *
     * @return the write statuses returned by the write client
     */
    private List<WriteStatus> writeBucket(DataBucket bucket, String instant) {
        List<HoodieRecord> records = bucket.writeBuffer();
        if (this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
            records = FlinkWriteHelper.newInstance().deduplicateRecords((List) records, (HoodieIndex) null, -1);
        }
        bucket.preWrite(records);
        List<WriteStatus> statuses = this.writeFunction.apply(records, instant);
        records.clear();
        return statuses;
    }

    /**
     * Writes one bucket as an intermediate (non-last) batch and reports it to the coordinator.
     * Does not reset the bucket.
     *
     * @return false if there is no pending instant and nothing was written
     */
    private boolean flushBucket(DataBucket dataBucket) {
        String instant = instantToWrite(true);
        if (instant == null) {
            LOG.info("No inflight instant when flushing data, skip.");
            return false;
        }
        ValidationUtils.checkState(!dataBucket.records.isEmpty(), "Data bucket to flush has no buffering records");
        List<WriteStatus> writeStatus = new ArrayList<>(writeBucket(dataBucket, instant));
        this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.builder().taskID(this.taskID).instantTime(instant).writeStatus(writeStatus).lastBatch(false).endInput(false).build());
        this.writeStatuses.addAll(writeStatus);
        return true;
    }

    /**
     * Writes every non-empty bucket as the last batch of the current instant. It then reports the
     * batch to the coordinator, empties the buffer, and re-enters the confirming state.
     *
     * @param endInput whether this flush is the end of the input
     * @throws HoodieException if no pending instant is available
     */
    private void flushRemaining(boolean endInput) {
        this.currentInstant = instantToWrite(hasData());
        if (this.currentInstant == null) {
            throw new HoodieException("No inflight instant when flushing data!");
        }
        final List<WriteStatus> writeStatus;
        if (this.buckets.size() > 0) {
            writeStatus = new ArrayList<>();
            for (DataBucket bucket : this.buckets.values()) {
                if (!bucket.records.isEmpty()) {
                    writeStatus.addAll(writeBucket(bucket, this.currentInstant));
                    bucket.reset();
                }
            }
        } else {
            LOG.info("No data to write in subtask [{}] for instant [{}]", this.taskID, this.currentInstant);
            writeStatus = Collections.emptyList();
        }
        this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.builder().taskID(this.taskID).instantTime(this.currentInstant).writeStatus(writeStatus).lastBatch(true).endInput(endInput).build());
        this.buckets.clear();
        this.tracer.reset();
        this.writeClient.cleanHandles();
        this.writeStatuses.addAll(writeStatus);
        this.confirming = true;
    }
}
