/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.utils;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.sink.utils.CollectorOutput;
import org.apache.hudi.sink.utils.CompactFunctionWrapper;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.MockStateInitializationContext;
import org.apache.hudi.sink.utils.MockStreamingRuntimeContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.sink.utils.TestFunctionWrapper;
import org.apache.hudi.utils.TestConfigurations;

/**
 * Test harness that wires the stream write pipeline functions together on top of mock
 * Flink runtime objects and drives them through direct method calls.
 *
 * <p>The pipeline assembled by {@link #openFunction()} is
 * {@link RowDataToHoodieFunction} -> {@link BucketAssignFunction} -> {@link StreamWriteFunction},
 * optionally preceded by a {@link BootstrapOperator} (when
 * {@link FlinkOptions#INDEX_BOOTSTRAP_ENABLED} is set) and followed by a
 * {@link CompactFunctionWrapper} (when {@link OptionsResolver#needsAsyncCompaction} is true).
 * A {@link StreamWriteOperatorCoordinator} backed by a {@link MockOperatorCoordinatorContext}
 * plays the coordinator role.
 *
 * @param <I> type of the records passed to {@link #invoke(Object)}; each record is cast to
 *            {@link RowData} before being converted
 */
public class StreamWriteFunctionWrapper<I>
implements TestFunctionWrapper<I> {
    private final Configuration conf;
    private final IOManager ioManager = new IOManagerAsync();
    private final StreamingRuntimeContext runtimeContext;
    private final MockOperatorEventGateway gateway;
    private final MockOperatorCoordinatorContext coordinatorContext;
    private final StreamWriteOperatorCoordinator coordinator;
    private final MockStateInitializationContext stateInitializationContext;
    /** Converts the incoming {@link RowData} into a {@link HoodieRecord}; created in {@link #openFunction()}. */
    private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
    /** Only created in {@link #openFunction()} when index bootstrap is enabled; {@code null} otherwise. */
    private BootstrapOperator<HoodieRecord<?>, HoodieRecord<?>> bootstrapOperator;
    /** Created in {@link #openFunction()}. */
    private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction;
    /** Records every record key that went through the bucket assigner. */
    private final MockBucketAssignFunctionContext bucketAssignFunctionContext;
    /** Re-created by {@link #setupWriteFunction()} on open and after a simulated subtask failure. */
    private StreamWriteFunction<HoodieRecord<?>> writeFunction;
    private CompactFunctionWrapper compactFunctionWrapper;
    private final MockStreamTask streamTask;
    private final StreamConfig streamConfig;
    /** Cached result of {@link OptionsResolver#needsAsyncCompaction} for {@link #conf}. */
    private final boolean asyncCompaction;

    /**
     * Creates a wrapper using {@link TestConfigurations#getDefaultConf(String)} for the given path.
     *
     * @param tablePath path handed to {@code TestConfigurations.getDefaultConf}
     * @throws Exception if building the mock runtime objects fails
     */
    public StreamWriteFunctionWrapper(String tablePath) throws Exception {
        this(tablePath, TestConfigurations.getDefaultConf(tablePath));
    }

    /**
     * Creates a wrapper with an explicit configuration. Only the mock runtime objects, the
     * coordinator and the compaction wrapper are built here; the pipeline functions are
     * created later by {@link #openFunction()}.
     *
     * @param tablePath not read by this constructor; the table location is expected to be
     *                  carried by {@code conf}
     * @param conf      configuration shared by the coordinator and all functions
     * @throws Exception if building the mock runtime objects fails
     */
    public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws Exception {
        MockEnvironment environment = new MockEnvironmentBuilder()
            .setTaskName("mockTask")
            .setManagedMemorySize(131072L)
            .setIOManager(this.ioManager)
            .build();
        this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
        this.gateway = new MockOperatorEventGateway();
        this.conf = conf;
        this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
        this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
        this.stateInitializationContext = new MockStateInitializationContext();
        this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
        this.streamConfig = new StreamConfig(conf);
        this.streamConfig.setOperatorID(new OperatorID());
        this.streamTask = new MockStreamTaskBuilder(environment)
            .setConfig(new StreamConfig(conf))
            .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
            .build();
        this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf, this.streamTask, this.streamConfig);
    }

    /**
     * Starts the coordinator, creates and opens the pipeline functions and, when index
     * bootstrap is enabled, feeds the records emitted by the bootstrap operator through the
     * bucket assigner before the write function is set up.
     *
     * @throws Exception if any function fails to open or initialize
     */
    @Override
    public void openFunction() throws Exception {
        this.coordinator.start();
        this.coordinator.setExecutor(new MockCoordinatorExecutor(this.coordinatorContext));

        this.toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, this.conf);
        this.toHoodieFunction.setRuntimeContext(this.runtimeContext);
        this.toHoodieFunction.open(this.conf);

        this.bucketAssignerFunction = new BucketAssignFunction<>(this.conf);
        this.bucketAssignerFunction.setRuntimeContext(this.runtimeContext);
        this.bucketAssignerFunction.open(this.conf);
        this.bucketAssignerFunction.initializeState(this.stateInitializationContext);

        if (this.conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
            this.bootstrapOperator = new BootstrapOperator<>(this.conf);
            CollectorOutput<HoodieRecord<?>> output = new CollectorOutput<>();
            this.bootstrapOperator.setup(this.streamTask, this.streamConfig, output);
            this.bootstrapOperator.initializeState(this.stateInitializationContext);

            // Replay whatever the bootstrap operator emitted into the bucket assigner.
            ScalaCollector<HoodieRecord<?>> collector = ScalaCollector.getInstance();
            for (HoodieRecord<?> bootstrapRecord : output.getRecords()) {
                this.bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
                this.bucketAssignFunctionContext.setCurrentKey(bootstrapRecord.getRecordKey());
            }
        }

        this.setupWriteFunction();

        if (this.asyncCompaction) {
            this.compactFunctionWrapper.openFunction();
        }
    }

    /**
     * Pushes one record through the pipeline: convert to {@link HoodieRecord}, assign a
     * bucket, then hand the bucket assigner's output to the write function.
     *
     * @param record input record; must be a {@link RowData} instance
     * @throws Exception if any function in the pipeline fails
     */
    @Override
    public void invoke(I record) throws Exception {
        HoodieRecord<?> hoodieRecord = this.toHoodieFunction.map((RowData) record);
        ScalaCollector<HoodieRecord<?>> collector = ScalaCollector.getInstance();
        this.bucketAssignerFunction.processElement(hoodieRecord, null, collector);
        this.bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
        // The collector keeps only the last value emitted by the bucket assigner.
        this.writeFunction.processElement(collector.getVal(), null, null);
    }

    /** Returns the coordinator's event buffer. */
    @Override
    public WriteMetadataEvent[] getEventBuffer() {
        return this.coordinator.getEventBuffer();
    }

    /** Returns the next event taken from the mock operator event gateway. */
    @Override
    public OperatorEvent getNextEvent() {
        return this.gateway.getNextEvent();
    }

    /** Returns the write function's data buffer. */
    @Override
    public Map<String, List<HoodieRecord>> getDataBuffer() {
        return this.writeFunction.getDataBuffer();
    }

    /**
     * Simulates taking a checkpoint: checkpoints the coordinator, snapshots the bootstrap
     * operator (if enabled), the bucket assigner and the write function, then marks the
     * checkpoint as begun on the mock operator state store.
     *
     * @param checkpointId id of the simulated checkpoint
     * @throws Exception if any snapshot call fails
     */
    @Override
    public void checkpointFunction(long checkpointId) throws Exception {
        this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
        if (this.conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
            this.bootstrapOperator.snapshotState(null);
        }
        this.bucketAssignerFunction.snapshotState(null);
        this.writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
        this.stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
    }

    /** Signals end of input to the write function. */
    public void endInput() {
        this.writeFunction.endInput();
    }

    /**
     * Simulates a successful checkpoint: notifies the mock state store, the coordinator and
     * the bucket assigner, and triggers compaction when async compaction is enabled.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws HoodieException wrapping any exception thrown by the compaction wrapper
     */
    @Override
    public void checkpointComplete(long checkpointId) {
        this.stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
        this.coordinator.notifyCheckpointComplete(checkpointId);
        this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
        if (this.asyncCompaction) {
            try {
                this.compactFunctionWrapper.compact(checkpointId);
            }
            catch (Exception e) {
                throw new HoodieException(e);
            }
        }
    }

    /** Notifies the coordinator that the given checkpoint was aborted. */
    @Override
    public void checkpointFails(long checkpointId) {
        this.coordinator.notifyCheckpointAborted(checkpointId);
    }

    /**
     * Simulates a subtask failure: reports a dummy exception to the coordinator and then
     * replaces the write function with a freshly initialized one.
     *
     * @param taskID index of the failed subtask
     * @throws Exception if re-creating the write function fails
     */
    @Override
    public void subTaskFails(int taskID) throws Exception {
        this.coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
        this.setupWriteFunction();
    }

    /**
     * Closes the coordinator, the IO manager and every function that has been created.
     *
     * @throws Exception if any of the close calls fails
     */
    @Override
    public void close() throws Exception {
        this.coordinator.close();
        this.ioManager.close();
        // These functions only exist after openFunction(); guard them so that closing a
        // wrapper that was never (or only partially) opened does not throw a
        // NullPointerException that would mask the original failure.
        if (this.bucketAssignerFunction != null) {
            this.bucketAssignerFunction.close();
        }
        if (this.writeFunction != null) {
            this.writeFunction.close();
        }
        if (this.compactFunctionWrapper != null) {
            this.compactFunctionWrapper.close();
        }
    }

    @Override
    public StreamWriteOperatorCoordinator getCoordinator() {
        return this.coordinator;
    }

    @Override
    public MockOperatorCoordinatorContext getCoordinatorContext() {
        return this.coordinatorContext;
    }

    /**
     * Tells whether the record key of {@code hoodieKey} has been seen by the bucket assigner,
     * i.e. was recorded by {@link #invoke(Object)} or during bootstrap in {@link #openFunction()}.
     */
    @Override
    public boolean isKeyInState(HoodieKey hoodieKey) {
        return this.bucketAssignFunctionContext.isKeyInState(hoodieKey.getRecordKey());
    }

    /** Delegates to {@code StreamWriteFunction#isConfirming()}; the method name comes from the interface. */
    @Override
    public boolean isConforming() {
        return this.writeFunction.isConfirming();
    }

    /**
     * Delegates to {@code BootstrapOperator#isAlreadyBootstrap()}.
     *
     * <p>The bootstrap operator is only created by {@link #openFunction()} when
     * {@link FlinkOptions#INDEX_BOOTSTRAP_ENABLED} is set, so this must only be called in
     * that configuration.
     */
    @Override
    public boolean isAlreadyBootstrap() throws Exception {
        return this.bootstrapOperator.isAlreadyBootstrap();
    }

    /**
     * Creates, initializes and opens a new {@link StreamWriteFunction}, then forwards the next
     * gateway event to the coordinator as coming from subtask 0.
     * NOTE(review): that event is presumably the one emitted while the write function
     * initializes/opens - confirm against {@code StreamWriteFunction}.
     */
    private void setupWriteFunction() throws Exception {
        this.writeFunction = new StreamWriteFunction<>(this.conf);
        this.writeFunction.setRuntimeContext(this.runtimeContext);
        this.writeFunction.setOperatorEventGateway(this.gateway);
        this.writeFunction.initializeState(this.stateInitializationContext);
        this.writeFunction.open(this.conf);
        this.coordinator.handleEventFromOperator(0, this.getNextEvent());
    }

    /**
     * Collector that remembers only the most recently collected value.
     *
     * @param <T> type of the collected values
     */
    private static class ScalaCollector<T>
    implements Collector<T> {
        private T val;

        private ScalaCollector() {
        }

        /** Creates a new, empty collector. */
        public static <T> ScalaCollector<T> getInstance() {
            return new ScalaCollector<>();
        }

        /** Stores {@code t}, overwriting any previously collected value. */
        @Override
        public void collect(T t) {
            this.val = t;
        }

        /** Drops the stored value. */
        @Override
        public void close() {
            this.val = null;
        }

        /** Returns the last collected value, or {@code null} if none was collected or the collector was closed. */
        public T getVal() {
            return this.val;
        }
    }

    /** Keeps the set of keys that were passed to {@link #setCurrentKey(Object)}. */
    private static class MockBucketAssignFunctionContext {
        private final Set<Object> updateKeys = new HashSet<>();

        private MockBucketAssignFunctionContext() {
        }

        /** Records {@code key} as seen. */
        public void setCurrentKey(Object key) {
            this.updateKeys.add(key);
        }

        /** Returns whether {@code key} was previously recorded. */
        public boolean isKeyInState(String key) {
            return this.updateKeys.contains(key);
        }
    }
}

