package org.apache.hudi.sink.partitioner;

import java.util.Objects;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.StreamerUtil;

/* loaded from: input_file:org/apache/hudi/sink/partitioner/BucketAssignFunction.class */
/**
 * Keyed process function that tags every incoming {@link HoodieRecord} with a
 * {@link HoodieRecordLocation} before forwarding it downstream.
 *
 * <p>The last location assigned for the current key is kept in keyed state
 * ({@code indexState}). {@link IndexRecord} inputs only refresh that state and are not
 * forwarded; every other input is expected to be a {@link HoodieRecord} and is routed
 * either to its previously recorded location or to a new one obtained from the
 * {@link BucketAssigner}.
 *
 * @param <K> type of the key the stream is keyed by
 * @param <I> type of the input elements
 * @param <O> type of the emitted records
 */
public class BucketAssignFunction<K, I, O extends HoodieRecord<?>> extends KeyedProcessFunction<K, I, O> implements CheckpointedFunction, CheckpointListener {
    /** Keyed state holding the last known location of the current key. */
    private ValueState<HoodieRecordGlobalLocation> indexState;

    /** Assigns buckets to records; created in {@link #open(Configuration)}. */
    private BucketAssigner bucketAssigner;

    /** Job configuration handed to the constructor. */
    private final Configuration conf;

    /** Result of {@code WriteOperationType.isChangingRecords} for the configured operation. */
    private final boolean isChangingRecords;

    /** Builds the delete payload used when a key moves to another partition. */
    private PayloadCreation payloadCreation;

    /** True when the global index option is enabled and the changelog option is disabled. */
    private final boolean globalIndex;

