package org.apache.flink.streaming.api.runners.python.beam;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.env.ProcessPythonEnvironment;
import org.apache.flink.python.env.PythonEnvironment;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.utils.ByteArrayWrapper;
import org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.class */
public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
    protected static final Logger LOG;
    private static final String PYTHON_STATE_PREFIX = "python-state-";
    private static final String INPUT_ID = "input";
    private static final String OUTPUT_ID = "output";
    private static final String TRANSFORM_ID = "transform";
    private static final String MAIN_INPUT_NAME = "input";
    private static final String MAIN_OUTPUT_NAME = "output";
    private static final String INPUT_CODER_ID = "input_coder";
    private static final String OUTPUT_CODER_ID = "output_coder";
    private static final String WINDOW_CODER_ID = "window_coder";
    private static final String WINDOW_STRATEGY = "windowing_strategy";
    private static final String MANAGED_MEMORY_RESOURCE_ID = "python-process-managed-memory";
    private static final String PYTHON_WORKER_MEMORY_LIMIT = "_PYTHON_WORKER_MEMORY_LIMIT";
    private transient boolean bundleStarted;
    private final String taskName;
    private final PythonEnvironmentManager environmentManager;
    private final String functionUrn;
    private final Map<String, String> jobOptions;
    protected final Tuple2<byte[], Integer> resultTuple = new Tuple2<>();
    private transient JobBundleFactory jobBundleFactory;
    private transient StageBundleFactory stageBundleFactory;
    private final StateRequestHandler stateRequestHandler;
    private transient BundleProgressHandler progressHandler;
    private transient RemoteBundle remoteBundle;
    protected transient LinkedBlockingQueue<byte[]> resultBuffer;
    protected transient FnDataReceiver<WindowedValue<byte[]>> mainInputReceiver;

    @Nullable
    private FlinkMetricContainer flinkMetricContainer;

    @Nullable
    private final MemoryManager memoryManager;
    private final double managedMemoryFraction;
    private OpaqueMemoryResource<PythonSharedResources> sharedResources;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner$IterateType.class */
    public enum IterateType {
        ITEMS((byte) 0),
        KEYS((byte) 1),
        VALUES((byte) 2);

        private final byte ord;

        IterateType(byte b) {
            this.ord = b;
        }

        public byte getOrd() {
            return this.ord;
        }

        public static IterateType fromOrd(byte b) {
            switch (b) {
                case 0:
                    return ITEMS;
                case 1:
                    return KEYS;
                case 2:
                    return VALUES;
                default:
                    throw new RuntimeException("Unsupported ordinal: " + ((int) b));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner$SimpleStateRequestHandler.class */
    public static class SimpleStateRequestHandler implements StateRequestHandler {
        private static final String CLEAR_CACHED_ITERATOR_MARK = "clear_iterators";
        private static final byte GET_FLAG = 0;
        private static final byte ITERATE_FLAG = 1;
        private static final byte CHECK_EMPTY_FLAG = 2;
        private static final byte EXIST_FLAG = 0;
        private static final byte IS_NONE_FLAG = 1;
        private static final byte NOT_EXIST_FLAG = 2;
        private static final byte IS_EMPTY_FLAG = 3;
        private static final byte NOT_EMPTY_FLAG = 4;
        private static final byte DELETE = 0;
        private static final byte SET_NONE = 1;
        private static final byte SET_VALUE = 2;
        private static final BeamFnApi.StateGetResponse.Builder NOT_EXIST_RESPONSE;
        private static final BeamFnApi.StateGetResponse.Builder IS_NONE_RESPONSE;
        private static final BeamFnApi.StateGetResponse.Builder IS_EMPTY_RESPONSE;
        private static final BeamFnApi.StateGetResponse.Builder NOT_EMPTY_RESPONSE;
        private final TypeSerializer keySerializer;
        private final KeyedStateBackend keyedStateBackend;
        private final int mapStateIterateResponseBatchSize;
        static final /* synthetic */ boolean $assertionsDisabled;
        private final ByteArrayWrapper reuseByteArrayWrapper = new ByteArrayWrapper(new byte[0]);
        private final TypeSerializer<byte[]> valueSerializer = PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO.createSerializer(new ExecutionConfig());
        private final ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos();
        private final DataInputViewStreamWrapper baisWrapper = new DataInputViewStreamWrapper(this.bais);
        private final ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
        private final DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(this.baos);
        private final Map<String, StateDescriptor> stateDescriptorCache = new HashMap();
        private final Map<ByteArrayWrapper, Iterator> mapStateIteratorCache = new HashMap();

        SimpleStateRequestHandler(KeyedStateBackend keyedStateBackend, TypeSerializer typeSerializer, Map<String, String> map) {
            this.keyedStateBackend = keyedStateBackend;
            this.keySerializer = typeSerializer;
            this.mapStateIterateResponseBatchSize = Integer.valueOf(map.getOrDefault(PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE.key(), ((Integer) PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE.defaultValue()).toString())).intValue();
            if (this.mapStateIterateResponseBatchSize <= 0) {
                throw new RuntimeException(String.format("The value of '%s' must be greater than 0!", PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE.key()));
            }
        }

        @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandler
        public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest stateRequest) throws Exception {
            BeamFnApi.StateKey.TypeCase typeCase = stateRequest.getStateKey().getTypeCase();
            synchronized (this.keyedStateBackend) {
                if (typeCase.equals(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE)) {
                    return handleBagState(stateRequest);
                }
                if (!typeCase.equals(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT)) {
                    throw new RuntimeException("Unsupported state type: " + typeCase);
                }
                return handleMapState(stateRequest);
            }
        }

        private CompletionStage<BeamFnApi.StateResponse.Builder> handleBagState(BeamFnApi.StateRequest stateRequest) throws Exception {
            if (!stateRequest.getStateKey().hasBagUserState()) {
                throw new RuntimeException("Unsupported bag state request: " + stateRequest);
            }
            byte[] byteArray = stateRequest.getStateKey().getBagUserState().getKey().toByteArray();
            this.bais.setBuffer(byteArray, 0, byteArray.length);
            Object deserialize = this.keySerializer.deserialize(this.baisWrapper);
            if (this.keyedStateBackend.getKeySerializer() instanceof RowDataSerializer) {
                this.keyedStateBackend.setCurrentKey(this.keyedStateBackend.getKeySerializer().toBinaryRow((RowData) deserialize));
            } else {
                this.keyedStateBackend.setCurrentKey(deserialize);
            }
            switch (stateRequest.getRequestCase()) {
                case GET:
                    return handleBagGetRequest(stateRequest);
                case APPEND:
                    return handleBagAppendRequest(stateRequest);
                case CLEAR:
                    return handleBagClearRequest(stateRequest);
                default:
                    throw new RuntimeException(String.format("Unsupported request type %s for user state.", stateRequest.getRequestCase()));
            }
        }

        private List<ByteString> convertToByteString(ListState<byte[]> listState) throws Exception {
            LinkedList linkedList = new LinkedList();
            if (listState.get() == null) {
                return linkedList;
            }
            Iterator it = ((Iterable) listState.get()).iterator();
            while (it.hasNext()) {
                linkedList.add(ByteString.copyFrom((byte[]) it.next()));
            }
            return linkedList;
        }

        private CompletionStage<BeamFnApi.StateResponse.Builder> handleBagGetRequest(BeamFnApi.StateRequest stateRequest) throws Exception {
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setGet(BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(convertToByteString(getListState(stateRequest))))));
        }

        private CompletionStage<BeamFnApi.StateResponse.Builder> handleBagAppendRequest(BeamFnApi.StateRequest stateRequest) throws Exception {
            getListState(stateRequest).add(stateRequest.getAppend().getData().toByteArray());
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setAppend(BeamFnApi.StateAppendResponse.getDefaultInstance()));
        }

        private CompletionStage<BeamFnApi.StateResponse.Builder> handleBagClearRequest(BeamFnApi.StateRequest stateRequest) throws Exception {
            getListState(stateRequest).clear();
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setClear(BeamFnApi.StateClearResponse.getDefaultInstance()));
        }

        private ListState<byte[]> getListState(BeamFnApi.StateRequest stateRequest) throws Exception {
            StateDescriptor listStateDescriptor;
            String str = BeamPythonFunctionRunner.PYTHON_STATE_PREFIX + stateRequest.getStateKey().getBagUserState().getUserStateId();
            StateDescriptor stateDescriptor = this.stateDescriptorCache.get(str);
            if (stateDescriptor instanceof ListStateDescriptor) {
                listStateDescriptor = (ListStateDescriptor) stateDescriptor;
            } else {
                if (stateDescriptor != null) {
                    throw new RuntimeException(String.format("State name corrupt detected: '%s' is used both as LIST state and '%s' state at the same time.", str, stateDescriptor.getType()));
                }
                listStateDescriptor = new ListStateDescriptor(str, this.valueSerializer);
                this.stateDescriptorCache.put(str, listStateDescriptor);
            }
            return this.keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
        }

        private CompletionStage<BeamFnApi.StateResponse.Builder> handleMapState(BeamFnApi.StateRequest stateRequest) throws Exception {
            if (!stateRequest.getStateKey().hasMultimapSideInput()) {
                throw new RuntimeException("Unsupported bag state request: " + stateRequest);
            }
            byte[] byteArray = stateRequest.getStateKey().getMultimapSideInput().getKey().toByteArray();
            this.bais.setBuffer(byteArray, 0, byteArray.length);
            this.keyedStateBackend.setCurrentKey(this.keyedStateBackend.getKeySerializer().toBinaryRow((RowData) this.keySerializer.deserialize(this.baisWrapper)));
            switch (stateRequest.getRequestCase()) {
                case GET:
                    return handleMapGetRequest(stateRequest);
                case APPEND:
                    return handleMapAppendRequest(stateRequest);
                case CLEAR:
                    return handleMapClearRequest(stateRequest);
                default:
                    throw new RuntimeException(String.format("Unsupported request type %s for user state.", stateRequest.getRequestCase()));
            }
        }

        private CompletionStage<BeamFnApi.StateResponse.Builder> handleMapGetRequest(BeamFnApi.StateRequest stateRequest) throws Exception {
            ByteArrayWrapper byteArrayWrapper;
            BeamFnApi.StateGetResponse.Builder handleMapIterateRequest;
            MapState<ByteArrayWrapper, byte[]> mapState = getMapState(stateRequest);
            byte[] byteArray = stateRequest.getGet().getContinuationToken().toByteArray();
            byte b = byteArray[0];
            switch (b) {
                case 0:
                    this.reuseByteArrayWrapper.setData(byteArray);
                    this.reuseByteArrayWrapper.setOffset(1);
                    this.reuseByteArrayWrapper.setLimit(byteArray.length);
                    handleMapIterateRequest = handleMapGetValueRequest(this.reuseByteArrayWrapper, mapState);
                    break;
                case 1:
                    this.bais.setBuffer(byteArray, 1, byteArray.length - 1);
                    IterateType fromOrd = IterateType.fromOrd(this.baisWrapper.readByte());
                    int readInt = this.baisWrapper.readInt();
                    if (readInt > 0) {
                        this.reuseByteArrayWrapper.setData(byteArray);
                        this.reuseByteArrayWrapper.setOffset(this.bais.getPosition());
                        this.reuseByteArrayWrapper.setLimit(this.bais.getPosition() + readInt);
                        byteArrayWrapper = this.reuseByteArrayWrapper;
                    } else {
                        byteArrayWrapper = null;
                    }
                    handleMapIterateRequest = handleMapIterateRequest(mapState, fromOrd, byteArrayWrapper);
                    break;
                case 2:
                    handleMapIterateRequest = handleMapCheckEmptyRequest(mapState);
                    break;
                default:
                    throw new RuntimeException(String.format("Unsupported get request type: '%d' for map state.", Byte.valueOf(b)));
            }
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setGet(handleMapIterateRequest));
        }

        private CompletionStage<BeamFnApi.StateResponse.Builder> handleMapAppendRequest(BeamFnApi.StateRequest stateRequest) throws Exception {
            byte[] byteArray = stateRequest.getAppend().getData().toByteArray();
            this.bais.setBuffer(byteArray, 0, byteArray.length);
            MapState<ByteArrayWrapper, byte[]> mapState = getMapState(stateRequest);
            int readInt = this.baisWrapper.readInt();
            for (int i = 0; i < readInt; i++) {
                byte readByte = this.baisWrapper.readByte();
                int readInt2 = this.baisWrapper.readInt();
                this.reuseByteArrayWrapper.setData(byteArray);
                this.reuseByteArrayWrapper.setOffset(this.bais.getPosition());
                this.reuseByteArrayWrapper.setLimit(this.bais.getPosition() + readInt2);
                this.baisWrapper.skipBytesToRead(readInt2);
                switch (readByte) {
                    case 0:
                        mapState.remove(this.reuseByteArrayWrapper);
                        break;
                    case 1:
                        mapState.put(this.reuseByteArrayWrapper.copy(), (Object) null);
                        break;
                    case 2:
                        int readInt3 = this.baisWrapper.readInt();
                        byte[] bArr = new byte[readInt3];
                        int read = this.baisWrapper.read(bArr);
                        if (!$assertionsDisabled && readInt3 != read) {
                            throw new AssertionError();
                        }
                        mapState.put(this.reuseByteArrayWrapper.copy(), bArr);
                        break;
                    default:
                        throw new RuntimeException(String.format("Unsupported append request type: '%d' for map state.", Byte.valueOf(readByte)));
                }
            }
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setAppend(BeamFnApi.StateAppendResponse.getDefaultInstance()));
        }

        private CompletionStage<BeamFnApi.StateResponse.Builder> handleMapClearRequest(BeamFnApi.StateRequest stateRequest) throws Exception {
            if (stateRequest.getStateKey().getMultimapSideInput().getTransformId().equals(CLEAR_CACHED_ITERATOR_MARK)) {
                this.mapStateIteratorCache.clear();
            } else {
                getMapState(stateRequest).clear();
            }
            return CompletableFuture.completedFuture(BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setClear(BeamFnApi.StateClearResponse.getDefaultInstance()));
        }

        private BeamFnApi.StateGetResponse.Builder handleMapGetValueRequest(ByteArrayWrapper byteArrayWrapper, MapState<ByteArrayWrapper, byte[]> mapState) throws Exception {
            if (!mapState.contains(byteArrayWrapper)) {
                return NOT_EXIST_RESPONSE;
            }
            byte[] bArr = (byte[]) mapState.get(byteArrayWrapper);
            if (bArr == null) {
                return IS_NONE_RESPONSE;
            }
            this.baos.reset();
            this.baosWrapper.writeByte(0);
            this.baosWrapper.write(bArr);
            return BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(this.baos.toByteArray()));
        }

        private BeamFnApi.StateGetResponse.Builder handleMapCheckEmptyRequest(MapState<ByteArrayWrapper, byte[]> mapState) throws Exception {
            return mapState.isEmpty() ? IS_EMPTY_RESPONSE : NOT_EMPTY_RESPONSE;
        }

        private BeamFnApi.StateGetResponse.Builder handleMapIterateRequest(MapState<ByteArrayWrapper, byte[]> mapState, IterateType iterateType, ByteArrayWrapper byteArrayWrapper) throws Exception {
            Iterator it;
            if (byteArrayWrapper == null) {
                switch (iterateType) {
                    case ITEMS:
                    case VALUES:
                        it = mapState.iterator();
                        break;
                    case KEYS:
                        it = mapState.keys().iterator();
                        break;
                    default:
                        throw new RuntimeException("Unsupported iterate type: " + iterateType);
                }
            } else {
                it = this.mapStateIteratorCache.get(byteArrayWrapper);
                if (it == null) {
                    throw new RuntimeException("The cached iterator does not exist!");
                }
            }
            this.baos.reset();
            switch (iterateType) {
                case ITEMS:
                case VALUES:
                    Iterator it2 = it;
                    for (int i = 0; i < this.mapStateIterateResponseBatchSize && it2.hasNext(); i++) {
                        Map.Entry entry = (Map.Entry) it2.next();
                        ByteArrayWrapper byteArrayWrapper2 = (ByteArrayWrapper) entry.getKey();
                        this.baosWrapper.write(byteArrayWrapper2.getData(), byteArrayWrapper2.getOffset(), byteArrayWrapper2.getLimit() - byteArrayWrapper2.getOffset());
                        this.baosWrapper.writeBoolean(entry.getValue() != null);
                        if (entry.getValue() != null) {
                            this.baosWrapper.write((byte[]) entry.getValue());
                        }
                    }
                case KEYS:
                    Iterator it3 = it;
                    for (int i2 = 0; i2 < this.mapStateIterateResponseBatchSize && it3.hasNext(); i2++) {
                        ByteArrayWrapper byteArrayWrapper3 = (ByteArrayWrapper) it3.next();
                        this.baosWrapper.write(byteArrayWrapper3.getData(), byteArrayWrapper3.getOffset(), byteArrayWrapper3.getLimit() - byteArrayWrapper3.getOffset());
                    }
                default:
                    throw new RuntimeException("Unsupported iterate type: " + iterateType);
            }
            if (it.hasNext()) {
                if (byteArrayWrapper == null) {
                    byteArrayWrapper = new ByteArrayWrapper(UUID.randomUUID().toString().getBytes());
                }
                this.mapStateIteratorCache.put(byteArrayWrapper, it);
            } else {
                if (byteArrayWrapper != null) {
                    this.mapStateIteratorCache.remove(byteArrayWrapper);
                }
                byteArrayWrapper = null;
            }
            BeamFnApi.StateGetResponse.Builder data = BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(this.baos.toByteArray()));
            if (byteArrayWrapper != null) {
                data.setContinuationToken(ByteString.copyFrom(byteArrayWrapper.getData(), byteArrayWrapper.getOffset(), byteArrayWrapper.getLimit() - byteArrayWrapper.getOffset()));
            }
            return data;
        }

        private MapState<ByteArrayWrapper, byte[]> getMapState(BeamFnApi.StateRequest stateRequest) throws Exception {
            StateDescriptor mapStateDescriptor;
            String str = BeamPythonFunctionRunner.PYTHON_STATE_PREFIX + stateRequest.getStateKey().getMultimapSideInput().getSideInputId();
            StateDescriptor stateDescriptor = this.stateDescriptorCache.get(str);
            if (stateDescriptor instanceof MapStateDescriptor) {
                mapStateDescriptor = (MapStateDescriptor) stateDescriptor;
            } else {
                if (stateDescriptor != null) {
                    throw new RuntimeException(String.format("State name corrupt detected: '%s' is used both as MAP state and '%s' state at the same time.", str, stateDescriptor.getType()));
                }
                mapStateDescriptor = new MapStateDescriptor(str, ByteArrayWrapperSerializer.INSTANCE, this.valueSerializer);
                this.stateDescriptorCache.put(str, mapStateDescriptor);
            }
            return this.keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mapStateDescriptor);
        }

        static {
            $assertionsDisabled = !BeamPythonFunctionRunner.class.desiredAssertionStatus();
            NOT_EXIST_RESPONSE = BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(new byte[]{2}));
            IS_NONE_RESPONSE = BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(new byte[]{1}));
            IS_EMPTY_RESPONSE = BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(new byte[]{3}));
            NOT_EMPTY_RESPONSE = BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(new byte[]{4}));
        }
    }

    public BeamPythonFunctionRunner(String str, PythonEnvironmentManager pythonEnvironmentManager, String str2, Map<String, String> map, FlinkMetricContainer flinkMetricContainer, @Nullable KeyedStateBackend keyedStateBackend, @Nullable TypeSerializer typeSerializer, @Nullable MemoryManager memoryManager, double d) {
        this.taskName = (String) Preconditions.checkNotNull(str);
        this.environmentManager = (PythonEnvironmentManager) Preconditions.checkNotNull(pythonEnvironmentManager);
        this.functionUrn = (String) Preconditions.checkNotNull(str2);
        this.jobOptions = (Map) Preconditions.checkNotNull(map);
        this.flinkMetricContainer = flinkMetricContainer;
        this.stateRequestHandler = getStateRequestHandler(keyedStateBackend, typeSerializer, map);
        this.memoryManager = memoryManager;
        this.managedMemoryFraction = d;
    }

    @Override // org.apache.flink.python.PythonFunctionRunner
    public void open(PythonConfig pythonConfig) throws Exception {
        this.bundleStarted = false;
        this.resultBuffer = new LinkedBlockingQueue<>();
        this.environmentManager.open();
        PortablePipelineOptions portablePipelineOptions = (PortablePipelineOptions) PipelineOptionsFactory.as(PortablePipelineOptions.class);
        if (this.jobOptions.containsKey(PythonOptions.STATE_CACHE_SIZE.key())) {
            ((ExperimentalOptions) portablePipelineOptions.as(ExperimentalOptions.class)).setExperiments(Collections.singletonList("state_cache_size=" + this.jobOptions.get(PythonOptions.STATE_CACHE_SIZE.key())));
        }
        Struct proto = PipelineOptionsTranslation.toProto(portablePipelineOptions);
        if (this.memoryManager == null || !pythonConfig.isUsingManagedMemory()) {
            this.jobBundleFactory = createJobBundleFactory(proto);
            this.stageBundleFactory = createStageBundleFactory(this.jobBundleFactory, createPythonExecutionEnvironment(-1L));
        } else {
            Preconditions.checkArgument(this.managedMemoryFraction > 0.0d && this.managedMemoryFraction <= 1.0d, "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type \"Python\" was missing or set to 0 for the config option \"taskmanager.memory.managed.consumer-weights\"." + this.managedMemoryFraction);
            this.sharedResources = this.memoryManager.getSharedMemoryResourceForManagedMemory(MANAGED_MEMORY_RESOURCE_ID, j -> {
                return new PythonSharedResources(createJobBundleFactory(proto), createPythonExecutionEnvironment(j));
            }, this.managedMemoryFraction);
            LOG.info("Obtained shared Python process of size {} bytes", Long.valueOf(this.sharedResources.getSize()));
            ((PythonSharedResources) this.sharedResources.getResourceHandle()).addPythonEnvironmentManager(this.environmentManager);
            this.stageBundleFactory = createStageBundleFactory(((PythonSharedResources) this.sharedResources.getResourceHandle()).getJobBundleFactory(), ((PythonSharedResources) this.sharedResources.getResourceHandle()).getEnvironment());
        }
        this.progressHandler = getProgressHandler(this.flinkMetricContainer);
    }

    @Override // org.apache.flink.python.PythonFunctionRunner
    public void close() throws Exception {
        try {
            if (this.jobBundleFactory != null) {
                this.jobBundleFactory.close();
            }
            try {
                if (this.sharedResources == null) {
                    this.environmentManager.close();
                } else if (((PythonSharedResources) this.sharedResources.getResourceHandle()).release()) {
                    this.sharedResources.close();
                }
            } finally {
                this.sharedResources = null;
            }
        } finally {
            this.jobBundleFactory = null;
        }
    }

    @Override // org.apache.flink.python.PythonFunctionRunner
    public void process(byte[] bArr) throws Exception {
        checkInvokeStartBundle();
        this.mainInputReceiver.accept(WindowedValue.valueInGlobalWindow(bArr));
    }

    @Override // org.apache.flink.python.PythonFunctionRunner
    public Tuple2<byte[], Integer> pollResult() throws Exception {
        byte[] poll = this.resultBuffer.poll();
        if (poll == null) {
            return null;
        }
        this.resultTuple.f0 = poll;
        this.resultTuple.f1 = Integer.valueOf(poll.length);
        return this.resultTuple;
    }

    @Override // org.apache.flink.python.PythonFunctionRunner
    public void flush() throws Exception {
        if (this.bundleStarted) {
            finishBundle();
            this.bundleStarted = false;
        }
    }

    public JobBundleFactory createJobBundleFactory(Struct struct) throws Exception {
        return DefaultJobBundleFactory.create(JobInfo.create(this.taskName, this.taskName, this.environmentManager.createRetrievalToken(), struct));
    }

    private RunnerApi.Environment createPythonExecutionEnvironment(long j) throws Exception {
        PythonEnvironment createEnvironment = this.environmentManager.createEnvironment();
        if (!(createEnvironment instanceof ProcessPythonEnvironment)) {
            throw new RuntimeException("Currently only ProcessPythonEnvironment is supported.");
        }
        ProcessPythonEnvironment processPythonEnvironment = (ProcessPythonEnvironment) createEnvironment;
        Map<String, String> env = processPythonEnvironment.getEnv();
        env.putAll(this.jobOptions);
        env.put(PYTHON_WORKER_MEMORY_LIMIT, String.valueOf(j));
        return Environments.createProcessEnvironment("", "", processPythonEnvironment.getCommand(), env);
    }

    protected void startBundle() {
        try {
            this.remoteBundle = this.stageBundleFactory.getBundle(createOutputReceiverFactory(), this.stateRequestHandler, this.progressHandler);
            this.mainInputReceiver = (FnDataReceiver) Preconditions.checkNotNull(Iterables.getOnlyElement(this.remoteBundle.getInputReceivers().values()), "Failed to retrieve main input receiver.");
        } catch (Throwable th) {
            throw new RuntimeException("Failed to start remote bundle", th);
        }
    }

    private void finishBundle() {
        RuntimeException runtimeException;
        try {
            try {
                this.remoteBundle.close();
                this.remoteBundle = null;
            } finally {
            }
        } catch (Throwable th) {
            this.remoteBundle = null;
            throw th;
        }
    }

    private OutputReceiverFactory createOutputReceiverFactory() {
        return new OutputReceiverFactory() { // from class: org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.1
            @Override // org.apache.beam.runners.fnexecution.control.OutputReceiverFactory
            public FnDataReceiver<WindowedValue<byte[]>> create(String str) {
                return windowedValue -> {
                    BeamPythonFunctionRunner.this.resultBuffer.add(windowedValue.getValue());
                };
            }
        };
    }

    private BundleProgressHandler getProgressHandler(final FlinkMetricContainer flinkMetricContainer) {
        return flinkMetricContainer == null ? BundleProgressHandler.ignored() : new BundleProgressHandler() { // from class: org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.2
            @Override // org.apache.beam.runners.fnexecution.control.BundleProgressHandler
            public void onProgress(BeamFnApi.ProcessBundleProgressResponse processBundleProgressResponse) {
                flinkMetricContainer.updateMetrics(BeamPythonFunctionRunner.this.taskName, processBundleProgressResponse.getMonitoringInfosList());
            }

            @Override // org.apache.beam.runners.fnexecution.control.BundleProgressHandler
            public void onCompleted(BeamFnApi.ProcessBundleResponse processBundleResponse) {
                flinkMetricContainer.updateMetrics(BeamPythonFunctionRunner.this.taskName, processBundleResponse.getMonitoringInfosList());
            }
        };
    }

    private StageBundleFactory createStageBundleFactory(JobBundleFactory jobBundleFactory, RunnerApi.Environment environment) throws Exception {
        try {
            return jobBundleFactory.forStage(createExecutableStage(environment));
        } catch (Throwable th) {
            throw new RuntimeException(this.environmentManager.getBootLog(), th);
        }
    }

    private void checkInvokeStartBundle() throws Exception {
        if (this.bundleStarted) {
            return;
        }
        startBundle();
        this.bundleStarted = true;
    }

    private ExecutableStage createExecutableStage(RunnerApi.Environment environment) throws Exception {
        RunnerApi.Components build = RunnerApi.Components.newBuilder().putPcollections("input", RunnerApi.PCollection.newBuilder().setWindowingStrategyId(WINDOW_STRATEGY).setCoderId(INPUT_CODER_ID).build()).putPcollections("output", RunnerApi.PCollection.newBuilder().setWindowingStrategyId(WINDOW_STRATEGY).setCoderId(OUTPUT_CODER_ID).build()).putTransforms(TRANSFORM_ID, RunnerApi.PTransform.newBuilder().setUniqueName(TRANSFORM_ID).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(this.functionUrn).setPayload(ByteString.copyFrom(getUserDefinedFunctionsProtoBytes())).build()).putInputs("input", "input").putOutputs("output", "output").build()).putWindowingStrategies(WINDOW_STRATEGY, RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId(WINDOW_CODER_ID).build()).putCoders(INPUT_CODER_ID, getInputCoderProto()).putCoders(OUTPUT_CODER_ID, getOutputCoderProto()).putCoders(WINDOW_CODER_ID, getWindowCoderProto()).build();
        return ImmutableExecutableStage.of(build, environment, PipelineNode.pCollection("input", build.getPcollectionsOrThrow("input")), Collections.EMPTY_LIST, Collections.EMPTY_LIST, Collections.EMPTY_LIST, Collections.singletonList(PipelineNode.pTransform(TRANSFORM_ID, build.getTransformsOrThrow(TRANSFORM_ID))), Collections.singletonList(PipelineNode.pCollection("output", build.getPcollectionsOrThrow("output"))), createValueOnlyWireCoderSetting());
    }

    private Collection<RunnerApi.ExecutableStagePayload.WireCoderSetting> createValueOnlyWireCoderSetting() throws IOException {
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(new byte[0]);
        WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        of.encode(valueInGlobalWindow, (OutputStream) byteArrayOutputStream);
        return Arrays.asList(RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder().setUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE)).setPayload(ByteString.copyFrom(byteArrayOutputStream.toByteArray())).setInputOrOutputId("input").build(), RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder().setUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE)).setPayload(ByteString.copyFrom(byteArrayOutputStream.toByteArray())).setInputOrOutputId("output").build());
    }

    protected abstract byte[] getUserDefinedFunctionsProtoBytes();

    protected abstract RunnerApi.Coder getInputCoderProto();

    protected abstract RunnerApi.Coder getOutputCoderProto();

    private RunnerApi.Coder getWindowCoderProto() {
        return RunnerApi.Coder.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN).build()).build();
    }

    private static StateRequestHandler getStateRequestHandler(KeyedStateBackend keyedStateBackend, TypeSerializer typeSerializer, Map<String, String> map) {
        if (keyedStateBackend == null) {
            return StateRequestHandler.unsupported();
        }
        if ($assertionsDisabled || typeSerializer != null) {
            return new SimpleStateRequestHandler(keyedStateBackend, typeSerializer, map);
        }
        throw new AssertionError();
    }

    static {
        $assertionsDisabled = !BeamPythonFunctionRunner.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(BeamPythonFunctionRunner.class);
    }
}
