/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.common;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractStreamWriteFunction<I>
extends AbstractWriteFunction<I>
implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamWriteFunction.class);
    protected final Configuration config;
    protected int taskID;
    protected transient HoodieTableMetaClient metaClient;
    protected transient HoodieFlinkWriteClient writeClient;
    protected volatile String currentInstant;
    protected transient OperatorEventGateway eventGateway;
    protected volatile boolean confirming = false;
    private transient ListState<WriteMetadataEvent> writeMetadataState;
    protected List<WriteStatus> writeStatuses;
    private transient CkpMetadata ckpMetadata;
    private transient boolean inputEnded;

    public AbstractStreamWriteFunction(Configuration config) {
        this.config = config;
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.taskID = this.getRuntimeContext().getIndexOfThisSubtask();
        this.metaClient = StreamerUtil.createMetaClient(this.config);
        this.writeClient = FlinkWriteClients.createWriteClient(this.config, this.getRuntimeContext());
        this.writeStatuses = new ArrayList<WriteStatus>();
        this.writeMetadataState = context.getOperatorStateStore().getListState(new ListStateDescriptor("write-metadata-state", TypeInformation.of(WriteMetadataEvent.class)));
        this.ckpMetadata = CkpMetadataFactory.getCkpMetadata(this.writeClient.getConfig(), this.config);
        this.currentInstant = this.lastPendingInstant();
        if (context.isRestored()) {
            this.restoreWriteMetadata();
        } else {
            this.sendBootstrapEvent();
        }
        this.confirming = true;
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (this.inputEnded) {
            return;
        }
        this.snapshotState();
        this.reloadWriteMetaState();
    }

    public abstract void snapshotState();

    @Override
    public void endInput() {
        this.inputEnded = true;
    }

    @VisibleForTesting
    public boolean isConfirming() {
        return this.confirming;
    }

    @Override
    public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
        this.eventGateway = operatorEventGateway;
    }

    private void restoreWriteMetadata() throws Exception {
        boolean eventSent = false;
        HoodieTimeline pendingTimeline = this.metaClient.getActiveTimeline().filterPendingExcludingCompaction();
        for (WriteMetadataEvent event : (Iterable)this.writeMetadataState.get()) {
            if (!pendingTimeline.containsInstant(event.getInstantTime())) continue;
            event.setTaskID(this.taskID);
            this.eventGateway.sendEventToCoordinator((OperatorEvent)event);
            LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", (Object)this.taskID);
            eventSent = true;
        }
        if (!eventSent) {
            this.sendBootstrapEvent();
        }
    }

    protected void sendBootstrapEvent() {
        this.eventGateway.sendEventToCoordinator((OperatorEvent)WriteMetadataEvent.emptyBootstrap(this.taskID));
        LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", (Object)this.taskID);
    }

    private void reloadWriteMetaState() throws Exception {
        this.writeMetadataState.clear();
        WriteMetadataEvent event = WriteMetadataEvent.builder().taskID(this.taskID).instantTime(this.currentInstant).writeStatus(new ArrayList<WriteStatus>(this.writeStatuses)).bootstrap(true).build();
        this.writeMetadataState.add((Object)event);
        this.writeStatuses.clear();
    }

    @Override
    public void handleOperatorEvent(OperatorEvent event) {
        ValidationUtils.checkArgument((boolean)(event instanceof CommitAckEvent), (String)"The write function can only handle CommitAckEvent");
        this.confirming = false;
    }

    protected String lastPendingInstant() {
        return this.ckpMetadata.lastPendingInstant();
    }

    protected String instantToWrite(boolean hasData) {
        String instant = this.lastPendingInstant();
        TimeWait timeWait = TimeWait.builder().timeout(this.config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)).action("instant initialize").build();
        while (this.confirming) {
            if (instant == null || this.invalidInstant(instant, hasData)) {
                timeWait.waitFor();
                instant = this.lastPendingInstant();
                continue;
            }
            this.confirming = false;
        }
        return instant;
    }

    private boolean invalidInstant(String instant, boolean hasData) {
        return instant.equals(this.currentInstant) && hasData;
    }
}

