package org.apache.beam.runners.twister2.translators.functions;

import edu.iu.dsc.tws.api.tset.TSetContext;
import edu.iu.dsc.tws.api.tset.fn.ComputeCollectorFunc;
import edu.iu.dsc.tws.api.tset.fn.RecordCollector;
import java.io.IOException;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Logger;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.twister2.Twister2TranslationContext;
import org.apache.beam.runners.twister2.utils.NoOpStepContext;
import org.apache.beam.runners.twister2.utils.Twister2SideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.DoFnWithExecutionInformation;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.InvalidProtocolBufferException;

/* loaded from: input_file:org/apache/beam/runners/twister2/translators/functions/DoFnFunction.class */
public class DoFnFunction<OutputT, InputT> implements ComputeCollectorFunc<RawUnionValue, Iterator<WindowedValue<InputT>>> {
    private static final Logger LOG = Logger.getLogger(DoFnFunction.class.getName());
    private transient DoFn<InputT, OutputT> doFn;
    private transient PipelineOptions pipelineOptions;
    private static final long serialVersionUID = -5701440128544343353L;
    private transient Coder<InputT> inputCoder;
    private transient Map<TupleTag<?>, Coder<?>> outputCoders;
    private transient WindowingStrategy<?, ?> windowingStrategy;
    private transient Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
    private transient TupleTag<OutputT> mainOutput;
    private transient Twister2SideInputReader sideInputReader;
    private transient DoFnRunner<InputT, OutputT> doFnRunner;
    private transient DoFnOutputManager outputManager;
    private transient List<TupleTag<?>> sideOutputs;
    private StepContext stepcontext;
    private transient DoFnSchemaInformation doFnSchemaInformation;
    private transient Map<TupleTag<?>, Integer> outputMap;
    private transient Map<String, PCollectionView<?>> sideInputMapping;
    private transient DoFnInvoker doFnInvoker;
    private transient boolean isInitialized;
    private transient RunnerApi.FunctionSpec doFnwithEx;
    private byte[] doFnwithExBytes;
    private byte[] coderBytes;
    private Map<String, byte[]> outputCodersBytes;
    private transient RunnerApi.MessageWithComponents windowStrategyProto;
    private byte[] windowBytes;
    private Map<String, byte[]> sideInputBytes;
    private String serializedOptions;
    private List<String> serializedSideOutputs;
    private Map<String, Integer> serializedOutputMap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/twister2/translators/functions/DoFnFunction$DoFnOutputManager.class */
    public static class DoFnOutputManager implements DoFnRunners.OutputManager, Serializable {
        private static final long serialVersionUID = 4967375172737408160L;
        private transient List<RawUnionValue> outputs;
        private transient Set<TupleTag<?>> outputTags;
        private Map<TupleTag<?>, Integer> outputMap;

        private DoFnOutputManager() {
        }

        DoFnOutputManager(Map<TupleTag<?>, Integer> map) {
            this.outputMap = map;
        }

        public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            if (this.outputTags.contains(tupleTag)) {
                this.outputs.add(new RawUnionValue(this.outputMap.get(tupleTag).intValue(), windowedValue));
            }
        }

        void setup(TupleTag<?> tupleTag, List<TupleTag<?>> list) {
            this.outputs = new ArrayList();
            this.outputTags = new HashSet();
            this.outputTags.add(tupleTag);
            this.outputTags.addAll(list);
        }

        void clear() {
            this.outputs.clear();
        }

