package org.apache.beam.sdk.nexmark.queries;

import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.AuctionBid;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/nexmark/queries/WinningBids.class */
public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> {
    private final AuctionOrBidWindowFn auctionOrBidWindowFn;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/nexmark/queries/WinningBids$AuctionOrBidWindow.class */
    public static class AuctionOrBidWindow extends IntervalWindow {
        public final long auction;
        public final boolean isAuctionWindow;

        private AuctionOrBidWindow() {
            super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE);
            this.auction = 0L;
            this.isAuctionWindow = false;
        }

        private AuctionOrBidWindow(Instant instant, Instant instant2, long j, boolean z) {
            super(instant, instant2);
            this.auction = j;
            this.isAuctionWindow = z;
        }

        public static AuctionOrBidWindow forAuction(Instant instant, Auction auction) {
            return new AuctionOrBidWindow(instant, auction.expires, auction.id, true);
        }

        public static AuctionOrBidWindow forBid(long j, Instant instant, Bid bid) {
            return new AuctionOrBidWindow(instant, instant.plus(Duration.millis(j * 2)), bid.auction, false);
        }

        public boolean isAuctionWindow() {
            return this.isAuctionWindow;
        }

        public String toString() {
            return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}", start(), end(), Long.valueOf(this.auction), Boolean.valueOf(this.isAuctionWindow));
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass() || !super.equals(obj)) {
                return false;
            }
            AuctionOrBidWindow auctionOrBidWindow = (AuctionOrBidWindow) obj;
            return this.isAuctionWindow == auctionOrBidWindow.isAuctionWindow && this.auction == auctionOrBidWindow.auction;
        }

        public int hashCode() {
            return Objects.hash(Integer.valueOf(super.hashCode()), Boolean.valueOf(this.isAuctionWindow), Long.valueOf(this.auction));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/nexmark/queries/WinningBids$AuctionOrBidWindowCoder.class */
    private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> {
        private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
        private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
        private static final Coder<Long> ID_CODER = VarLongCoder.of();
        private static final Coder<Integer> INT_CODER = VarIntCoder.of();

        private AuctionOrBidWindowCoder() {
        }

        @JsonCreator
        public static AuctionOrBidWindowCoder of() {
            return INSTANCE;
        }

        public void encode(AuctionOrBidWindow auctionOrBidWindow, OutputStream outputStream) throws IOException, CoderException {
            SUPER_CODER.encode(auctionOrBidWindow, outputStream);
            ID_CODER.encode(Long.valueOf(auctionOrBidWindow.auction), outputStream);
            INT_CODER.encode(Integer.valueOf(auctionOrBidWindow.isAuctionWindow ? 1 : 0), outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public AuctionOrBidWindow m56decode(InputStream inputStream) throws IOException, CoderException {
            IntervalWindow intervalWindow = (IntervalWindow) SUPER_CODER.decode(inputStream);
            return new AuctionOrBidWindow(intervalWindow.start(), intervalWindow.end(), ((Long) ID_CODER.decode(inputStream)).longValue(), ((Integer) INT_CODER.decode(inputStream)).intValue() != 0);
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
        }

        public Object structuralValue(AuctionOrBidWindow auctionOrBidWindow) {
            return auctionOrBidWindow;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/nexmark/queries/WinningBids$AuctionOrBidWindowFn.class */
    public static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> {
        private final long expectedAuctionDurationMs;

        public AuctionOrBidWindowFn(long j) {
            this.expectedAuctionDurationMs = j;
        }

        public Collection<AuctionOrBidWindow> assignWindows(WindowFn<Event, AuctionOrBidWindow>.AssignContext assignContext) {
            Event event = (Event) assignContext.element();
            if (event.newAuction != null) {
                return Collections.singletonList(AuctionOrBidWindow.forAuction(assignContext.timestamp(), event.newAuction));
            }
            if (event.bid != null) {
                return Collections.singletonList(AuctionOrBidWindow.forBid(this.expectedAuctionDurationMs, assignContext.timestamp(), event.bid));
            }
            throw new IllegalArgumentException(String.format("%s can only assign windows to auctions and bids, but received %s", getClass().getSimpleName(), assignContext.element()));
        }

        public void mergeWindows(WindowFn<Event, AuctionOrBidWindow>.MergeContext mergeContext) throws Exception {
            TreeMap treeMap = new TreeMap();
            TreeMap treeMap2 = new TreeMap();
            for (AuctionOrBidWindow auctionOrBidWindow : mergeContext.windows()) {
                if (auctionOrBidWindow.isAuctionWindow()) {
                    treeMap.put(Long.valueOf(auctionOrBidWindow.auction), auctionOrBidWindow);
                } else {
                    ((List) treeMap2.computeIfAbsent(Long.valueOf(auctionOrBidWindow.auction), l -> {
                        return new ArrayList();
                    })).add(auctionOrBidWindow);
                }
            }
            for (Map.Entry entry : treeMap.entrySet()) {
                long longValue = ((Long) entry.getKey()).longValue();
                AuctionOrBidWindow auctionOrBidWindow2 = (AuctionOrBidWindow) entry.getValue();
                List<AuctionOrBidWindow> list = (List) treeMap2.get(Long.valueOf(longValue));
                if (list != null) {
                    ArrayList arrayList = new ArrayList();
                    for (AuctionOrBidWindow auctionOrBidWindow3 : list) {
                        if (auctionOrBidWindow3.start().isBefore(auctionOrBidWindow2.end())) {
                            arrayList.add(auctionOrBidWindow3);
                        }
                    }
                    if (!arrayList.isEmpty()) {
                        arrayList.add(auctionOrBidWindow2);
                        mergeContext.merge(arrayList, auctionOrBidWindow2);
                    }
                }
            }
        }

        public boolean isCompatible(WindowFn<?, ?> windowFn) {
            return windowFn instanceof AuctionOrBidWindowFn;
        }

        public Coder<AuctionOrBidWindow> windowCoder() {
            return AuctionOrBidWindowCoder.of();
        }

        public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() {
            throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs");
        }
    }

    public WinningBids(String str, NexmarkConfiguration nexmarkConfiguration) {
        super(str);
        long j = 0;
        for (long j2 : nexmarkConfiguration.rateShape.interEventDelayUs(nexmarkConfiguration.firstEventRate, nexmarkConfiguration.nextEventRate, nexmarkConfiguration.rateUnit, nexmarkConfiguration.numEventGenerators)) {
            j = Math.max(j, j2);
        }
        long j3 = ((((j * 50) / 3) * nexmarkConfiguration.numInFlightAuctions) + 999) / 1000;
        NexmarkUtils.console("Expected auction duration is %d ms", Long.valueOf(j3));
        this.auctionOrBidWindowFn = new AuctionOrBidWindowFn(j3);
    }

    public PCollection<AuctionBid> expand(PCollection<Event> pCollection) {
        PCollection apply = pCollection.apply("Window", Window.into(this.auctionOrBidWindowFn));
        return KeyedPCollectionTuple.of(NexmarkQueryUtil.AUCTION_TAG, apply.apply(NexmarkQueryUtil.JUST_NEW_AUCTIONS).apply("AuctionById:", NexmarkQueryUtil.AUCTION_BY_ID)).and(NexmarkQueryUtil.BID_TAG, apply.apply(NexmarkQueryUtil.JUST_BIDS).apply("BidByAuction", NexmarkQueryUtil.BID_BY_AUCTION)).apply(CoGroupByKey.create()).apply(this.name + ".Join", ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() { // from class: org.apache.beam.sdk.nexmark.queries.WinningBids.1
            private final Counter noAuctionCounter;
            private final Counter underReserveCounter;
            private final Counter noValidBidsCounter;

            {
                this.noAuctionCounter = Metrics.counter(WinningBids.this.name, "noAuction");
                this.underReserveCounter = Metrics.counter(WinningBids.this.name, "underReserve");
                this.noValidBidsCounter = Metrics.counter(WinningBids.this.name, "noValidBids");
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<Long, CoGbkResult>, AuctionBid>.ProcessContext processContext) {
                Auction auction = (Auction) ((CoGbkResult) ((KV) processContext.element()).getValue()).getOnly(NexmarkQueryUtil.AUCTION_TAG, (Object) null);
                if (auction == null) {
                    this.noAuctionCounter.inc();
                    return;
                }
                Bid bid = null;
                for (Bid bid2 : ((CoGbkResult) ((KV) processContext.element()).getValue()).getAll(NexmarkQueryUtil.BID_TAG)) {
                    Preconditions.checkState(bid2.dateTime.compareTo(auction.expires) < 0);
                    if (bid2.price < auction.reserve) {
                        this.underReserveCounter.inc();
                    } else if (bid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid2, bid) > 0) {
                        bid = bid2;
                    }
                }
                if (bid == null) {
                    this.noValidBidsCounter.inc();
                } else {
                    processContext.output(new AuctionBid(auction, bid));
                }
            }
        }));
    }

    public int hashCode() {
        return Objects.hash(this.auctionOrBidWindowFn);
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return this.auctionOrBidWindowFn.equals(((WinningBids) obj).auctionOrBidWindowFn);
    }
}
