/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.utils;

import java.util.concurrent.CompletableFuture;
import org.apache.avro.Schema;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.append.AppendWriteFunction;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.ClusteringFunctionWrapper;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.MockStateInitializationContext;
import org.apache.hudi.sink.utils.MockStreamingRuntimeContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.sink.utils.TestFunctionWrapper;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;

public class InsertFunctionWrapper<I>
implements TestFunctionWrapper<I> {
    private final Configuration conf;
    private final RowType rowType;
    private final MockStreamingRuntimeContext runtimeContext;
    private final MockOperatorEventGateway gateway;
    private final MockOperatorCoordinatorContext coordinatorContext;
    private StreamWriteOperatorCoordinator coordinator;
    private final MockStateInitializationContext stateInitializationContext;
    private final boolean asyncClustering;
    private ClusteringFunctionWrapper clusteringFunctionWrapper;
    private AppendWriteFunction<RowData> writeFunction;

    public InsertFunctionWrapper(String tablePath, Configuration conf) throws Exception {
        IOManagerAsync ioManager = new IOManagerAsync();
        MockEnvironment environment = new MockEnvironmentBuilder().setTaskName("mockTask").setManagedMemorySize(131072L).setIOManager((IOManager)ioManager).build();
        this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
        this.gateway = new MockOperatorEventGateway();
        this.conf = conf;
        this.rowType = (RowType)AvroSchemaConverter.convertToDataType((Schema)StreamerUtil.getSourceSchema((Configuration)conf)).getLogicalType();
        this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)this.coordinatorContext);
        this.stateInitializationContext = new MockStateInitializationContext();
        this.asyncClustering = OptionsResolver.needsAsyncClustering((Configuration)conf);
        StreamConfig streamConfig = new StreamConfig(conf);
        streamConfig.setOperatorID(new OperatorID());
        MockStreamTask streamTask = new MockStreamTaskBuilder((Environment)environment).setConfig(new StreamConfig(conf)).setExecutionConfig(new ExecutionConfig().enableObjectReuse()).build();
        this.clusteringFunctionWrapper = new ClusteringFunctionWrapper(this.conf, (StreamTask<?, ?>)streamTask, streamConfig);
    }

    @Override
    public void openFunction() throws Exception {
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)this.coordinatorContext));
        this.setupWriteFunction();
        this.coordinator.handleEventFromOperator(0, this.getNextEvent());
        if (this.asyncClustering) {
            this.clusteringFunctionWrapper.openFunction();
        }
    }

    @Override
    public void invoke(I record) throws Exception {
        this.writeFunction.processElement((Object)((RowData)record), null, null);
    }

    @Override
    public WriteMetadataEvent[] getEventBuffer() {
        return this.coordinator.getEventBuffer();
    }

    @Override
    public OperatorEvent getNextEvent() {
        return this.gateway.getNextEvent();
    }

    @Override
    public void checkpointFunction(long checkpointId) throws Exception {
        this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture());
        this.writeFunction.snapshotState((FunctionSnapshotContext)new MockFunctionSnapshotContext(checkpointId));
        this.stateInitializationContext.checkpointBegin(checkpointId);
    }

    @Override
    public void endInput() {
        this.writeFunction.endInput();
    }

    @Override
    public void checkpointComplete(long checkpointId) {
        this.stateInitializationContext.checkpointSuccess(checkpointId);
        this.coordinator.notifyCheckpointComplete(checkpointId);
        if (this.asyncClustering) {
            try {
                this.clusteringFunctionWrapper.cluster(checkpointId);
            }
            catch (Exception e) {
                throw new HoodieException((Throwable)e);
            }
        }
    }

    @Override
    public void coordinatorFails() throws Exception {
        this.coordinator.close();
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)this.coordinatorContext));
    }

    @Override
    public void restartCoordinator() throws Exception {
        this.coordinator.close();
        this.coordinator = new StreamWriteOperatorCoordinator(this.conf, (OperatorCoordinator.Context)this.coordinatorContext);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)this.coordinatorContext));
    }

    @Override
    public void checkpointFails(long checkpointId) {
        this.coordinator.notifyCheckpointAborted(checkpointId);
    }

    @Override
    public void subTaskFails(int taskID, int attemptNumber) throws Exception {
        this.coordinator.subtaskFailed(taskID, (Throwable)new RuntimeException("Dummy exception"));
        this.runtimeContext.setAttemptNumber(attemptNumber);
        this.setupWriteFunction();
    }

    @Override
    public StreamWriteOperatorCoordinator getCoordinator() {
        return this.coordinator;
    }

    @Override
    public void close() throws Exception {
        this.coordinator.close();
        if (this.clusteringFunctionWrapper != null) {
            this.clusteringFunctionWrapper.close();
        }
    }

    public BulkInsertWriterHelper getWriterHelper() {
        return this.writeFunction.getWriterHelper();
    }

    private void setupWriteFunction() throws Exception {
        this.writeFunction = new AppendWriteFunction(this.conf, this.rowType);
        this.writeFunction.setRuntimeContext((RuntimeContext)this.runtimeContext);
        this.writeFunction.setOperatorEventGateway((OperatorEventGateway)this.gateway);
        this.writeFunction.initializeState((FunctionInitializationContext)this.stateInitializationContext);
        this.writeFunction.open(this.conf);
    }
}