        Iterator<RawUnionValue> getOutputs() {
            return this.outputs.iterator();
        }
    }

    public DoFnFunction() {
        this.isInitialized = false;
        this.isInitialized = false;
    }

    public DoFnFunction(Twister2TranslationContext twister2TranslationContext, DoFn<InputT, OutputT> doFn, Coder<InputT> coder, Map<TupleTag<?>, Coder<?>> map, List<TupleTag<?>> list, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> map2, TupleTag<OutputT> tupleTag, DoFnSchemaInformation doFnSchemaInformation, Map<TupleTag<?>, Integer> map3, Map<String, PCollectionView<?>> map4) {
        this.isInitialized = false;
        this.doFn = doFn;
        this.pipelineOptions = twister2TranslationContext.getOptions();
        this.serializedOptions = new SerializablePipelineOptions(this.pipelineOptions).toString();
        this.inputCoder = coder;
        this.outputCoders = map;
        this.windowingStrategy = windowingStrategy;
        this.sideInputs = convertToTuples(map2);
        this.mainOutput = tupleTag;
        this.doFnSchemaInformation = doFnSchemaInformation;
        this.sideOutputs = list;
        this.stepcontext = new NoOpStepContext();
        this.outputMap = map3;
        this.sideInputMapping = map4;
        this.outputManager = new DoFnOutputManager(this.outputMap);
        prepareSerialization();
    }

    private Map<TupleTag<?>, WindowingStrategy<?, ?>> convertToTuples(Map<PCollectionView<?>, WindowingStrategy<?, ?>> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : map.entrySet()) {
            hashMap.put(entry.getKey().getTagInternal(), entry.getValue());
        }
        return hashMap;
    }

    public Set<String> getSideInputKeys() {
        initTransient();
        HashSet hashSet = new HashSet();
        Iterator<TupleTag<?>> it = this.sideInputs.keySet().iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getId());
        }
        return hashSet;
    }

    public void prepare(TSetContext tSetContext) {
        initTransient();
        this.sideInputReader = new Twister2SideInputReader(this.sideInputs, tSetContext);
        this.outputManager.setup(this.mainOutput, this.sideOutputs);
        this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(this.doFn, this.pipelineOptions);
        this.doFnRunner = DoFnRunners.simpleRunner(this.pipelineOptions, this.doFn, this.sideInputReader, this.outputManager, this.mainOutput, this.sideOutputs, this.stepcontext, this.inputCoder, this.outputCoders, this.windowingStrategy, this.doFnSchemaInformation, this.sideInputMapping);
    }

    public void compute(Iterator<WindowedValue<InputT>> it, RecordCollector<RawUnionValue> recordCollector) {
        try {
            this.outputManager.clear();
            this.doFnRunner.startBundle();
            while (it.hasNext()) {
                this.doFnRunner.processElement(it.next());
            }
            this.doFnRunner.finishBundle();
            Iterator<RawUnionValue> outputs = this.outputManager.getOutputs();
            while (outputs.hasNext()) {
                recordCollector.collect(outputs.next());
            }
        } catch (RuntimeException e) {
            DoFnInvokers.invokerFor(this.doFn).invokeTeardown();
            throw e;
        }
    }

    public void close() {
        Optional.ofNullable(this.doFnInvoker).ifPresent((v0) -> {
            v0.invokeTeardown();
        });
    }

    protected Object readResolve() throws ObjectStreamException {
        return this;
    }

    private void prepareSerialization() {
        SdkComponents create = SdkComponents.create();
        create.registerEnvironment(Environments.createOrGetDefaultEnvironment(this.pipelineOptions.as(PortablePipelineOptions.class)));
        this.serializedOptions = new SerializablePipelineOptions(this.pipelineOptions).toString();
        this.doFnwithEx = ParDoTranslation.translateDoFn(this.doFn, this.mainOutput, this.sideInputMapping, this.doFnSchemaInformation, create);
        this.doFnwithExBytes = this.doFnwithEx.getPayload().toByteArray();
        this.outputCodersBytes = new HashMap();
        try {
            this.coderBytes = SerializableUtils.serializeToByteArray(this.inputCoder);
            this.windowStrategyProto = WindowingStrategyTranslation.toMessageProto(this.windowingStrategy, create);
            this.windowBytes = this.windowStrategyProto.toByteArray();
            for (Map.Entry<TupleTag<?>, Coder<?>> entry : this.outputCoders.entrySet()) {
                this.outputCodersBytes.put(entry.getKey().getId(), SerializableUtils.serializeToByteArray(entry.getValue()));
            }
            this.sideInputBytes = new HashMap();
            for (Map.Entry<TupleTag<?>, WindowingStrategy<?, ?>> entry2 : this.sideInputs.entrySet()) {
                this.windowStrategyProto = WindowingStrategyTranslation.toMessageProto(entry2.getValue(), create);
                this.sideInputBytes.put(entry2.getKey().getId(), this.windowStrategyProto.toByteArray());
            }
            this.serializedSideOutputs = new ArrayList();
            Iterator<TupleTag<?>> it = this.sideOutputs.iterator();
            while (it.hasNext()) {
                this.serializedSideOutputs.add(it.next().getId());
            }
            this.serializedOutputMap = new HashMap();
            for (Map.Entry<TupleTag<?>, Integer> entry3 : this.outputMap.entrySet()) {
                this.serializedOutputMap.put(entry3.getKey().getId(), entry3.getValue());
            }
        } catch (IOException e) {
            LOG.info(e.getMessage());
        }
    }

    private void initTransient() {
        if (this.isInitialized) {
            return;
        }
        try {
            SdkComponents create = SdkComponents.create();
            this.pipelineOptions = new SerializablePipelineOptions(this.serializedOptions).get();
            DoFnWithExecutionInformation doFnWithExecutionInformation = (DoFnWithExecutionInformation) SerializableUtils.deserializeFromByteArray(this.doFnwithExBytes, "Custom Coder Bytes");
            this.doFn = doFnWithExecutionInformation.getDoFn();
            this.mainOutput = doFnWithExecutionInformation.getMainOutputTag();
            this.sideInputMapping = doFnWithExecutionInformation.getSideInputMapping();
            this.doFnSchemaInformation = doFnWithExecutionInformation.getSchemaInformation();
            this.inputCoder = (Coder) SerializableUtils.deserializeFromByteArray(this.coderBytes, "Custom Coder Bytes");
            this.windowStrategyProto = RunnerApi.MessageWithComponents.parseFrom(this.windowBytes);
            this.windowingStrategy = WindowingStrategyTranslation.fromProto(this.windowStrategyProto.getWindowingStrategy(), RehydratedComponents.forComponents(create.toComponents()));
            this.sideInputs = new HashMap();
            for (Map.Entry<String, byte[]> entry : this.sideInputBytes.entrySet()) {
                this.windowStrategyProto = RunnerApi.MessageWithComponents.parseFrom(entry.getValue());
                this.sideInputs.put(new TupleTag<>(entry.getKey()), WindowingStrategyTranslation.fromProto(this.windowStrategyProto.getWindowingStrategy(), RehydratedComponents.forComponents(create.toComponents())));
            }
        } catch (InvalidProtocolBufferException e) {
            LOG.info(e.getMessage());
        }
        this.outputCoders = new HashMap();
        for (Map.Entry<String, byte[]> entry2 : this.outputCodersBytes.entrySet()) {
            this.outputCoders.put(new TupleTag<>(entry2.getKey()), (Coder) SerializableUtils.deserializeFromByteArray(entry2.getValue(), "Custom Coder Bytes"));
        }
        this.sideOutputs = new ArrayList();
        Iterator<String> it = this.serializedSideOutputs.iterator();
        while (it.hasNext()) {
            this.sideOutputs.add(new TupleTag<>(it.next()));
        }
        this.outputMap = new HashMap();
        for (Map.Entry<String, Integer> entry3 : this.serializedOutputMap.entrySet()) {
            this.outputMap.put(new TupleTag<>(entry3.getKey()), entry3.getValue());
        }
        this.outputManager = new DoFnOutputManager(this.outputMap);
        this.isInitialized = true;
    }

    public /* bridge */ /* synthetic */ void compute(Object obj, RecordCollector recordCollector) {
        compute((Iterator) obj, (RecordCollector<RawUnionValue>) recordCollector);
    }
}
