/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamWriteFunction<I>
extends AbstractStreamWriteFunction<I> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
    private transient Map<String, DataBucket> buckets;
    private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
    private transient TotalSizeTracer tracer;

    /**
     * Creates the write function.
     *
     * @param config the Flink configuration; it is handed to the parent constructor unchanged
     */
    public StreamWriteFunction(Configuration config) {
        super(config);
    }

    /**
     * Prepares the per-task state: the total buffer size tracer, the (empty) bucket
     * buffer and the write function matching the configured write operation.
     *
     * @param parameters not read by this method
     * @throws IOException declared by the signature; no statement in this body throws it directly
     */
    public void open(Configuration parameters) throws IOException {
        // The tracer validates the memory options, so it is created before anything else.
        tracer = new TotalSizeTracer(config);
        initBuffer();
        initWriteFunction();
    }

    /**
     * Flushes every record that is still buffered and reports the result as the last
     * batch of the current instant, with {@code endInput} set to {@code false}.
     *
     * <p>NOTE(review): presumably invoked by the parent class when a checkpoint snapshot
     * is taken; confirm against {@code AbstractStreamWriteFunction}.
     */
    @Override
    public void snapshotState() {
        this.flushRemaining(false);
    }

    /**
     * Buffers one incoming element.
     *
     * @param value the incoming element; it is cast to {@link HoodieRecord}, so any other
     *              runtime type fails with a {@link ClassCastException}
     * @param ctx   not used
     * @param out   not used; this method emits nothing through the collector
     * @throws Exception declared by the signature
     */
    public void processElement(I value, ProcessFunction.Context ctx, Collector<Object> out) throws Exception {
        HoodieRecord<?> record = (HoodieRecord<?>) value;
        bufferRecord(record);
    }

    /**
     * Releases the write client, if one was created.
     *
     * <p>The client is closed even when cleaning its handles fails, so a failure in
     * {@code cleanHandlesGracefully()} can no longer leave the client open. The original
     * exception still propagates to the caller (unless {@code close()} itself throws).
     */
    public void close() {
        if (this.writeClient == null) {
            // Nothing to release.
            return;
        }
        try {
            this.writeClient.cleanHandlesGracefully();
        } finally {
            this.writeClient.close();
        }
    }

    /**
     * Handles the end of the input: flushes all buffered records as the last batch with
     * {@code endInput = true}, then cleans the write client's handles and drops the
     * write statuses collected so far.
     */
    @Override
    public void endInput() {
        flushRemaining(true);
        // flushRemaining(...) already cleans the handles once; the second call is kept as-is.
        writeClient.cleanHandles();
        writeStatuses.clear();
    }

    /**
     * Returns a snapshot of the buffered records, keyed by bucket ID.
     *
     * <p>Each value is a freshly built list (see {@code DataBucket#writeBuffer()}), so
     * modifying the returned map or its lists does not touch the internal buffer.
     *
     * @return a new map from bucket ID to the records currently buffered in that bucket
     */
    @VisibleForTesting
    public Map<String, List<HoodieRecord>> getDataBuffer() {
        Map<String, List<HoodieRecord>> snapshot = new HashMap<>();
        this.buckets.forEach((bucketId, bucket) -> snapshot.put(bucketId, bucket.writeBuffer()));
        return snapshot;
    }

    /** Creates the empty, insertion-ordered map that holds the data buckets by bucket ID. */
    private void initBuffer() {
        buckets = new LinkedHashMap<>();
    }

    /**
     * Selects the write function for the operation configured under
     * {@code FlinkOptions.OPERATION}.
     *
     * <p>Supported operations are INSERT, UPSERT, INSERT_OVERWRITE and
     * INSERT_OVERWRITE_TABLE; each one delegates to the corresponding write client method.
     * The lambdas read {@code writeClient} when they are invoked, not when they are created.
     *
     * @throws RuntimeException if the configured operation is none of the supported ones
     */
    private void initWriteFunction() {
        final String writeOperation = (String) this.config.get(FlinkOptions.OPERATION);
        final WriteOperationType operationType = WriteOperationType.fromValue(writeOperation);
        switch (operationType) {
            case INSERT:
                this.writeFunction = (batch, instant) -> this.writeClient.insert(batch, instant);
                break;
            case UPSERT:
                this.writeFunction = (batch, instant) -> this.writeClient.upsert(batch, instant);
                break;
            case INSERT_OVERWRITE:
                this.writeFunction = (batch, instant) -> this.writeClient.insertOverwrite(batch, instant);
                break;
            case INSERT_OVERWRITE_TABLE:
                this.writeFunction = (batch, instant) -> this.writeClient.insertOverwriteTable(batch, instant);
                break;
            default:
                throw new RuntimeException("Unsupported write operation : " + writeOperation);
        }
    }

    /**
     * Builds the bucket ID of a record from its partition path and the file ID of its
     * current location.
     *
     * @param record the record to route; its current location must be set, otherwise a
     *               {@link NullPointerException} is thrown
     * @return the key produced by {@code StreamerUtil.generateBucketKey}
     */
    private String getBucketID(HoodieRecord<?> record) {
        return StreamerUtil.generateBucketKey(
                record.getPartitionPath(),
                record.getCurrentLocation().getFileId());
    }

    /**
     * Adds a record to the bucket it belongs to and flushes data when a size threshold
     * is crossed.
     *
     * <p>Two thresholds are checked after the record is buffered:
     * <ul>
     *   <li>the record's own bucket exceeds its batch size: that bucket is flushed;</li>
     *   <li>otherwise, the total buffer exceeds its maximum size: the bucket with the
     *       largest tracked size is flushed (the first such bucket in insertion order
     *       when several are equally large).</li>
     * </ul>
     * When a flush succeeds, the flushed bucket's size is subtracted from the total and
     * the bucket is reset. When a flush is skipped (no instant to write to), the data
     * stays buffered.
     *
     * @param value the record to buffer; its current location must be set
     */
    protected void bufferRecord(HoodieRecord<?> value) {
        final String bucketID = this.getBucketID(value);
        final DataBucket bucket = this.buckets.computeIfAbsent(
                bucketID,
                k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
        final DataItem item = DataItem.fromHoodieRecord(value);
        bucket.records.add(item);

        // Both trackers must be updated for every record, even if only one triggers a flush.
        final boolean bucketFull = bucket.detector.detect(item);
        final boolean bufferFull = this.tracer.trace(bucket.detector.lastRecordSize);

        if (bucketFull) {
            if (this.flushBucket(bucket)) {
                this.tracer.countDown(bucket.detector.totalSize);
                bucket.reset();
            }
            return;
        }
        if (!bufferFull) {
            return;
        }

        // Linear scan for the largest bucket; the strict '>' keeps the first bucket on ties.
        // The map holds at least the bucket used above, so a bucket is always found.
        DataBucket bucketToFlush = null;
        for (DataBucket candidate : this.buckets.values()) {
            if (bucketToFlush == null || candidate.detector.totalSize > bucketToFlush.detector.totalSize) {
                bucketToFlush = candidate;
            }
        }
        if (this.flushBucket(bucketToFlush)) {
            this.tracer.countDown(bucketToFlush.detector.totalSize);
            bucketToFlush.reset();
        } else {
            LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
        }
    }

    /**
     * Tells whether any bucket currently holds at least one buffered record.
     *
     * @return {@code true} if some bucket is non-empty; {@code false} if there are no
     *         buckets or all of them are empty
     */
    private boolean hasData() {
        for (DataBucket bucket : this.buckets.values()) {
            if (!bucket.records.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Writes the records of one bucket and reports the result to the coordinator as a
     * non-final batch ({@code lastBatch = false}, {@code endInput = false}).
     *
     * <p>The caller is responsible for resetting the bucket after a successful flush;
     * this method only works on the copy returned by {@code writeBuffer()}.
     *
     * @param bucket the bucket to flush; it must hold at least one record
     * @return {@code true} if the records were written, {@code false} if the flush was
     *         skipped because {@code instantToWrite(true)} returned no instant
     * @throws IllegalStateException presumably, via {@code ValidationUtils.checkState},
     *         when the bucket is empty; TODO confirm the exact exception type
     */
    private boolean flushBucket(DataBucket bucket) {
        final String instant = instantToWrite(true);
        if (instant == null) {
            LOG.info("No inflight instant when flushing data, skip.");
            return false;
        }

        List<HoodieRecord> records = bucket.writeBuffer();
        ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
        final boolean preCombine = this.config.getBoolean(FlinkOptions.PRE_COMBINE);
        if (preCombine) {
            records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
        }
        // The first record must carry the bucket's file ID before the batch is written.
        bucket.preWrite(records);

        final List<WriteStatus> writeStatus = new ArrayList<>(this.writeFunction.apply(records, instant));
        records.clear();

        final WriteMetadataEvent event = WriteMetadataEvent.builder()
                .taskID(this.taskID)
                .instantTime(instant)
                .writeStatus(writeStatus)
                .lastBatch(false)
                .endInput(false)
                .build();
        this.eventGateway.sendEventToCoordinator((OperatorEvent) event);
        this.writeStatuses.addAll(writeStatus);
        return true;
    }

    /**
     * Writes every non-empty bucket, reports the combined result to the coordinator as
     * the last batch of the instant, and clears all buffering state.
     *
     * <p>An event is sent even when nothing was buffered; in that case it carries an
     * empty list of write statuses.
     *
     * @param endInput forwarded to the event's {@code endInput} flag
     * @throws HoodieException if {@code instantToWrite(...)} returns no instant
     */
    private void flushRemaining(boolean endInput) {
        this.currentInstant = this.instantToWrite(this.hasData());
        if (this.currentInstant == null) {
            throw new HoodieException("No inflight instant when flushing data!");
        }

        // Declared as List: one branch assigns an ArrayList, the other Collections.emptyList().
        final List<WriteStatus> writeStatus;
        if (this.buckets.size() > 0) {
            writeStatus = new ArrayList<>();
            // Each batch handed to the write function holds the records of exactly one bucket.
            for (DataBucket bucket : this.buckets.values()) {
                List<HoodieRecord> records = bucket.writeBuffer();
                if (records.isEmpty()) {
                    continue;
                }
                if (this.config.getBoolean(FlinkOptions.PRE_COMBINE)) {
                    records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
                }
                bucket.preWrite(records);
                writeStatus.addAll(this.writeFunction.apply(records, this.currentInstant));
                records.clear();
                bucket.reset();
            }
        } else {
            LOG.info("No data to write in subtask [{}] for instant [{}]", this.taskID, this.currentInstant);
            writeStatus = Collections.emptyList();
        }

        final WriteMetadataEvent event = WriteMetadataEvent.builder()
                .taskID(this.taskID)
                .instantTime(this.currentInstant)
                .writeStatus(writeStatus)
                .lastBatch(true)
                .endInput(endInput)
                .build();
        this.eventGateway.sendEventToCoordinator((OperatorEvent) event);

        this.buckets.clear();
        this.tracer.reset();
        this.writeClient.cleanHandles();
        this.writeStatuses.addAll(writeStatus);
        // NOTE(review): presumably holds back further flushes until the coordinator
        // confirms a new instant; the flag is consumed outside this file, so confirm there.
        this.confirming = true;
    }

    /**
     * Tracks the accumulated size of all buffered records against a fixed maximum.
     *
     * <p>The maximum is what remains of {@code FlinkOptions.WRITE_TASK_MAX_SIZE} after
     * subtracting {@code FlinkOptions.WRITE_MERGE_MAX_MEMORY} and a constant reserved
     * for the merge reader, scaled by 1024 * 1024.
     */
    private static class TotalSizeTracer {
        /** Constant reserved for the merge reader (100MB according to the validation message). */
        private static final long MERGE_READER_MEMORY_MB = 100L;

        /** Accumulated size of the records traced so far. */
        private long bufferSize = 0L;
        /** Upper bound for {@link #bufferSize}; exceeding it signals that a flush is needed. */
        private final double maxBufferSize;

        /**
         * Computes the maximum buffer size from the configuration.
         *
         * @param conf configuration providing the task max size and the merge max memory
         * @throws IllegalStateException presumably, via {@code ValidationUtils.checkState},
         *         when the resulting maximum is not positive; TODO confirm the exception type
         */
        TotalSizeTracer(Configuration conf) {
            final long mergeMapMaxMem = conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
            final double taskMaxSize = conf.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE);
            final double availableSize = taskMaxSize - MERGE_READER_MEMORY_MB - mergeMapMaxMem;
            this.maxBufferSize = availableSize * 1024.0 * 1024.0;

            final String errMsg = String.format(
                    "'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)",
                    FlinkOptions.WRITE_TASK_MAX_SIZE.key(),
                    FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
            ValidationUtils.checkState(this.maxBufferSize > 0.0, errMsg);
        }

        /**
         * Adds a record size to the running total.
         *
         * @param recordSize the size to add
         * @return {@code true} if the total now exceeds the maximum buffer size
         */
        boolean trace(long recordSize) {
            bufferSize += recordSize;
            return bufferSize > maxBufferSize;
        }

        /**
         * Subtracts a size from the running total, e.g. after a bucket was flushed.
         *
         * @param size the size to subtract
         */
        void countDown(long size) {
            bufferSize -= size;
        }

        /** Sets the running total back to zero. */
        public void reset() {
            bufferSize = 0L;
        }
    }

    /**
     * Estimates the size of one bucket's buffer by sampling record sizes.
     *
     * <p>Measuring every record is avoided: the object size is computed for the first
     * record after a reset and afterwards only when {@link #sampling()} fires; all other
     * records are counted with the most recently measured size.
     */
    private static class BufferSizeDetector {
        /** Fixed seed, so the sampling sequence is the same for every detector instance. */
        private final Random random = new Random(47L);
        /** A record is measured with probability {@code 1 / DENOMINATOR}. */
        private static final int DENOMINATOR = 100;
        /** Batch size threshold: the configured value scaled by 1024 * 1024. */
        private final double batchSizeBytes;
        /** Size of the most recently measured record; {@code -1} until one was measured. */
        private long lastRecordSize = -1L;
        /** Estimated total size of the records seen since the last reset. */
        private long totalSize = 0L;

        /**
         * @param batchSizeMb the batch size threshold; it is multiplied by 1024 * 1024
         */
        BufferSizeDetector(double batchSizeMb) {
            this.batchSizeBytes = batchSizeMb * 1024.0 * 1024.0;
        }

        /**
         * Accounts for one more buffered record.
         *
         * @param record the record just added; only measured when sampling selects it or
         *               when no size has been measured yet
         * @return {@code true} if the estimated total now exceeds the batch size threshold
         */
        boolean detect(Object record) {
            if (this.lastRecordSize == -1L || this.sampling()) {
                this.lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
            }
            this.totalSize += this.lastRecordSize;
            return this.totalSize > this.batchSizeBytes;
        }

        /**
         * Decides whether the current record should be measured.
         *
         * @return {@code true} for one out of {@link #DENOMINATOR} possible random outcomes
         */
        boolean sampling() {
            // Uses the named constant instead of repeating the literal 100.
            return this.random.nextInt(DENOMINATOR) == 1;
        }

        /** Forgets the last measured size and the accumulated total. */
        void reset() {
            this.lastRecordSize = -1L;
            this.totalSize = 0L;
        }
    }

    /**
     * Buffer for the records of one bucket, i.e. one partition path / file ID pair.
     *
     * <p>Records are stored as compact {@link DataItem}s and turned back into
     * {@link HoodieRecord}s on demand.
     */
    private static class DataBucket {
        /** The buffered items, in arrival order. */
        private final List<DataItem> records = new ArrayList<>();
        /** Size estimator for this bucket. */
        private final BufferSizeDetector detector;
        /** Partition path shared by every record in this bucket. */
        private final String partitionPath;
        /** File ID shared by every record in this bucket. */
        private final String fileID;

        /**
         * @param batchSize    batch size threshold handed to the size detector
         * @param hoodieRecord the first record of the bucket; supplies the partition path
         *                     and, through its current location, the file ID
         */
        private DataBucket(Double batchSize, HoodieRecord<?> hoodieRecord) {
            this.detector = new BufferSizeDetector(batchSize);
            this.partitionPath = hoodieRecord.getPartitionPath();
            this.fileID = hoodieRecord.getCurrentLocation().getFileId();
        }

        /**
         * Rebuilds the buffered items as records of this bucket's partition.
         *
         * @return a new list, independent of the internal buffer; the records' locations
         *         carry no file ID (see {@link #preWrite(List)})
         */
        public List<HoodieRecord> writeBuffer() {
            final List<HoodieRecord> rebuilt = new ArrayList<>(this.records.size());
            for (DataItem item : this.records) {
                rebuilt.add(item.toHoodieRecord(this.partitionPath));
            }
            return rebuilt;
        }

        /**
         * Replaces the first record of the batch with a copy whose current location
         * carries this bucket's file ID. Only the first record is rewritten.
         *
         * @param records the batch about to be written; must not be empty
         */
        public void preWrite(List<HoodieRecord> records) {
            final HoodieRecord<?> head = records.get(0);
            final HoodieRecord<?> relocated =
                    new HoodieAvroRecord<>(head.getKey(), (HoodieRecordPayload) head.getData(), head.getOperation());
            final String instantTime = head.getCurrentLocation().getInstantTime();
            relocated.setCurrentLocation(new HoodieRecordLocation(instantTime, this.fileID));
            records.set(0, relocated);
        }

        /** Drops all buffered items and resets the size estimate. */
        public void reset() {
            this.records.clear();
            this.detector.reset();
        }
    }

    /**
     * Compact form of a buffered record: record key, instant time of its location,
     * payload and operation. Partition path and file ID are not stored here; they are
     * kept once per bucket.
     */
    private static class DataItem {
        /** Record key of the original record. */
        private final String key;
        /** Instant time taken from the original record's current location. */
        private final String instant;
        /** Payload of the original record. */
        private final HoodieRecordPayload<?> data;
        /** Operation of the original record. */
        private final HoodieOperation operation;

        private DataItem(String key, String instant, HoodieRecordPayload<?> data, HoodieOperation operation) {
            this.key = key;
            this.instant = instant;
            this.data = data;
            this.operation = operation;
        }

        /**
         * Extracts the compact form of a record.
         *
         * @param record the record to convert; it is cast to {@code HoodieAvroRecord} and
         *               its current location must be set
         * @return the item holding the record's key, instant time, payload and operation
         */
        public static DataItem fromHoodieRecord(HoodieRecord<?> record) {
            final String recordKey = record.getRecordKey();
            final String instantTime = record.getCurrentLocation().getInstantTime();
            final HoodieRecordPayload<?> payload = ((HoodieAvroRecord) record).getData();
            return new DataItem(recordKey, instantTime, payload, record.getOperation());
        }

        /**
         * Rebuilds a record from this item.
         *
         * @param partitionPath the partition path to put into the record's key
         * @return a new record whose current location has this item's instant time and a
         *         {@code null} file ID
         */
        public HoodieRecord<?> toHoodieRecord(String partitionPath) {
            final HoodieRecord<?> restored =
                    new HoodieAvroRecord<>(new HoodieKey(this.key, partitionPath), this.data, this.operation);
            restored.setCurrentLocation(new HoodieRecordLocation(this.instant, null));
            return restored;
        }
    }
}

