/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.utils;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
import org.apache.hudi.sink.utils.CollectorOutput;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.MockStreamingRuntimeContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.utils.TestConfigurations;

public class ClusteringFunctionWrapper {
    private final Configuration conf;
    private final IOManager ioManager = new IOManagerAsync();
    private final StreamingRuntimeContext runtimeContext;
    private final StreamTask<?, ?> streamTask;
    private final StreamConfig streamConfig;
    private ClusteringPlanOperator clusteringPlanOperator;
    private CollectorOutput<ClusteringCommitEvent> commitEventOutput;
    private ClusteringOperator clusteringOperator;
    private ClusteringCommitSink commitSink;

    public ClusteringFunctionWrapper(Configuration conf, StreamTask<?, ?> streamTask, StreamConfig streamConfig) {
        MockEnvironment environment = new MockEnvironmentBuilder().setTaskName("mockTask").setManagedMemorySize(131072L).setIOManager(this.ioManager).build();
        this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
        this.conf = conf;
        this.streamTask = streamTask;
        this.streamConfig = streamConfig;
    }

    public void openFunction() throws Exception {
        this.clusteringPlanOperator = new ClusteringPlanOperator(this.conf);
        this.clusteringPlanOperator.open();
        this.clusteringOperator = new ClusteringOperator(this.conf, TestConfigurations.ROW_TYPE);
        this.clusteringOperator.setProcessingTimeService((ProcessingTimeService)new TestProcessingTimeService());
        this.commitEventOutput = new CollectorOutput();
        this.clusteringOperator.setup(this.streamTask, this.streamConfig, this.commitEventOutput);
        this.clusteringOperator.open();
        MockCoordinatorExecutor syncExecutor = new MockCoordinatorExecutor((OperatorCoordinator.Context)new MockOperatorCoordinatorContext(new OperatorID(), 1));
        this.clusteringOperator.setExecutor((NonThrownExecutor)syncExecutor);
        this.commitSink = new ClusteringCommitSink(this.conf);
        this.commitSink.setRuntimeContext((RuntimeContext)this.runtimeContext);
        this.commitSink.open(this.conf);
    }

    public void cluster(long checkpointID) throws Exception {
        CollectorOutput planOutput = new CollectorOutput();
        this.clusteringPlanOperator.setOutput(planOutput);
        this.clusteringPlanOperator.notifyCheckpointComplete(checkpointID);
        for (ClusteringPlanEvent clusteringPlanEvent : planOutput.getRecords()) {
            this.clusteringOperator.processElement(new StreamRecord((Object)clusteringPlanEvent));
        }
        for (ClusteringCommitEvent clusteringCommitEvent : this.commitEventOutput.getRecords()) {
            this.commitSink.invoke(clusteringCommitEvent, null);
        }
    }

    public void close() throws Exception {
        this.ioManager.close();
    }
}

