package org.apache.hudi.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.hudi.adapter.OperatorCoordinatorAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.ClientIds;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/StreamWriteOperatorCoordinator.class */
public class StreamWriteOperatorCoordinator implements OperatorCoordinatorAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(StreamWriteOperatorCoordinator.class);
    private final Configuration conf;
    private final SerializableConfiguration hiveConf;
    private final OperatorCoordinator.Context context;
    private transient OperatorCoordinator.SubtaskGateway[] gateways;
    private transient HoodieFlinkWriteClient writeClient;
    private transient HoodieTableMetaClient metaClient;
    private volatile String instant = "";
    private transient WriteMetadataEvent[] eventBuffer;
    private final int parallelism;
    private NonThrownExecutor executor;
    private NonThrownExecutor hiveSyncExecutor;
    private HiveSyncContext hiveSyncContext;
    private transient TableState tableState;
    private CkpMetadata ckpMetadata;
    private ClientIds clientIds;

    /* loaded from: input_file:org/apache/hudi/sink/StreamWriteOperatorCoordinator$Provider.class */
    public static class Provider implements OperatorCoordinator.Provider {
        private final OperatorID operatorId;
        private final Configuration conf;

        public Provider(OperatorID operatorID, Configuration configuration) {
            this.operatorId = operatorID;
            this.conf = configuration;
        }

        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        public OperatorCoordinator create(OperatorCoordinator.Context context) {
            return new StreamWriteOperatorCoordinator(this.conf, context);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/sink/StreamWriteOperatorCoordinator$TableState.class */
    public static class TableState implements Serializable {
        private static final long serialVersionUID = 1;
        final WriteOperationType operationType;
        final String commitAction;
        final boolean isOverwrite;
        final boolean scheduleCompaction;
        final boolean scheduleClustering;
        final boolean syncHive;
        final boolean syncMetadata;
        final boolean isDeltaTimeCompaction;

        private TableState(Configuration configuration) {
            this.operationType = WriteOperationType.fromValue(configuration.getString(FlinkOptions.OPERATION));
            this.commitAction = CommitUtils.getCommitActionType(this.operationType, HoodieTableType.valueOf(configuration.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
            this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
            this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(configuration);
            this.scheduleClustering = OptionsResolver.needsScheduleClustering(configuration);
            this.syncHive = configuration.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
            this.syncMetadata = configuration.getBoolean(FlinkOptions.METADATA_ENABLED);
            this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(configuration);
        }

        public static TableState create(Configuration configuration) {
            return new TableState(configuration);
        }
    }

    public StreamWriteOperatorCoordinator(Configuration configuration, OperatorCoordinator.Context context) {
        this.conf = configuration;
        this.context = context;
        this.parallelism = context.currentParallelism();
        this.hiveConf = new SerializableConfiguration(HadoopConfigurations.getHiveConf(configuration));
    }

    public void start() throws Exception {
        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        reset();
        this.gateways = new OperatorCoordinator.SubtaskGateway[this.parallelism];
        this.metaClient = StreamerUtil.initTableIfNotExists(this.conf);
        this.writeClient = FlinkWriteClients.createWriteClient(this.conf);
        this.ckpMetadata = initCkpMetadata(this.writeClient.getConfig(), this.conf);
        initMetadataTable(this.writeClient);
        this.tableState = TableState.create(this.conf);
        this.executor = NonThrownExecutor.builder(LOG).exceptionHook((str, th) -> {
            this.context.failJob(new HoodieException(str, th));
        }).waitForTasksFinish(true).build();
        if (this.tableState.syncHive) {
            initHiveSync();
        }
        if (OptionsResolver.isMultiWriter(this.conf)) {
            initClientIds(this.conf);
        }
    }

    public void close() throws Exception {
        if (this.executor != null) {
            this.executor.close();
        }
        if (this.hiveSyncExecutor != null) {
            this.hiveSyncExecutor.close();
        }
        if (this.writeClient != null) {
            this.writeClient.close();
        }
        this.eventBuffer = null;
        if (this.ckpMetadata != null) {
            this.ckpMetadata.close();
        }
        if (this.clientIds != null) {
            this.clientIds.close();
        }
    }

    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) {
        this.executor.execute(() -> {
            try {
                completableFuture.complete(new byte[0]);
            } catch (Throwable th) {
                completableFuture.completeExceptionally(new CompletionException(String.format("Failed to checkpoint Instant %s for source %s", this.instant, getClass().getSimpleName()), th));
            }
        }, "taking checkpoint %d", Long.valueOf(j));
    }

    public void notifyCheckpointComplete(long j) {
        this.executor.execute(() -> {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            boolean commitInstant = commitInstant(this.instant, j);
            scheduleTableServices(Boolean.valueOf(commitInstant));
            if (commitInstant) {
                startInstant();
                syncHiveAsync();
            }
        }, "commits the instant %s", this.instant);
    }

    public void resetToCheckpoint(long j, byte[] bArr) {
    }

    public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
        ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent, "The coordinator can only handle WriteMetaEvent");
        WriteMetadataEvent writeMetadataEvent = (WriteMetadataEvent) operatorEvent;
        if (writeMetadataEvent.isEndInput()) {
            this.executor.executeSync(() -> {
                handleEndInputEvent(writeMetadataEvent);
            }, "handle end input event for instant %s", this.instant);
        } else {
            this.executor.execute(() -> {
                if (writeMetadataEvent.isBootstrap()) {
                    handleBootstrapEvent(writeMetadataEvent);
                } else {
                    handleWriteMetaEvent(writeMetadataEvent);
                }
            }, "handle write metadata event for instant %s", this.instant);
        }
    }

    public void subtaskFailed(int i, @Nullable Throwable th) {
        this.eventBuffer[i] = null;
        LOG.warn("Reset the event for task [" + i + "]", th);
    }

    public void subtaskReset(int i, long j) {
    }

    public void subtaskReady(int i, OperatorCoordinator.SubtaskGateway subtaskGateway) {
        this.gateways[i] = subtaskGateway;
    }

    private void initHiveSync() {
        this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
        this.hiveSyncContext = HiveSyncContext.create(this.conf, this.hiveConf);
    }

    private void syncHiveAsync() {
        if (this.tableState.syncHive) {
            this.hiveSyncExecutor.execute(this::doSyncHive, "sync hive metadata for instant %s", this.instant);
        }
    }

    private void syncHive() {
        if (this.tableState.syncHive) {
            doSyncHive();
            LOG.info("Sync hive metadata for instant {} success!", this.instant);
        }
    }

    public void doSyncHive() {
        HiveSyncTool hiveSyncTool = this.hiveSyncContext.hiveSyncTool();
        Throwable th = null;
        try {
            hiveSyncTool.syncHoodieTable();
            if (hiveSyncTool != null) {
                if (0 == 0) {
                    hiveSyncTool.close();
                    return;
                }
                try {
                    hiveSyncTool.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (hiveSyncTool != null) {
                if (0 != 0) {
                    try {
                        hiveSyncTool.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    hiveSyncTool.close();
                }
            }
            throw th3;
        }
    }

    private static void initMetadataTable(HoodieFlinkWriteClient<?> hoodieFlinkWriteClient) {
        hoodieFlinkWriteClient.initMetadataTable();
    }

    private static CkpMetadata initCkpMetadata(HoodieWriteConfig hoodieWriteConfig, Configuration configuration) throws IOException {
        CkpMetadata ckpMetadata = CkpMetadataFactory.getCkpMetadata(hoodieWriteConfig, configuration);
        ckpMetadata.bootstrap();
        return ckpMetadata;
    }

    private void initClientIds(Configuration configuration) {
        this.clientIds = ClientIds.builder().conf(configuration).build();
        this.clientIds.start();
    }

    private void reset() {
        this.eventBuffer = new WriteMetadataEvent[this.parallelism];
    }

    private boolean allEventsReceived() {
        return Arrays.stream(this.eventBuffer).allMatch(writeMetadataEvent -> {
            return writeMetadataEvent != null && writeMetadataEvent.isLastBatch();
        });
    }

    private void addEventToBuffer(WriteMetadataEvent writeMetadataEvent) {
        if (this.eventBuffer[writeMetadataEvent.getTaskID()] != null) {
            this.eventBuffer[writeMetadataEvent.getTaskID()].mergeWith(writeMetadataEvent);
        } else {
            this.eventBuffer[writeMetadataEvent.getTaskID()] = writeMetadataEvent;
        }
    }

    private void startInstant() {
        this.writeClient.preTxn(this.tableState.operationType, this.metaClient);
        this.instant = this.writeClient.startCommit(this.tableState.commitAction, this.metaClient);
        this.metaClient.getActiveTimeline().transitionRequestedToInflight(this.tableState.commitAction, this.instant);
        this.writeClient.setWriteTimer(this.tableState.commitAction);
        this.ckpMetadata.startInstant(this.instant);
        LOG.info("Create instant [{}] for table [{}] with type [{}]", new Object[]{this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), this.conf.getString(FlinkOptions.TABLE_TYPE)});
    }

    private void initInstant(String str) {
        HoodieTimeline filterCompletedInstants = this.metaClient.getActiveTimeline().filterCompletedInstants();
        if (str.equals("") || filterCompletedInstants.containsInstant(str)) {
            reset();
        } else {
            LOG.info("Recommit instant {}", str);
            if (this.writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
                this.writeClient.getHeartbeatClient().start(str);
            }
            commitInstant(str);
        }
        startInstant();
        this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
    }

    private void handleBootstrapEvent(WriteMetadataEvent writeMetadataEvent) {
        this.eventBuffer[writeMetadataEvent.getTaskID()] = writeMetadataEvent;
        if (Arrays.stream(this.eventBuffer).allMatch(writeMetadataEvent2 -> {
            return writeMetadataEvent2 != null && writeMetadataEvent2.isBootstrap();
        })) {
            initInstant((String) Arrays.stream(this.eventBuffer).filter(writeMetadataEvent3 -> {
                return writeMetadataEvent3.getWriteStatuses().size() > 0;
            }).findFirst().map((v0) -> {
                return v0.getInstantTime();
            }).orElse(""));
        }
    }

    private void handleEndInputEvent(WriteMetadataEvent writeMetadataEvent) {
        addEventToBuffer(writeMetadataEvent);
        if (allEventsReceived() && commitInstant(this.instant)) {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            syncHive();
            scheduleTableServices(true);
        }
    }

    private void scheduleTableServices(Boolean bool) {
        if (this.tableState.scheduleCompaction) {
            CompactionUtil.scheduleCompaction(this.writeClient, this.tableState.isDeltaTimeCompaction, bool.booleanValue());
        }
        if (this.tableState.scheduleClustering) {
            ClusteringUtil.scheduleClustering(this.conf, this.writeClient, bool.booleanValue());
        }
    }

    private void handleWriteMetaEvent(WriteMetadataEvent writeMetadataEvent) {
        ValidationUtils.checkState(HoodieTimeline.compareTimestamps(this.instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, writeMetadataEvent.getInstantTime()), String.format("Receive an unexpected event for instant %s from task %d", writeMetadataEvent.getInstantTime(), Integer.valueOf(writeMetadataEvent.getTaskID())));
        addEventToBuffer(writeMetadataEvent);
    }

    private void sendCommitAckEvents(long j) {
        CompletableFuture.allOf((CompletableFuture[]) Arrays.stream(this.gateways).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map(subtaskGateway -> {
            return subtaskGateway.sendEvent(CommitAckEvent.getInstance(j));
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).whenComplete((r5, th) -> {
            if (!sendToFinishedTasks(th)) {
                throw new HoodieException("Error while waiting for the commit ack events to finish sending", th);
            }
        });
    }

    private static boolean sendToFinishedTasks(Throwable th) {
        return (th.getCause() instanceof TaskNotRunningException) || th.getCause().getMessage().contains("running");
    }

    private boolean commitInstant(String str) {
        return commitInstant(str, -1L);
    }

    private boolean commitInstant(String str, long j) {
        if (Arrays.stream(this.eventBuffer).allMatch((v0) -> {
            return Objects.isNull(v0);
        })) {
            return false;
        }
        List<WriteStatus> list = (List) Arrays.stream(this.eventBuffer).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map((v0) -> {
            return v0.getWriteStatuses();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
        if (list.size() != 0 || OptionsResolver.allowCommitOnEmptyBatch(this.conf)) {
            doCommit(str, list);
            return true;
        }
        reset();
        if (j == -1) {
            return false;
        }
        sendCommitAckEvents(j);
        return false;
    }

    private void doCommit(String str, List<WriteStatus> list) {
        long longValue = ((Long) list.stream().map((v0) -> {
            return v0.getTotalErrorRecords();
        }).reduce((v0, v1) -> {
            return Long.sum(v0, v1);
        }).orElse(0L)).longValue();
        long longValue2 = ((Long) list.stream().map((v0) -> {
            return v0.getTotalRecords();
        }).reduce((v0, v1) -> {
            return Long.sum(v0, v1);
        }).orElse(0L)).longValue();
        boolean z = longValue > 0;
        if (z && !this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
            LOG.error("Error when writing. Errors/Total=" + longValue + "/" + longValue2);
            LOG.error("The first 100 error messages");
            list.stream().filter((v0) -> {
                return v0.hasErrors();
            }).limit(100L).forEach(writeStatus -> {
                LOG.error("Global error for partition path {} and fileID {}: {}", new Object[]{writeStatus.getGlobalError(), writeStatus.getPartitionPath(), writeStatus.getFileId()});
                if (writeStatus.getErrors().size() > 0) {
                    writeStatus.getErrors().forEach((hoodieKey, th) -> {
                        LOG.trace("Error for key:" + hoodieKey + " and value " + th);
                    });
                }
            });
            this.writeClient.rollback(str);
            throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", str));
        }
        HashMap hashMap = new HashMap();
        if (z) {
            LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total=" + longValue + "/" + longValue2);
        }
        if (!this.writeClient.commit(str, list, Option.of(hashMap), this.tableState.commitAction, this.tableState.isOverwrite ? this.writeClient.getPartitionToReplacedFileIds(this.tableState.operationType, list) : Collections.emptyMap())) {
            throw new HoodieException(String.format("Commit instant [%s] failed!", str));
        }
        reset();
        this.ckpMetadata.commitInstant(str);
        LOG.info("Commit instant [{}] success!", str);
    }

    @VisibleForTesting
    public WriteMetadataEvent[] getEventBuffer() {
        return this.eventBuffer;
    }

    @VisibleForTesting
    public String getInstant() {
        return this.instant;
    }

    @VisibleForTesting
    public OperatorCoordinator.Context getContext() {
        return this.context;
    }

    @VisibleForTesting
    public HoodieFlinkWriteClient getWriteClient() {
        return this.writeClient;
    }

    @VisibleForTesting
    public void setExecutor(NonThrownExecutor nonThrownExecutor) throws Exception {
        if (this.executor != null) {
            this.executor.close();
        }
        this.executor = nonThrownExecutor;
    }
}
