package org.apache.ignite.examples.streaming;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.streamer.StreamerContext;
import org.apache.ignite.streamer.StreamerFailureListener;
import org.apache.ignite.streamer.StreamerStage;
import org.apache.ignite.streamer.StreamerWindow;
import org.apache.ignite.streamer.index.StreamerIndexEntry;
import org.apache.ignite.streamer.index.StreamerIndexUpdater;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingCheckInExample.class */
public class StreamingCheckInExample {
    /** Name under which the streamer is looked up on the started Ignite instance. */
    private static final String STREAMER_NAME = "check-in";
    /** Largest distance between a check-in and a tracked place for the check-in to count as being at that place. */
    private static final double NEARBY_DISTANCE = 5.0d;
    /** Random source for the coordinates of generated check-ins; assigned in the static initializer. */
    private static final Random RAND;
    /** Number of check-in events produced by {@code streamData}. */
    private static final int CNT = 60;
    /** Pool of user names from which generated check-ins pick their user; assigned in the static initializer. */
    private static final String[] USER_NAMES;
    /** Places against which every check-in location is compared; assigned in the static initializer. */
    private static final Place[] TRACKED_PLACES;
    /** NOTE(review): presumably the exclusive upper bound of generated X coordinates - confirm against streamData. */
    private static final int MAX_X = 30;
    /** NOTE(review): presumably the exclusive upper bound of generated Y coordinates - confirm against streamData. */
    private static final int MAX_Y = 30;
    /** Assertion flag as emitted by the decompiler; the static initializer sets it to the negated desired assertion status of this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingCheckInExample$AddToWindowStage.class */
    public static class AddToWindowStage implements StreamerStage<CheckInEvent> {
        static final /* synthetic */ boolean $assertionsDisabled;

        public String name() {
            return getClass().getSimpleName();
        }

        @Nullable
        public Map<String, Collection<?>> run(StreamerContext streamerContext, Collection<CheckInEvent> collection) {
            StreamerWindow window = streamerContext.window(name());
            if (!$assertionsDisabled && window == null) {
                throw new AssertionError();
            }
            LinkedList linkedList = new LinkedList();
            for (CheckInEvent checkInEvent : collection) {
                try {
                    window.enqueue(checkInEvent);
                    linkedList.add(checkInEvent);
                } catch (IgniteException e) {
                    if (!e.getMessage().contains("Index unique key violation")) {
                        throw e;
                    }
                    System.err.println("Cannot check-in twice within the specified period of time [evt=" + checkInEvent + ']');
                }
            }
            window.pollEvictedAll();
            if (linkedList.isEmpty()) {
                return null;
            }
            return Collections.singletonMap(streamerContext.nextStageName(), linkedList);
        }

