package org.apache.beam.runners.fnexecution.control;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.class */
public class TimerReceiverFactory {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TimerReceiverFactory.class);
    private final HashMap<KV<String, String>, ProcessBundleDescriptors.TimerSpec> transformAndTimerIdToSpecMap = new HashMap<>();
    private final BiConsumer<Timer<?>, TimerInternals.TimerData> timerDataConsumer;
    private final Coder windowCoder;

    public TimerReceiverFactory(StageBundleFactory stageBundleFactory, BiConsumer<Timer<?>, TimerInternals.TimerData> biConsumer, Coder coder) {
        Iterator<Map<String, ProcessBundleDescriptors.TimerSpec>> it = stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values().iterator();
        while (it.hasNext()) {
            for (ProcessBundleDescriptors.TimerSpec timerSpec : it.next().values()) {
                this.transformAndTimerIdToSpecMap.put(KV.of(timerSpec.transformId(), timerSpec.timerId()), timerSpec);
            }
        }
        this.timerDataConsumer = biConsumer;
        this.windowCoder = coder;
    }

    public <K> FnDataReceiver<Timer<K>> create(String str, String str2) {
        ProcessBundleDescriptors.TimerSpec timerSpec = this.transformAndTimerIdToSpecMap.get(KV.of(str, str2));
        return timer -> {
            Timer<?> timer = (Timer) Preconditions.checkNotNull(timer, "Received null Timer from SDK harness: %s", timer);
            LOG.debug("Timer received: {}", timer);
            Iterator<? extends BoundedWindow> it = timer.getWindows().iterator();
            while (it.hasNext()) {
                this.timerDataConsumer.accept(timer, TimerInternals.TimerData.of(encodeToTimerDataTimerId(timerSpec.transformId(), timerSpec.timerId()), StateNamespaces.window(this.windowCoder, it.next()), timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE : timer.getFireTimestamp(), timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE : timer.getHoldTimestamp(), timerSpec.getTimerSpec().getTimeDomain()));
            }
        };
    }

    public static String encodeToTimerDataTimerId(String str, String str2) {
        return str.length() + ":" + str + ":" + str2;
    }

    public static KV<String, String> decodeTimerDataTimerId(String str) {
        int indexOf = str.indexOf(":");
        if (indexOf <= 0) {
            throw new IllegalArgumentException(String.format("Invalid encoding, expected len(transformId):transformId:timerId as the encoding but received %s", str));
        }
        int parseInt = Integer.parseInt(str.substring(0, indexOf));
        return KV.of(str.substring(indexOf + 1, indexOf + 1 + parseInt), str.substring(indexOf + 1 + parseInt + 1));
    }
}
