package org.apache.hudi.sink.bulk;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/bulk/BulkInsertWriteFunction.class */
/**
 * Process function that hands every incoming element to a {@link BulkInsertWriterHelper}
 * and, once the input is exhausted, reports the collected write statuses to the
 * operator coordinator through the {@link OperatorEventGateway}.
 *
 * <p>The event gateway is not created here; it must be injected with
 * {@link #setOperatorEventGateway(OperatorEventGateway)} before {@link #open(Configuration)}
 * runs, because {@code open} sends a bootstrap event through it.
 *
 * @param <I> input element type; elements are cast to {@link RowData} before being written
 * @param <O> output element type; this function never emits to the collector
 */
public class BulkInsertWriteFunction<I, O> extends ProcessFunction<I, O> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class);

    /** Helper that performs the actual writes; created in {@link #open(Configuration)}. */
    private transient BulkInsertWriterHelper writerHelper;

    /** Job configuration supplied at construction time. */
    private final Configuration config;

    /** Row type handed to the writer helper. */
    private final RowType rowType;

    /** Index of this subtask, read from the runtime context in {@link #open(Configuration)}. */
    private int taskID;

    /** Write client created in {@link #open(Configuration)} and released in {@link #close()}. */
    private transient HoodieFlinkWriteClient writeClient;

    /** Last pending instant observed when the function was opened; may be {@code null}. */
    private volatile String initInstant;

    /** Gateway used to send events to the coordinator; injected by the owner of this function. */
    private transient OperatorEventGateway eventGateway;

    /** Commit action type derived from the configured operation and table type. */
    private transient String actionType;

    /**
     * Creates the function.
     *
     * @param configuration configuration used to build the write client and the writer helper
     * @param rowType       row type passed to the writer helper
     */
    public BulkInsertWriteFunction(Configuration configuration, RowType rowType) {
        this.config = configuration;
        this.rowType = rowType;
    }

    /**
     * Initializes the task id, write client and action type, records the currently pending
     * instant, sends the bootstrap event and finally creates the writer helper.
     *
     * <p>The order matters: the pending instant is captured before the bootstrap event is
     * sent, and the writer helper is created afterwards because its construction waits for
     * a pending instant that differs from the captured one.
     *
     * @param configuration unused; the configuration given to the constructor is used instead
     * @throws IOException declared by the signature
     */
    public void open(Configuration configuration) throws IOException {
        this.taskID = getRuntimeContext().getIndexOfThisSubtask();
        this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
        this.actionType = CommitUtils.getCommitActionType(
            WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)),
            HoodieTableType.valueOf(this.config.getString(FlinkOptions.TABLE_TYPE)));
        this.initInstant = this.writeClient.getLastPendingInstant(this.actionType);
        sendBootstrapEvent();
        initWriterHelper();
    }

    /**
     * Writes one element through the writer helper.
     *
     * @param i         element to write; must be a {@link RowData}
     * @param context   unused
     * @param collector unused; nothing is emitted downstream
     * @throws IOException if the writer helper fails to write the row
     */
    public void processElement(I i, ProcessFunction<I, O>.Context context, Collector<O> collector) throws IOException {
        this.writerHelper.write((RowData) i);
    }

    /**
     * Releases the write client if one was created.
     *
     * <p>The client is closed even when cleaning the handles throws, so that a failure in
     * the first step does not leak the client.
     */
    public void close() {
        if (this.writeClient == null) {
            return;
        }
        try {
            this.writeClient.cleanHandlesGracefully();
        } finally {
            this.writeClient.close();
        }
    }

    /**
     * Closes the writer helper, converts its write statuses and sends them to the
     * coordinator as the last batch of this task.
     *
     * @throws HoodieException if closing the helper or collecting its statuses fails with
     *                         an {@link IOException}; the original exception is kept as the cause
     */
    public void endInput() {
        final List<WriteStatus> writeStatuses;
        try {
            this.writerHelper.close();
            writeStatuses = this.writerHelper.getWriteStatuses().stream()
                .map(BulkInsertWriteFunction::toWriteStatus)
                .collect(Collectors.toList());
        } catch (IOException e) {
            // Keep the cause so the underlying I/O failure remains visible in the stack trace.
            throw new HoodieException("Error collect the write status for task [" + this.taskID + "]", e);
        }

        final WriteMetadataEvent event = WriteMetadataEvent.builder()
            .taskID(this.taskID)
            .instantTime(this.writerHelper.getInstantTime())
            .writeStatus(writeStatuses)
            .lastBatch(true)
            .endInput(true)
            .build();
        this.eventGateway.sendEventToCoordinator(event);
    }

    /**
     * Copies stat, file id, global error and the record counters from an internal write
     * status into a new {@link WriteStatus}.
     *
     * @param hoodieInternalWriteStatus status produced by the writer helper
     * @return a new {@link WriteStatus} carrying the copied values
     */
    private static WriteStatus toWriteStatus(HoodieInternalWriteStatus hoodieInternalWriteStatus) {
        WriteStatus writeStatus = new WriteStatus(false, 0.1d);
        writeStatus.setStat(hoodieInternalWriteStatus.getStat());
        writeStatus.setFileId(hoodieInternalWriteStatus.getFileId());
        writeStatus.setGlobalError(hoodieInternalWriteStatus.getGlobalError());
        writeStatus.setTotalRecords(hoodieInternalWriteStatus.getTotalRecords());
        writeStatus.setTotalErrorRecords(hoodieInternalWriteStatus.getTotalErrorRecords());
        return writeStatus;
    }

    /**
     * Injects the gateway used to send events to the coordinator.
     *
     * @param operatorEventGateway gateway to use for all subsequent events
     */
    public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
        this.eventGateway = operatorEventGateway;
    }

    /** Creates the writer helper for the instant returned by {@link #instantToWrite()}. */
    private void initWriterHelper() {
        this.writerHelper = new BulkInsertWriterHelper(
            this.config,
            this.writeClient.getHoodieTable(),
            this.writeClient.getConfig(),
            instantToWrite(),
            this.taskID,
            getRuntimeContext().getNumberOfParallelSubtasks(),
            getRuntimeContext().getAttemptNumber(),
            this.rowType);
    }

    /** Sends an empty bootstrap event for this task to the coordinator and logs it. */
    private void sendBootstrapEvent() {
        final WriteMetadataEvent event = WriteMetadataEvent.builder()
            .taskID(this.taskID)
            .writeStatus(Collections.emptyList())
            .instantTime(WriteMetadataEvent.BOOTSTRAP_INSTANT)
            .bootstrap(true)
            .build();
        this.eventGateway.sendEventToCoordinator(event);
        LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", this.taskID);
    }

    /**
     * Polls the write client until it reports a pending instant that is non-null and
     * differs from the one captured in {@link #open(Configuration)}.
     *
     * <p>Between polls the method waits on a {@link TimeWait} configured with
     * {@link FlinkOptions#WRITE_COMMIT_ACK_TIMEOUT}. NOTE(review): what happens once that
     * timeout is exceeded is decided inside {@code TimeWait.waitFor()} and is not visible here.
     *
     * @return the new pending instant to write to
     */
    private String instantToWrite() {
        final TimeWait timeWait = TimeWait.builder()
            .timeout(this.config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
            .action("instant initialize")
            .build();
        String pendingInstant = this.writeClient.getLastPendingInstant(this.actionType);
        while (pendingInstant == null || pendingInstant.equals(this.initInstant)) {
            timeWait.waitFor();
            pendingInstant = this.writeClient.getLastPendingInstant(this.actionType);
        }
        return pendingInstant;
    }
}
