package org.apache.beam.sdk.nexmark.queries;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.UnmodifiableIterator;
import org.joda.time.Instant;

/**
 * A {@link NexmarkQueryModel} yielding {@link Bid} results in which every bid is annotated with
 * information about the per-bidder session it belongs to.
 *
 * <p>Bids are grouped by bidder into sessions: a bid joins the bidder's active session when its
 * timestamp is not after the session's most recently added bid plus the configured
 * {@code sessionGap}; otherwise the active session is emitted and a new one is started.
 */
public class SessionSideInputJoinModel extends NexmarkQueryModel<Bid> {

    /**
     * Simulator that consumes the standard event stream and emits one result {@link Bid} per
     * input bid once the session containing it is flushed.
     */
    private static class Simulator extends AbstractSimulator<Event, Bid> {
        private final NexmarkConfiguration configuration;

        /** Bids of the currently open session of each bidder, keyed by bidder id. */
        private final Map<Long, List<TimestampedValue<Event>>> activeSessions;

        /**
         * Creates a simulator reading from the standard event iterator for the configuration.
         *
         * @param nexmarkConfiguration supplies the event source settings, {@code sessionGap} and
         *     {@code sideInputRowCount}
         */
        public Simulator(NexmarkConfiguration nexmarkConfiguration) {
            super(NexmarkUtils.standardEventIterator(nexmarkConfiguration));
            this.configuration = nexmarkConfiguration;
            this.activeSessions = new HashMap<>();
        }

        /**
         * Processes the next input event, or flushes every open session and signals completion
         * when the input is exhausted. Events that carry no bid are ignored.
         */
        @Override
        protected void run() {
            TimestampedValue<Event> timestampedEvent = nextInput();
            if (timestampedEvent == null) {
                // flushSession removes entries from activeSessions, so iterate over a snapshot.
                for (Long bidder : ImmutableSet.copyOf(this.activeSessions.keySet())) {
                    flushSession(bidder);
                }
                allDone();
                return;
            }

            Event event = timestampedEvent.getValue();
            if (event.bid == null) {
                return;
            }

            // Keep the key as a primitive long so that map lookups always box to Long.
            long bidder = event.bid.bidder;
            List<TimestampedValue<Event>> session = this.activeSessions.get(bidder);
            if (session == null) {
                beginSession(timestampedEvent);
                return;
            }

            // The session stays open until sessionGap after its most recently added bid.
            Instant sessionDeadline =
                    session.get(session.size() - 1).getTimestamp().plus(this.configuration.sessionGap);
            if (timestampedEvent.getTimestamp().isAfter(sessionDeadline)) {
                flushSession(bidder);
                beginSession(timestampedEvent);
            } else {
                session.add(timestampedEvent);
            }
        }

        /**
         * Opens a new session for the bidder of the given bid event, containing only that event.
         *
         * @param timestampedValue a bid event whose bidder must not already have an active session
         * @throws IllegalStateException if the bidder already has an active session
         */
        private void beginSession(TimestampedValue<Event> timestampedValue) {
            long bidder = timestampedValue.getValue().bid.bidder;
            Preconditions.checkState(
                    this.activeSessions.get(bidder) == null,
                    "Session already active for bidder %s",
                    bidder);
            List<TimestampedValue<Event>> session = new ArrayList<>();
            session.add(timestampedValue);
            this.activeSessions.put(bidder, session);
        }

        /**
         * Emits a result for every bid in the bidder's active session and then closes the session.
         *
         * <p>Each result copies the bid's auction, bidder, price and date-time, and passes
         * {@code "<bidder % sideInputRowCount>:<sessionStart>:<sessionEnd>"} as the final string
         * argument of the {@link Bid} constructor. The session start is the earliest bid timestamp
         * and the session end is the latest bid timestamp plus {@code sessionGap}. Results keep the
         * timestamp of the originating event.
         *
         * @param j id of the bidder whose session is flushed
         * @throws IllegalStateException if the bidder has no active session
         */
        private void flushSession(long j) {
            List<TimestampedValue<Event>> session = this.activeSessions.get(j);
            Preconditions.checkState(session != null, "No active session for bidder %s", j);

            // Collect the timestamps once and reuse them for both session bounds.
            List<Instant> timestamps =
                    session.stream().map(TimestampedValue::getTimestamp).collect(Collectors.toList());
            Instant sessionStart = Ordering.<Instant>natural().min(timestamps);
            Instant sessionEnd =
                    Ordering.<Instant>natural().max(timestamps).plus(this.configuration.sessionGap);

            for (TimestampedValue<Event> timestampedEvent : session) {
                Bid bid = timestampedEvent.getValue().bid;
                long sideInputKey = bid.bidder % this.configuration.sideInputRowCount;
                Bid joined =
                        new Bid(
                                bid.auction,
                                bid.bidder,
                                bid.price,
                                bid.dateTime,
                                String.format("%d:%s:%s", sideInputKey, sessionStart, sessionEnd));
                addResult(TimestampedValue.of(joined, timestampedEvent.getTimestamp()));
            }
            this.activeSessions.remove(j);
        }
    }

    /**
     * Creates the model for the given configuration.
     *
     * @param nexmarkConfiguration configuration handed to the superclass and, through it, to the
     *     simulator
     */
    public SessionSideInputJoinModel(NexmarkConfiguration nexmarkConfiguration) {
        super(nexmarkConfiguration);
    }

    /** Returns a fresh simulator built from this model's configuration. */
    @Override
    public AbstractSimulator<?, Bid> simulator() {
        return new Simulator(this.configuration);
    }

    /** Converts the timestamped results to a collection of strings by delegating to {@code toValue}. */
    @Override
    protected Collection<String> toCollection(Iterator<TimestampedValue<Bid>> it) {
        return toValue(it);
    }
}
