package org.apache.paimon.flink.service;

import java.net.InetSocketAddress;
import java.util.Collections;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.service.network.NetworkUtils;
import org.apache.paimon.service.network.stats.DisabledServiceRequestStats;
import org.apache.paimon.service.server.KvQueryServer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SerializationUtils;

/* loaded from: input_file:org/apache/paimon/flink/service/QueryExecutorOperator.class */
public class QueryExecutorOperator extends AbstractStreamOperator<InternalRow> implements OneInputStreamOperator<InternalRow, InternalRow> {
    private static final long serialVersionUID = 1;
    private final Table table;
    private transient LocalTableQuery query;
    private transient IOManager ioManager;

    public QueryExecutorOperator(Table table) {
        this.table = table;
    }

    public static RowType outputType() {
        return RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT());
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.ioManager = IOManager.create(getContainingTask().getEnvironment().getIOManager().getSpillingDirectoriesPaths());
        this.query = ((FileStoreTable) this.table).newLocalTableQuery().withIOManager(this.ioManager);
        KvQueryServer kvQueryServer = new KvQueryServer(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), NetworkUtils.findHostAddress(), Collections.singletonList(0).iterator(), 1, 1, this.query, new DisabledServiceRequestStats());
        try {
            kvQueryServer.start();
            InetSocketAddress serverAddress = kvQueryServer.getServerAddress();
            this.output.collect(new StreamRecord(GenericRow.of(Integer.valueOf(getRuntimeContext().getNumberOfParallelSubtasks()), Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), BinaryString.fromString(serverAddress.getHostName()), Integer.valueOf(serverAddress.getPort()))));
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    public void processElement(StreamRecord<InternalRow> streamRecord) throws Exception {
        InternalRow internalRow = (InternalRow) streamRecord.getValue();
        BinaryRow deserializeBinaryRow = SerializationUtils.deserializeBinaryRow(internalRow.getBinary(1));
        int i = internalRow.getInt(2);
        DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
        this.query.refreshFiles(deserializeBinaryRow, i, dataFileMetaSerializer.deserializeList(internalRow.getBinary(3)), dataFileMetaSerializer.deserializeList(internalRow.getBinary(4)));
    }

    public void close() throws Exception {
        super.close();
        if (this.query != null) {
            this.query.close();
        }
        if (this.ioManager != null) {
            this.ioManager.close();
        }
    }
}
