package org.apache.hudi.sink.common;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/common/AbstractStreamWriteFunction.class */
public abstract class AbstractStreamWriteFunction<I> extends AbstractWriteFunction<I> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamWriteFunction.class);
    protected final Configuration config;
    protected int taskID;
    protected transient HoodieTableMetaClient metaClient;
    protected transient HoodieFlinkWriteClient writeClient;
    protected volatile String currentInstant;
    protected transient OperatorEventGateway eventGateway;
    protected volatile boolean confirming = false;
    private transient ListState<WriteMetadataEvent> writeMetadataState;
    protected List<WriteStatus> writeStatuses;

    public AbstractStreamWriteFunction(Configuration configuration) {
        this.config = configuration;
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.taskID = getRuntimeContext().getIndexOfThisSubtask();
        this.metaClient = StreamerUtil.createMetaClient(this.config);
        this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
        this.writeStatuses = new ArrayList();
        this.writeMetadataState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("write-metadata-state", TypeInformation.of(WriteMetadataEvent.class)));
        this.currentInstant = lastPendingInstant();
        if (functionInitializationContext.isRestored()) {
            restoreWriteMetadata();
        } else {
            sendBootstrapEvent();
        }
        this.confirming = true;
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        snapshotState();
        reloadWriteMetaState();
    }

    public abstract void snapshotState();

    @VisibleForTesting
    public boolean isConfirming() {
        return this.confirming;
    }

    @Override // org.apache.hudi.sink.common.AbstractWriteFunction
    public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
        this.eventGateway = operatorEventGateway;
    }

    private void restoreWriteMetadata() throws Exception {
        String lastPendingInstant = lastPendingInstant();
        boolean z = false;
        for (WriteMetadataEvent writeMetadataEvent : this.writeMetadataState.get()) {
            if (Objects.equals(lastPendingInstant, writeMetadataEvent.getInstantTime())) {
                this.eventGateway.sendEventToCoordinator(writeMetadataEvent);
                LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", Integer.valueOf(this.taskID));
                z = true;
            }
        }
        if (z) {
            return;
        }
        sendBootstrapEvent();
    }

    private void sendBootstrapEvent() {
        this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(this.taskID));
        LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", Integer.valueOf(this.taskID));
    }

    private void reloadWriteMetaState() throws Exception {
        this.writeMetadataState.clear();
        this.writeMetadataState.add(WriteMetadataEvent.builder().taskID(this.taskID).instantTime(this.currentInstant).writeStatus(new ArrayList(this.writeStatuses)).bootstrap(true).build());
        this.writeStatuses.clear();
    }

    @Override // org.apache.hudi.sink.common.AbstractWriteFunction
    public void handleOperatorEvent(OperatorEvent operatorEvent) {
        ValidationUtils.checkArgument(operatorEvent instanceof CommitAckEvent, "The write function can only handle CommitAckEvent");
        this.confirming = false;
    }

    protected String lastPendingInstant() {
        return StreamerUtil.getLastPendingInstant(this.metaClient);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String instantToWrite(boolean z) {
        String lastPendingInstant = lastPendingInstant();
        TimeWait build = TimeWait.builder().timeout(this.config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)).action("instant initialize").build();
        while (this.confirming) {
            if (lastPendingInstant == null || (lastPendingInstant.equals(this.currentInstant) && z)) {
                build.waitFor();
                lastPendingInstant = lastPendingInstant();
            } else {
                this.confirming = false;
            }
        }
        return lastPendingInstant;
    }
}