    /**
     * Creates the function and derives the operation-dependent flags from the configuration.
     *
     * @param configuration job configuration; read for {@code FlinkOptions.OPERATION},
     *                      {@code FlinkOptions.INDEX_GLOBAL_ENABLED} and
     *                      {@code FlinkOptions.CHANGELOG_ENABLED}
     */
    public BucketAssignFunction(Configuration configuration) {
        this.conf = configuration;
        this.isChangingRecords = WriteOperationType.isChangingRecords(WriteOperationType.fromValue(configuration.getString(FlinkOptions.OPERATION)));
        this.globalIndex = configuration.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED) && !configuration.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
    }

    /**
     * Builds the {@link BucketAssigner} for this subtask and the {@link PayloadCreation} helper.
     *
     * @param configuration configuration passed by the runtime; only forwarded to {@code super.open}
     * @throws Exception if the parent initialisation or any of the factories fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
        HoodieFlinkEngineContext engineContext = new HoodieFlinkEngineContext(
                new SerializableConfiguration(StreamerUtil.getHadoopConf()),
                new FlinkTaskContextSupplier(getRuntimeContext()));
        this.bucketAssigner = BucketAssigners.create(
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getMaxNumberOfParallelSubtasks(),
                getRuntimeContext().getNumberOfParallelSubtasks(),
                ignoreSmallFiles(),
                HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)),
                engineContext,
                writeConfig);
        this.payloadCreation = PayloadCreation.instance(this.conf);
    }

    /**
     * Returns the flag passed to {@code BucketAssigners.create}: true when the configured
     * write operation is an overwrite according to {@code WriteOperationType.isOverwrite}.
     */
    private boolean ignoreSmallFiles() {
        return WriteOperationType.isOverwrite(WriteOperationType.fromValue(this.conf.getString(FlinkOptions.OPERATION)));
    }

    /**
     * Resets the bucket assigner whenever a snapshot is taken.
     *
     * @param functionSnapshotContext snapshot context (unused)
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
        this.bucketAssigner.reset();
    }

    /**
     * Registers the keyed {@code indexState}, enabling state TTL when the configured
     * value is positive.
     *
     * @param functionInitializationContext context giving access to the keyed state store
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) {
        ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
                new ValueStateDescriptor<>("indexState", TypeInformation.of(HoodieRecordGlobalLocation.class));
        // The configured TTL is scaled by the number of milliseconds in a day
        // (24 h * 60 min * 60 s * 1000 ms), i.e. the option is treated as a day count.
        double ttlMillis = this.conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24.0d * 60.0d * 60.0d * 1000.0d;
        if (ttlMillis > 0.0d) {
            indexStateDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.milliseconds((long) ttlMillis)).build());
        }
        this.indexState = functionInitializationContext.getKeyedStateStore().getState(indexStateDesc);
    }

    /**
     * Handles one input element: an {@link IndexRecord} only updates the index state,
     * anything else is processed as a {@link HoodieRecord} and forwarded.
     *
     * @param value     the input element
     * @param context   the keyed process context (unused)
     * @param collector collector receiving the tagged records
     * @throws Exception if state access or bucket assignment fails
     */
    @Override
    public void processElement(I value, KeyedProcessFunction<K, I, O>.Context context, Collector<O> collector) throws Exception {
        if (value instanceof IndexRecord) {
            IndexRecord<?> indexRecord = (IndexRecord<?>) value;
            this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
        } else {
            processRecord((HoodieRecord<?>) value, collector);
        }
    }

    /**
     * Decides the location of a data record, refreshes the index state and emits the record.
     *
     * <p>When the operation changes records and the key already has a stored location:
     * <ul>
     *   <li>same partition path: the stored location is reused and reported to the
     *       bucket assigner as an update;</li>
     *   <li>different partition path: with the global index flag set, a delete record
     *       addressed to the old partition is emitted first; in either case the record
     *       then gets a new location in its own partition.</li>
     * </ul>
     * In every other case a new location is requested from the bucket assigner.
     *
     * @param hoodieRecord the record to tag
     * @param collector    collector receiving the (optional) delete record and the tagged record
     * @throws Exception if state access fails
     */
    private void processRecord(HoodieRecord<?> hoodieRecord, Collector<O> collector) throws Exception {
        final HoodieKey hoodieKey = hoodieRecord.getKey();
        final String recordKey = hoodieKey.getRecordKey();
        final String partitionPath = hoodieKey.getPartitionPath();
        final HoodieRecordGlobalLocation oldLocation = this.indexState.value();
        final HoodieRecordLocation location;
        if (!this.isChangingRecords || oldLocation == null) {
            // Non-changing operation or a key never seen before: no stored location is used.
            location = getNewRecordLocation(partitionPath);
        } else if (Objects.equals(oldLocation.getPartitionPath(), partitionPath)) {
            location = oldLocation.toLocal("U");
            this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
        } else {
            if (this.globalIndex) {
                // The key moved to another partition: emit a delete for the old partition
                // before the record itself so the delete is collected first.
                HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(
                        new HoodieKey(recordKey, oldLocation.getPartitionPath()),
                        this.payloadCreation.createDeletePayload((BaseAvroPayload) hoodieRecord.getData()));
                deleteRecord.setCurrentLocation(oldLocation.toLocal("U"));
                deleteRecord.seal();
                emit(collector, deleteRecord);
            }
            location = getNewRecordLocation(partitionPath);
        }
        if (this.isChangingRecords) {
            // Always refresh the stored location for changing operations.
            updateIndexState(partitionPath, location);
        }
        hoodieRecord.setCurrentLocation(location);
        emit(collector, hoodieRecord);
    }

    /**
     * Forwards a record to the collector.
     *
     * <p>The cast is unchecked because {@code O} is only known to be some
     * {@code HoodieRecord<?>}; this is the single place where that assumption is made.
     */
    @SuppressWarnings("unchecked")
    private void emit(Collector<O> collector, HoodieRecord<?> hoodieRecord) {
        collector.collect((O) hoodieRecord);
    }

    /**
     * Asks the bucket assigner for an insert bucket in the given partition and converts
     * the result into a record location.
     *
     * @param partitionPath partition the record belongs to
     * @return a location built from the bucket's file id prefix, with {@code "I"} as first
     *         constructor argument for an INSERT bucket and {@code "U"} for an UPDATE bucket
     * @throws AssertionError if the bucket type is neither INSERT nor UPDATE
     */
    private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
        final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
        switch (bucketInfo.getBucketType()) {
            case INSERT:
                return new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
            case UPDATE:
                return new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
            default:
                throw new AssertionError("Unexpected bucket type: " + bucketInfo.getBucketType());
        }
    }

    /**
     * Stores the given location, combined with its partition path, as the state of the current key.
     *
     * @param partitionPath  partition path of the record
     * @param recordLocation location assigned to the record
     * @throws Exception if the state update fails
     */
    private void updateIndexState(String partitionPath, HoodieRecordLocation recordLocation) throws Exception {
        this.indexState.update(HoodieRecordGlobalLocation.fromLocal(partitionPath, recordLocation));
    }

    /**
     * Reloads the bucket assigner once a checkpoint has completed.
     *
     * @param checkpointId id of the completed checkpoint, forwarded to {@code BucketAssigner.reload}
     */
    @Override // org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointComplete(long checkpointId) {
        this.bucketAssigner.reload(checkpointId);
    }

    /**
     * Closes the bucket assigner.
     *
     * @throws Exception if closing the bucket assigner fails
     */
    @Override
    public void close() throws Exception {
        // The assigner is only created in open(); guard against close() being invoked
        // after a failed or skipped open() so the original failure is not masked by an NPE.
        if (this.bucketAssigner != null) {
            this.bucketAssigner.close();
        }
    }
}