        static {
            $assertionsDisabled = !StreamingCheckInExample.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingCheckInExample$CheckInEvent.class */
    /**
     * Immutable check-in event: the name of a user together with the location
     * at which that user checked in.
     */
    public static class CheckInEvent {
        /** Name of the user who checked in. */
        private final String userName;
        /** Location of the check-in. */
        private final Location location;

        /**
         * Creates a check-in event.
         *
         * @param userName name of the user
         * @param location location of the check-in
         */
        CheckInEvent(String userName, Location location) {
            this.location = location;
            this.userName = userName;
        }

        /**
         * @return name of the user who checked in
         */
        public String userName() {
            return userName;
        }

        /**
         * @return location of the check-in
         */
        public Location location() {
            return location;
        }

        /**
         * @return text of the form {@code CheckInEvent [userName=..., location=...]}
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("CheckInEvent [userName=");
            text.append(userName);
            text.append(", location=");
            text.append(location);
            text.append(']');
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingCheckInExample$CheckInEventIndexUpdater.class */
    private static class CheckInEventIndexUpdater implements StreamerIndexUpdater<CheckInEvent, String, Location> {
        private CheckInEventIndexUpdater() {
        }

        @Nullable
        public String indexKey(CheckInEvent checkInEvent) {
            return checkInEvent.userName();
        }

        @Nullable
        public Location initialValue(CheckInEvent checkInEvent, String str) {
            return checkInEvent.location();
        }

        @Nullable
        public Location onAdded(StreamerIndexEntry<CheckInEvent, String, Location> streamerIndexEntry, CheckInEvent checkInEvent) {
            throw new AssertionError("onAdded() shouldn't be called on unique index.");
        }

        @Nullable
        public Location onRemoved(StreamerIndexEntry<CheckInEvent, String, Location> streamerIndexEntry, CheckInEvent checkInEvent) {
            return null;
        }

        public /* bridge */ /* synthetic */ Object onRemoved(StreamerIndexEntry streamerIndexEntry, Object obj) {
            return onRemoved((StreamerIndexEntry<CheckInEvent, String, Location>) streamerIndexEntry, (CheckInEvent) obj);
        }

        public /* bridge */ /* synthetic */ Object onAdded(StreamerIndexEntry streamerIndexEntry, Object obj) throws IgniteException {
            return onAdded((StreamerIndexEntry<CheckInEvent, String, Location>) streamerIndexEntry, (CheckInEvent) obj);
        }
    }

    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingCheckInExample$DetectPlacesStage.class */
    private static class DetectPlacesStage implements StreamerStage<CheckInEvent> {
        static final /* synthetic */ boolean $assertionsDisabled;

        private DetectPlacesStage() {
        }

        public String name() {
            return getClass().getSimpleName();
        }

        @Nullable
        public Map<String, Collection<?>> run(StreamerContext streamerContext, Collection<CheckInEvent> collection) {
            StreamerWindow window = streamerContext.window(name());
            if (!$assertionsDisabled && window == null) {
                throw new AssertionError();
            }
            for (CheckInEvent checkInEvent : collection) {
                Place[] placeArr = StreamingCheckInExample.TRACKED_PLACES;
                int length = placeArr.length;
                int i = 0;
                while (true) {
                    if (i < length) {
                        Place place = placeArr[i];
                        if (distance(checkInEvent.location(), place.location()) <= StreamingCheckInExample.NEARBY_DISTANCE) {
                            window.enqueue(new LocationInfo(checkInEvent.userName(), place));
                            break;
                        }
                        i++;
                    }
                }
            }
            window.pollEvictedAll();
            return null;
        }

        private double distance(Location location, Location location2) {
            double abs = Math.abs(location.x() - location2.x());
            double abs2 = Math.abs(location.y() - location2.y());
            return Math.sqrt((abs * abs) + (abs2 * abs2));
        }

        static {
            $assertionsDisabled = !StreamingCheckInExample.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingCheckInExample$Location.class */
    /**
     * Immutable point described by an X and a Y coordinate.
     */
    public static class Location {
        /** X coordinate. */
        private final double x;
        /** Y coordinate. */
        private final double y;

        /**
         * Creates a location.
         *
         * @param x X coordinate
         * @param y Y coordinate
         */
        Location(double x, double y) {
            this.y = y;
            this.x = x;
        }

        /**
         * @return X coordinate
         */
        public double x() {
            return x;
        }

        /**
         * @return Y coordinate
         */
        public double y() {
            return y;
        }

        /**
         * @return text of the form {@code Location [x=..., y=...]}
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Location [x=");
            text.append(x);
            text.append(", y=");
            text.append(y);
            text.append(']');
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingCheckInExample$LocationInfo.class */
    /**
     * Immutable pairing of a user name with the tracked place that user was detected at.
     */
    public static class LocationInfo {
        /** Name of the user. */
        private final String userName;
        /** Place associated with the user. */
        private final Place place;

        /**
         * Creates a location info.
         *
         * @param userName name of the user
         * @param place    place associated with the user
         */
        LocationInfo(String userName, Place place) {
            this.place = place;
            this.userName = userName;
        }

        /**
         * @return name of the user
         */
        public String userName() {
            return userName;
        }

        /**
         * @return place associated with the user
         */
        public Place place() {
            return place;
        }

        /**
         * @return text of the form {@code LocationInfo [userName=..., place=...]}
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("LocationInfo [userName=");
            text.append(userName);
            text.append(", place=");
            text.append(place);
            text.append(']');
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingCheckInExample$Place.class */
    /**
     * Immutable named place with a fixed location.
     */
    public static class Place {
        /** Name of the place. */
        private final String name;
        /** Location of the place. */
        private final Location location;

        /**
         * Creates a place.
         *
         * @param name     name of the place
         * @param location location of the place
         */
        Place(String name, Location location) {
            this.location = location;
            this.name = name;
        }

        /**
         * @return name of the place
         */
        public String name() {
            return name;
        }

        /**
         * @return location of the place
         */
        public Location location() {
            return location;
        }

        /**
         * @return text of the form {@code Place [name=..., location=...]}
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Place [name=");
            text.append(name);
            text.append(", location=");
            text.append(location);
            text.append(']');
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/ignite/examples/streaming/StreamingCheckInExample$PlacesIndexUpdater.class */
    private static class PlacesIndexUpdater implements StreamerIndexUpdater<LocationInfo, String, Place> {
        private PlacesIndexUpdater() {
        }

        @Nullable
        public String indexKey(LocationInfo locationInfo) {
            return locationInfo.userName();
        }

        @Nullable
        public Place initialValue(LocationInfo locationInfo, String str) {
            return locationInfo.place();
        }

        @Nullable
        public Place onAdded(StreamerIndexEntry<LocationInfo, String, Place> streamerIndexEntry, LocationInfo locationInfo) {
            throw new AssertionError("onAdded() shouldn't be called on unique index.");
        }

        @Nullable
        public Place onRemoved(StreamerIndexEntry<LocationInfo, String, Place> streamerIndexEntry, LocationInfo locationInfo) {
            return null;
        }

        public /* bridge */ /* synthetic */ Object onRemoved(StreamerIndexEntry streamerIndexEntry, Object obj) {
            return onRemoved((StreamerIndexEntry<LocationInfo, String, Place>) streamerIndexEntry, (LocationInfo) obj);
        }

        public /* bridge */ /* synthetic */ Object onAdded(StreamerIndexEntry streamerIndexEntry, Object obj) throws IgniteException {
            return onAdded((StreamerIndexEntry<LocationInfo, String, Place>) streamerIndexEntry, (LocationInfo) obj);
        }
    }

    /**
     * Runs the example: starts Ignite from {@code examples/config/example-streamer.xml},
     * registers a failure listener on the streamer, schedules the periodic query,
     * streams the generated check-ins, then resets the streamer on all nodes and
     * stops Ignite.
     *
     * <p>The query timer is cancelled and Ignite is stopped on every exit path,
     * including failures, so that no worker thread outlives the example.
     *
     * @param strArr command line arguments (unused)
     * @throws IgniteException if the example fails
     */
    public static void main(String[] strArr) throws IgniteException {
        final Ignite ignite = Ignition.start("examples/config/example-streamer.xml");
        try {
            System.out.println();
            System.out.println(">>> Streaming check-in example started.");

            IgniteStreamer streamer = ignite.streamer(STREAMER_NAME);
            // Assertion check kept in its explicit form; it relies on the class-level assertion flag.
            if (!$assertionsDisabled && streamer == null) {
                throw new AssertionError();
            }

            streamer.addStreamerFailureListener(new StreamerFailureListener() {
                @Override
                public void onFailure(String stageName, Collection<Object> events, Throwable error) {
                    System.err.println("Failure [stage=" + stageName + ", evts=" + events + ", err=" + error.getMessage() + ']');
                }
            });

            // The timer is created only once Ignite is up, and is always cancelled
            // before Ignite is stopped, so its thread can never keep the JVM alive
            // or query a stopped instance.
            Timer timer = new Timer("check-in-query-worker");
            try {
                scheduleQuery(streamer, timer);
                streamData(streamer);
            } finally {
                timer.cancel();
            }

            // Reset the streamer on every node that has it configured.
            ignite.compute().broadcast(new IgniteRunnable() {
                @Override
                public void run() {
                    if (!ExamplesUtils.hasStreamer(ignite, StreamingCheckInExample.STREAMER_NAME)) {
                        System.err.println("Default streamer not found (is example-streamer.xml configuration used on all nodes?)");
                        return;
                    }
                    IgniteStreamer nodeStreamer = ignite.streamer(StreamingCheckInExample.STREAMER_NAME);
                    System.out.println("Clearing streamer data.");
                    nodeStreamer.reset();
                }
            });
        } finally {
            Ignition.stop(true);
        }
    }

    /**
     * Schedules a task that, after an initial delay of 3000 ms and then every
     * 3000 ms, collects the user-to-place index entries of the
     * {@code DetectPlacesStage} window through a reduce over the streamer context
     * and prints one line per user to {@code System.out}.
     *
     * <p>An {@link IgniteException} raised by a query is printed and does not stop
     * the schedule.
     *
     * @param igniteStreamer streamer whose context is queried
     * @param timer          timer on which the repeating query is scheduled
     */
    private static void scheduleQuery(final IgniteStreamer igniteStreamer, Timer timer) {
        TimerTask queryTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    Map<String, Place> userPlaces = igniteStreamer.context().reduce(
                        new IgniteClosure<StreamerContext, Map<String, Place>>() {
                            /** Copies the entries of the first window index into a map. */
                            @Override
                            public Map<String, Place> apply(StreamerContext streamerContext) {
                                StreamerWindow<LocationInfo> window =
                                    streamerContext.window(DetectPlacesStage.class.getSimpleName());
                                assert window != null;

                                Collection<StreamerIndexEntry<LocationInfo, String, Place>> entries =
                                    window.<String, Place>index().entries(0);
                                Map<String, Place> result = new HashMap<>(entries.size(), 1.0f);
                                for (StreamerIndexEntry<LocationInfo, String, Place> entry : entries) {
                                    result.put(entry.key(), entry.value());
                                }
                                return result;
                            }
                        },
                        new IgniteReducer<Map<String, Place>, Map<String, Place>>() {
                            /** Accumulated result; the first collected map, extended by later ones. */
                            private Map<String, Place> map;

                            /** Merges a partial result; a {@code null} partial result stops the collection. */
                            @Override
                            public boolean collect(@Nullable Map<String, Place> map2) {
                                if (map2 == null) {
                                    return false;
                                }
                                if (this.map != null) {
                                    this.map.putAll(map2);
                                    return true;
                                }
                                this.map = map2;
                                return true;
                            }

                            /** Returns the merged result, or {@code null} when nothing was collected. */
                            @Override
                            public Map<String, Place> reduce() {
                                return this.map;
                            }
                        });

                    // The reducer yields null when it collected nothing; treat that as an
                    // empty result so an uncaught NPE cannot terminate the timer thread.
                    if (userPlaces == null) {
                        userPlaces = Collections.<String, Place>emptyMap();
                    }

                    StringBuilder sb = new StringBuilder("----------------\n");
                    for (Map.Entry<String, Place> entry : userPlaces.entrySet()) {
                        Place place = entry.getValue();
                        sb.append(String.format("%s is at the %s (%s)\n", entry.getKey(), place.name(), place.location()));
                    }
                    sb.append("----------------\n");
                    System.out.print(sb.toString());
                } catch (IgniteException e) {
                    e.printStackTrace();
                }
            }
        };
        timer.schedule(queryTask, 3000L, 3000L);
    }

    /**
     * Generates {@code CNT} check-in events, one per second, and adds each to the
     * given streamer. Every event gets a user name picked at random from
     * {@code USER_NAMES} and a random location. Generation stops early, without
     * an error, if the thread is interrupted while sleeping.
     *
     * @param igniteStreamer streamer that receives the generated events
     * @throws IgniteException if adding an event fails
     */
    private static void streamData(IgniteStreamer igniteStreamer) throws IgniteException {
        try {
            int remaining = CNT;
            while (remaining-- > 0) {
                String userName = USER_NAMES[ThreadLocalRandom.current().nextInt(USER_NAMES.length)];
                double x = RAND.nextDouble() + RAND.nextInt(MAX_X - 1);
                double y = RAND.nextDouble() + RAND.nextInt(MAX_Y);
                CheckInEvent event = new CheckInEvent(userName, new Location(x, y));

                System.out.println(">>> Generating event: " + event);
                igniteStreamer.addEvent(event);

                Thread.sleep(1000L);
            }
        } catch (InterruptedException ignored) {
            // Interrupted while pausing between events: stop streaming.
        }
    }

    static {
        $assertionsDisabled = !StreamingCheckInExample.class.desiredAssertionStatus();
        RAND = new Random();
        USER_NAMES = new String[]{"Alice", "Bob", "Ann", "Joe", "Mary", "Peter", "Lisa", "Tom", "Kate", "Sam"};
        TRACKED_PLACES = new Place[]{new Place("Theatre", new Location(1.234d, 2.567d)), new Place("Bowling", new Location(10.111d, 5.213d)), new Place("Bar", new Location(15.199d, 16.781d)), new Place("Cinema", new Location(3.77d, 20.239d))};
    }
}
