package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.FlinkLazyInsertIterable;
import org.apache.hudi.io.ExplicitWriteHandleFactory;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.class */
/**
 * Base commit action executor for the Flink engine.
 *
 * <p>Each instance is bound to one explicit {@link HoodieWriteHandle}. A call to
 * {@link #execute(List)} writes one batch of records through that handle and wraps the
 * resulting {@link WriteStatus} objects in a {@link HoodieWriteMetadata}.
 *
 * @param <T> payload type carried by the {@link HoodieRecord}s being written
 */
public abstract class BaseFlinkCommitActionExecutor<T> extends BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseFlinkCommitActionExecutor.class);

    /** Write handle supplied by the caller; its concrete type selects the insert or update path. */
    protected HoodieWriteHandle<?, ?, ?, ?> writeHandle;

    /**
     * Creates an executor that passes {@code Option.empty()} as the trailing optional argument.
     *
     * @param hoodieEngineContext engine context forwarded to the base executor
     * @param hoodieWriteHandle   write handle used for every record of this executor
     * @param hoodieWriteConfig   write configuration forwarded to the base executor
     * @param hoodieTable         target table forwarded to the base executor
     * @param str                 instant time forwarded to the base executor (presumably the commit instant; confirm against the base class)
     * @param writeOperationType  write operation type forwarded to the base executor
     */
    public BaseFlinkCommitActionExecutor(HoodieEngineContext hoodieEngineContext, HoodieWriteHandle<?, ?, ?, ?> hoodieWriteHandle, HoodieWriteConfig hoodieWriteConfig, HoodieTable hoodieTable, String str, WriteOperationType writeOperationType) {
        this(hoodieEngineContext, hoodieWriteHandle, hoodieWriteConfig, hoodieTable, str, writeOperationType, Option.empty());
    }

    /**
     * Creates an executor, forwarding everything except the write handle to the base executor.
     *
     * @param hoodieEngineContext engine context forwarded to the base executor
     * @param hoodieWriteHandle   write handle used for every record of this executor
     * @param hoodieWriteConfig   write configuration forwarded to the base executor
     * @param hoodieTable         target table forwarded to the base executor
     * @param str                 instant time forwarded to the base executor (presumably the commit instant; confirm against the base class)
     * @param writeOperationType  write operation type forwarded to the base executor
     * @param option              optional value forwarded to the base executor (presumably the extra commit metadata; confirm against the base class)
     */
    public BaseFlinkCommitActionExecutor(HoodieEngineContext hoodieEngineContext, HoodieWriteHandle<?, ?, ?, ?> hoodieWriteHandle, HoodieWriteConfig hoodieWriteConfig, HoodieTable hoodieTable, String str, WriteOperationType writeOperationType, Option option) {
        super(hoodieEngineContext, hoodieWriteConfig, hoodieTable, str, writeOperationType, option);
        this.writeHandle = hoodieWriteHandle;
    }

    /**
     * Writes the given records and returns the collected write statuses.
     *
     * <p>The partition path, file id and bucket type are taken from the first record only.
     * A current-location instant time equal to {@code "I"} selects {@link BucketType#INSERT};
     * any other value selects {@link BucketType#UPDATE}.
     *
     * @param list records to write; must contain at least one record whose current location is set
     * @return write metadata holding all write statuses produced for the batch
     * @throws IndexOutOfBoundsException if {@code list} is empty
     */
    @Override
    public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> list) {
        HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
        List<WriteStatus> writeStatuses = new LinkedList<>();

        // The first record supplies the routing information for the whole batch.
        HoodieRecord<T> firstRecord = list.get(0);
        String partitionPath = firstRecord.getPartitionPath();
        String fileId = firstRecord.getCurrentLocation().getFileId();
        BucketType bucketType = firstRecord.getCurrentLocation().getInstantTime().equals("I")
                ? BucketType.INSERT
                : BucketType.UPDATE;

        // Flatten the per-batch status lists into a single list.
        handleUpsertPartition(partitionPath, fileId, bucketType, list.iterator())
                .forEachRemaining(writeStatuses::addAll);

        setUpWriteMetadata(writeStatuses, result);
        return result;
    }

    /**
     * Stores the write statuses on the metadata and sets the index update duration to zero.
     *
     * @param list                write statuses produced by the write
     * @param hoodieWriteMetadata metadata object to populate
     */
    protected void setUpWriteMetadata(List<WriteStatus> list, HoodieWriteMetadata<List<WriteStatus>> hoodieWriteMetadata) {
        hoodieWriteMetadata.setWriteStatuses(list);
        hoodieWriteMetadata.setIndexUpdateDuration(Duration.ZERO);
    }

    /**
     * Returns the commit action type reported by the table's meta client.
     */
    @Override
    protected String getCommitActionType() {
        return this.table.getMetaClient().getCommitActionType();
    }

    /**
     * Commits the write, passing along the write stat extracted from every write status.
     *
     * @param hoodieWriteMetadata metadata whose write statuses supply the stats to commit
     */
    @Override
    protected void commit(HoodieWriteMetadata<List<WriteStatus>> hoodieWriteMetadata) {
        commit(hoodieWriteMetadata, hoodieWriteMetadata.getWriteStatuses().stream()
                .map(WriteStatus::getStat)
                .collect(Collectors.toList()));
    }

    /**
     * Builds the commit metadata from the write stats and stores it on the given write metadata.
     *
     * @param hoodieWriteMetadata metadata to read write statuses from and to store the commit metadata on
     */
    @Override
    protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>> hoodieWriteMetadata) {
        hoodieWriteMetadata.setCommitMetadata(Option.of(CommitUtils.buildMetadata(
                hoodieWriteMetadata.getWriteStatuses().stream()
                        .map(WriteStatus::getStat)
                        .collect(Collectors.toList()),
                hoodieWriteMetadata.getPartitionToReplaceFileIds(),
                this.extraMetadata,
                this.operationType,
                getSchemaToStoreInCommit(),
                getCommitActionType())));
    }

    /**
     * Routes the records to the insert or update path.
     *
     * <p>The concrete type of {@link #writeHandle} takes precedence: a {@link HoodieCreateHandle}
     * always inserts and a {@link HoodieMergeHandle} always updates. Only for other handle types
     * is {@code bucketType} consulted.
     *
     * @param str        partition path of the records
     * @param str2       file id the records are written to
     * @param bucketType bucket type used when the handle type does not decide the path
     * @param it         iterator over the records to write
     * @return iterator over the lists of write statuses produced
     * @throws HoodieUpsertException wrapping any {@link Throwable} raised while dispatching,
     *                               including the {@link AssertionError} for an unexpected bucket type
     */
    protected Iterator<List<WriteStatus>> handleUpsertPartition(String str, String str2, BucketType bucketType, Iterator it) {
        try {
            if (this.writeHandle instanceof HoodieCreateHandle) {
                return handleInsert(str2, it);
            }
            if (this.writeHandle instanceof HoodieMergeHandle) {
                return handleUpdate(str, str2, it);
            }
            switch (bucketType) {
                case INSERT:
                    return handleInsert(str2, it);
                case UPDATE:
                    return handleUpdate(str, str2, it);
                default:
                    throw new AssertionError();
            }
        } catch (Throwable th) {
            String message = "Error upserting bucketType " + bucketType + " for partition :" + str;
            LOG.error(message, th);
            throw new HoodieUpsertException(message, th);
        }
    }

    /**
     * Runs the merge (update) path through the write handle.
     *
     * @param str  partition path of the records; not used by this implementation
     * @param str2 file id being updated
     * @param it   iterator over the records; only checked for emptiness here
     * @return iterator over the write statuses of the merge, or over a single empty list when
     *         {@code it} has no records
     * @throws IOException if the merge fails
     * @throws ClassCastException if {@link #writeHandle} is not a {@link HoodieMergeHandle}
     */
    @Override
    public Iterator<List<WriteStatus>> handleUpdate(String str, String str2, Iterator<HoodieRecord<T>> it) throws IOException {
        if (!it.hasNext()) {
            LOG.info("Empty partition with fileId => {}", str2);
            return emptyWriteStatusIterator();
        }
        return handleUpdateInternal((HoodieMergeHandle<?, ?, ?, ?>) this.writeHandle, str2);
    }

    /**
     * Runs the merge on the table and returns the write statuses held by the merge handle.
     *
     * @param hoodieMergeHandle merge handle to run
     * @param str               file id being merged
     * @return iterator over the write statuses reported by the merge handle
     * @throws IOException if the merge fails
     */
    protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> hoodieMergeHandle, String str) throws IOException {
        this.table.runMerge(hoodieMergeHandle, this.instantTime, str);
        return hoodieMergeHandle.getWriteStatusesAsIterator();
    }

    /**
     * Runs the insert path by wrapping the records in a {@link FlinkLazyInsertIterable} bound
     * to the explicit write handle.
     *
     * @param str id prefix passed to the insert iterable
     * @param it  iterator over the records to insert
     * @return the insert iterable, or an iterator over a single empty list when {@code it} has
     *         no records
     * @throws Exception if the insert cannot be set up
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Iterator<List<WriteStatus>> handleInsert(String str, Iterator<HoodieRecord<T>> it) throws Exception {
        if (!it.hasNext()) {
            LOG.info("Empty partition");
            return emptyWriteStatusIterator();
        }
        // Raw types retained: the generic signatures of these project classes are not visible here.
        return new FlinkLazyInsertIterable(it, true, this.config, this.instantTime, this.table, str, this.taskContextSupplier, new ExplicitWriteHandleFactory(this.writeHandle));
    }

    /** Returns an iterator that yields exactly one empty, immutable write-status list. */
    private static Iterator<List<WriteStatus>> emptyWriteStatusIterator() {
        return Collections.singletonList(Collections.<WriteStatus>emptyList()).iterator();
    }
}
