package org.apache.ignite.examples.streaming;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.streamer.StreamerContext;
import org.apache.ignite.streamer.StreamerStage;
import org.apache.ignite.streamer.StreamerWindow;
import org.apache.ignite.streamer.router.StreamerAffinityEventRouter;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingPriceBarsExample.class */
public class StreamingPriceBarsExample {
    private static final int CNT = 10000000;
    private static final Random RAND = new Random();
    private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE"};
    private static final double[] INITIAL_PRICES = {194.9d, 893.49d, 34.21d, 23.24d};

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingPriceBarsExample$Bar.class */
    public static class Bar implements StreamerAffinityEventRouter.AffinityEvent {
        private final String symbol;
        private volatile double open;
        private volatile double high;
        private volatile double low = 9.223372036854776E18d;
        private volatile double close;

        Bar(String str) {
            this.symbol = str;
        }

        public synchronized Bar copy() {
            Bar bar = new Bar(this.symbol);
            bar.open = this.open;
            bar.high = this.high;
            bar.low = this.low;
            bar.close = this.close;
            return bar;
        }

        public synchronized void update(double d) {
            if (this.open == 0.0d) {
                this.open = d;
            }
            this.high = Math.max(this.high, d);
            this.low = Math.min(this.low, d);
            this.close = d;
        }

        public synchronized void update(Bar bar) {
            if (this.open == 0.0d) {
                this.open = bar.open;
            }
            this.high = Math.max(this.high, bar.high);
            this.low = Math.min(this.low, bar.low);
            this.close = bar.close;
        }

        public String symbol() {
            return this.symbol;
        }

        public double open() {
            return this.open;
        }

        public double high() {
            return this.high;
        }

        public double low() {
            return this.low;
        }

        public double close() {
            return this.close;
        }

        /* renamed from: affinityKey, reason: merged with bridge method [inline-methods] */
        public String m36affinityKey() {
            return this.symbol;
        }

        public synchronized String toString() {
            return "Bar [symbol=" + this.symbol + ", open=" + this.open + ", high=" + this.high + ", low=" + this.low + ", close=" + this.close + ']';
        }
    }

    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingPriceBarsExample$FirstStage.class */
    public static class FirstStage implements StreamerStage<Quote> {
        public String name() {
            return getClass().getSimpleName();
        }

        @Nullable
        public Map<String, Collection<?>> run(StreamerContext streamerContext, Collection<Quote> collection) {
            StreamerWindow window = streamerContext.window("stage1");
            window.enqueueAll(collection);
            Collection<Quote> pollEvictedBatch = window.pollEvictedBatch();
            if (pollEvictedBatch.isEmpty()) {
                return null;
            }
            HashMap hashMap = new HashMap();
            for (Quote quote : pollEvictedBatch) {
                String symbol = quote.symbol();
                Bar bar = (Bar) hashMap.get(symbol);
                if (bar == null) {
                    Bar bar2 = new Bar(symbol);
                    bar = bar2;
                    hashMap.put(symbol, bar2);
                }
                bar.update(quote.price());
            }
            return Collections.singletonMap(streamerContext.nextStageName(), hashMap.values());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingPriceBarsExample$Quote.class */
    public static class Quote implements StreamerAffinityEventRouter.AffinityEvent {
        private final String symbol;
        private final double price;

        Quote(String str, double d) {
            this.symbol = str;
            this.price = d;
        }

        public String symbol() {
            return this.symbol;
        }

        public double price() {
            return this.price;
        }

        /* renamed from: affinityKey, reason: merged with bridge method [inline-methods] */
        public String m37affinityKey() {
            return this.symbol;
        }

        public String toString() {
            return "Quote [symbol=" + this.symbol + ", price=" + this.price + ']';
        }
    }

    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingPriceBarsExample$SecondStage.class */
    public static class SecondStage implements StreamerStage<Bar> {
        public String name() {
            return getClass().getSimpleName();
        }

        @Nullable
        public Map<String, Collection<?>> run(StreamerContext streamerContext, Collection<Bar> collection) {
            ConcurrentMap localSpace = streamerContext.localSpace();
            StreamerWindow window = streamerContext.window("stage2");
            window.enqueueAll(collection);
            Collection<Bar> pollEvictedBatch = window.pollEvictedBatch();
            if (pollEvictedBatch.isEmpty()) {
                return null;
            }
            HashMap hashMap = new HashMap();
            for (Bar bar : pollEvictedBatch) {
                String symbol = bar.symbol();
                Bar bar2 = (Bar) hashMap.get(symbol);
                if (bar2 == null) {
                    Bar bar3 = new Bar(symbol);
                    bar2 = bar3;
                    hashMap.put(symbol, bar3);
                }
                bar2.update(bar);
            }
            localSpace.putAll(hashMap);
            return null;
        }
    }

    public static void main(String[] strArr) throws IgniteException {
        Timer timer = new Timer("priceBars");
        final Ignite start = Ignition.start("examples/config/example-streamer.xml");
        System.out.println();
        System.out.println(">>> Streaming price bars example started.");
        try {
            TimerTask scheduleQuery = scheduleQuery(start, timer);
            streamData(start);
            scheduleQuery.run();
            timer.cancel();
            start.compute().broadcast(new IgniteRunnable() { // from class: org.apache.ignite.examples.streaming.StreamingPriceBarsExample.1
                public void run() {
                    if (!ExamplesUtils.hasStreamer(start, "priceBars")) {
                        System.err.println("Default streamer not found (is example-streamer.xml configuration used on all nodes?)");
                        return;
                    }
                    IgniteStreamer streamer = start.streamer("priceBars");
                    System.out.println("Clearing bars from streamer.");
                    streamer.reset();
                }
            });
            Ignition.stop(true);
        } catch (Throwable th) {
            Ignition.stop(true);
            throw th;
        }
    }

    private static TimerTask scheduleQuery(final Ignite ignite, Timer timer) {
        TimerTask timerTask = new TimerTask() { // from class: org.apache.ignite.examples.streaming.StreamingPriceBarsExample.2
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    Iterator it = ((Collection) ignite.streamer("priceBars").context().reduce(new IgniteClosure<StreamerContext, Collection<Bar>>() { // from class: org.apache.ignite.examples.streaming.StreamingPriceBarsExample.2.1
                        public Collection<Bar> apply(StreamerContext streamerContext) {
                            Collection values = streamerContext.localSpace().values();
                            ArrayList arrayList = new ArrayList(values.size());
                            Iterator it2 = values.iterator();
                            while (it2.hasNext()) {
                                arrayList.add(((Bar) it2.next()).copy());
                            }
                            return arrayList;
                        }
                    }, new IgniteReducer<Collection<Bar>, Collection<Bar>>() { // from class: org.apache.ignite.examples.streaming.StreamingPriceBarsExample.2.2
                        private final Collection<Bar> res = new ArrayList();

                        public boolean collect(@Nullable Collection<Bar> collection) {
                            this.res.addAll(collection);
                            return true;
                        }

                        /* renamed from: reduce, reason: merged with bridge method [inline-methods] */
                        public Collection<Bar> m35reduce() {
                            return this.res;
                        }
                    })).iterator();
                    while (it.hasNext()) {
                        System.out.println(((Bar) it.next()).toString());
                    }
                    System.out.println("-----------------");
                } catch (IgniteException e) {
                    e.printStackTrace();
                }
            }
        };
        timer.schedule(timerTask, 2000L, 2000L);
        return timerTask;
    }

    private static void streamData(Ignite ignite) throws IgniteException {
        IgniteStreamer streamer = ignite.streamer("priceBars");
        for (int i = 0; i < CNT; i++) {
            for (int i2 = 0; i2 < INSTRUMENTS.length; i2++) {
                streamer.addEvent(new Quote(INSTRUMENTS[i2], round2(INITIAL_PRICES[i2] + RAND.nextGaussian())), new Object[0]);
            }
        }
    }

    private static double round2(double d) {
        return Math.floor((100.0d * d) + 0.5d) / 100.0d;
    }
}
