package com.twitter.finatra.kafkastreams.prerestore;

import com.twitter.app.App;
import com.twitter.app.Flag;
import com.twitter.app.Flaggable$;
import com.twitter.conversions.DurationOps$;
import com.twitter.conversions.DurationOps$RichDuration$;
import com.twitter.finatra.annotations.Experimental;
import com.twitter.finatra.kafkastreams.internal.utils.ReflectionUtils$;
import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning;
import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning$;
import com.twitter.util.Duration;
import com.twitter.util.TimeLike;
import com.twitter.util.logging.Logging;
import java.lang.reflect.Field;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.joda.time.DateTimeUtils;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.mutable.ArrayOps;
import scala.math.Numeric$DoubleIsFractional$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: PreRestoreState.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015faB\n\u0015!\u0003\r\ta\b\u0005\u0006U\u0001!\ta\u000b\u0005\be\u0001\u0011\r\u0011\"\u00034\u0011\u001di\u0004A1A\u0005\nyBaA\u0012\u0001\u0005VaY\u0003\"B$\u0001\t\u0013A\u0005\"B,\u0001\t\u0013A\u0006\"\u0002.\u0001\t\u0013Y\u0006\"\u00021\u0001\t\u0013\t\u0007\"B6\u0001\t\u0013Y\u0003\"\u00027\u0001\t\u0013i\u0007bBA\u0003\u0001\u0011%\u0011q\u0001\u0005\b\u0003w\u0001A\u0011BA\u001f\u0011\u001d\t\u0019\u0006\u0001C\u0005\u0003+Ba\"a\u0017\u0001!\u0003\r\t\u0011!C\u0005\u0003;\ny\u0006\u0003\b\u0002b\u0001\u0001\n1!A\u0001\n\u0013\t\u0019'a\u001b\t\u001d\u0005=\u0004\u0001%A\u0002\u0002\u0003%I!!\u001d\u0002v!i\u0011\u0011\u0010\u0001\u0011\u0002\u0007\u0005\t\u0011\"\u0003,\u0003wBa\"! \u0001!\u0003\r\t\u0011!C\u0005\u0003\u007f\n\u0019JA\bQe\u0016\u0014Vm\u001d;pe\u0016\u001cF/\u0019;f\u0015\t)b#\u0001\u0006qe\u0016\u0014Xm\u001d;pe\u0016T!a\u0006\r\u0002\u0019-\fgm[1tiJ,\u0017-\\:\u000b\u0005eQ\u0012a\u00024j]\u0006$(/\u0019\u0006\u00037q\tq\u0001^<jiR,'OC\u0001\u001e\u0003\r\u0019w.\\\u0002\u0001'\r\u0001\u0001\u0005\n\t\u0003C\tj\u0011AF\u0005\u0003GY\u0011\u0011dS1gW\u0006\u001cFO]3b[N$v/\u001b;uKJ\u001cVM\u001d<feB\u0011Q\u0005K\u0007\u0002M)\u0011qEF\u0001\ra\u0006\u0014H/\u001b;j_:LgnZ\u0005\u0003S\u0019\u0012!c\u0015;bi&\u001c\u0007+\u0019:uSRLwN\\5oO\u00061A%\u001b8ji\u0012\"\u0012\u0001\f\t\u0003[Aj\u0011A\f\u0006\u0002_\u0005)1oY1mC&\u0011\u0011G\f\u0002\u0005+:LG/A\bqe\u0016\u0014Vm\u001d;pe\u0016\u001cF/\u0019;f+\u0005!\u0004cA\u001b9u5\taG\u0003\u000285\u0005\u0019\u0011\r\u001d9\n\u0005e2$\u0001\u0002$mC\u001e\u0004\"!L\u001e\n\u0005qr#a\u0002\"p_2,\u0017M\\\u0001\u001faJ,'+Z:u_J,G)\u001e:bi&|g.\u00138ji&\fG\u000eR3mCf,\u0012a\u0010\t\u0004ka\u0002\u0005CA!E\u001b\u0005\u0011%BA\"\u001b\u0003\u0011)H/\u001b7\n\u0005\u0015\u0013%\u0001\u0003#ve\u0006$\u0018n\u001c8\u00025\r\u0014X-\u0019;f\u0003:$7\u000b^1si.\u000bgm[1TiJ,\u0017-\\:\u0002;M$\u0018M\u001d;XC&$hi\u001c:Qe\u0016\u0014Vm\u001d;pe\u0016$G\u000b\u001b:fC\u0012$\"\u0001L%\t\u000b)+\u0001\u0019A&\u0002-A\u0014XMU3ti>\u0014XmS1gW\u0006\u001cFO]3b[N\u0004\"\u0001T+\u000e\u00035S!AT(\u0002\u000fM$(/Z1ng*\u0011\u0001+U\u0001\u0006W\u000647.\u0019\u0006\u0003%N\u000ba!\u00199bG\",'\"\u0001+\u0002\u0007=\u0014x-\u0003\u0002W\u001b\na1*\u00194lCN#(/Z1ng\u0006Ir/Y5u\r>\u0014\bK]3SKN$xN]3GS:L7\u000f[3e)\ta\u0013\fC\u0003K\r\u0001\u00071*A\ngS:$Gk\u001c;bYJ+7\u000f^8sK2\u000bw\r\u0006\u0002]?B\u0011Q&X\u0005\u0003=:\u0012a\u0001R8vE2,\u0007\"\u0002&\b\u0001\u0004Y\u0015!H2p]\u001aLw-\u001e:f!J,'+Z:u_J,\u0007K]8qKJ$\u0018.Z:\u0015\u0005\tL\u0007CA2h\u001b\u0005!'BA\"f\u0015\u00051\u0017\u0001\u00026bm\u0006L!\u0001\u001b3\u0003\u0015A\u0013x\u000e]3si&,7\u000fC\u0003k\u0011\u0001\u0007!-\u0001\u0006qe>\u0004XM\u001d;jKN\f1C]3tKR\u001cFO]3b[RC'/Z1e\u0013\u0012\fQDZ5oIJ+7\u000f^8sK\u000e{gn];nKJd\u0015mZ'fiJL7m\u001d\u000b\u0004]\u0006\u0005\u0001cA8xu:\u0011\u0001/\u001e\b\u0003cRl\u0011A\u001d\u0006\u0003gz\ta\u0001\u0010:p_Rt\u0014\"A\u0018\n\u0005Yt\u0013a\u00029bG.\fw-Z\u0005\u0003qf\u00141aU3r\u0015\t1h\u0006\u0005\u0002|}6\tAP\u0003\u0002~\u001f\u000611m\\7n_:L!a ?\u0003\r5+GO]5d\u0011\u0019\t\u0019A\u0003a\u0001\u0017\u0006a1.\u00194lCN#(/Z1ng\u0006)b-\u001b8e\u0007>t7/^7fe2\u000bw-T3ue&\u001cG\u0003BA\u0005\u00037\u0001R!LA\u0006\u0003\u001fI1!!\u0004/\u0005\u0019y\u0005\u000f^5p]B1Q&!\u0005\u0002\u0016iL1!a\u0005/\u0005\u0019!V\u000f\u001d7feA\u001910a\u0006\n\u0007\u0005eAP\u0001\u0006NKR\u0014\u0018n\u0019(b[\u0016Dq!!\b\f\u0001\u0004\ty\"A\bsKN$xN]3D_:\u001cX/\\3s!!\t\t#a\u000b\u00020\u0005=RBAA\u0012\u0015\u0011\t)#a\n\u0002\u0011\r|gn];nKJT1!!\u000bP\u0003\u001d\u0019G.[3oiNLA!!\f\u0002$\tA1i\u001c8tk6,'\u000fE\u0003.\u0003c\t)$C\u0002\u000249\u0012Q!\u0011:sCf\u00042!LA\u001c\u0013\r\tID\f\u0002\u0005\u0005f$X-\u0001\u0006hKR$\u0006N]3bIN$B!a\u0010\u0002RA)Q&!\r\u0002BA!\u00111IA'\u001b\t\t)E\u0003\u0003\u0002H\u0005%\u0013!C5oi\u0016\u0014h.\u00197t\u0015\r\tY%T\u0001\naJ|7-Z:t_JLA!a\u0014\u0002F\ta1\u000b\u001e:fC6$\u0006N]3bI\"1\u00111\u0001\u0007A\u0002-\u000b!cZ3u%\u0016\u001cHo\u001c:f\u0007>t7/^7feR!\u0011qDA,\u0011\u001d\tI&\u0004a\u0001\u0003\u0003\na\u0001\u001e5sK\u0006$\u0017\u0001E:va\u0016\u0014H\u0005\u001d:pa\u0016\u0014H/[3t+\u0005\u0011\u0017B\u00016#\u00039\u0019X\u000f]3sIQ|\u0007o\u001c7pOf,\"!!\u001a\u0011\u00071\u000b9'C\u0002\u0002j5\u0013\u0001\u0002V8q_2|w-_\u0005\u0004\u0003[\u0012\u0013\u0001\u0003;pa>dwnZ=\u00023M,\b/\u001a:%g\u0016$X\t_2faRLwN\u001c%b]\u0012dWM\u001d\u000b\u0004Y\u0005M\u0004\"\u0002(\u0011\u0001\u0004Y\u0015bAA<E\u0005\u00192/\u001a;Fq\u000e,\u0007\u000f^5p]\"\u000bg\u000e\u001a7fe\u0006\u00013/\u001e9fe\u0012\u001a'/Z1uK\u0006sGm\u0015;beR\\\u0015MZ6b'R\u0014X-Y7t\u0013\t1%%A\u000ftkB,'\u000fJ1qa2L7-\u0019;j_:\u001cVM\u001d<fe\u000e{gNZ5h+\t\t\t\t\u0005\u00036q\u0005\r\u0005\u0003BAC\u0003\u001bsA!a\"\u0002\nB\u0011\u0011OL\u0005\u0004\u0003\u0017s\u0013A\u0002)sK\u0012,g-\u0003\u0003\u0002\u0010\u0006E%AB*ue&twMC\u0002\u0002\f:J1!!&#\u0003]\t\u0007\u000f\u001d7jG\u0006$\u0018n\u001c8TKJ4XM]\"p]\u001aLw\rK\u0002\u0001\u00033\u0003B!a'\u0002\"6\u0011\u0011Q\u0014\u0006\u0004\u0003?C\u0012aC1o]>$\u0018\r^5p]NLA!a)\u0002\u001e\naQ\t\u001f9fe&lWM\u001c;bY\u0002")
@Experimental
/* loaded from: input_file:com/twitter/finatra/kafkastreams/prerestore/PreRestoreState.class */
public interface PreRestoreState extends StaticPartitioning {
    void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$_setter_$com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreState_$eq(Flag<Object> flag);

    void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$_setter_$com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreDurationInitialDelay_$eq(Flag<Duration> flag);

    /* synthetic */ Properties com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$properties();

    /* synthetic */ Topology com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$topology();

    /* synthetic */ void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$setExceptionHandler(KafkaStreams kafkaStreams);

    /* synthetic */ void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$createAndStartKafkaStreams();

    /* synthetic */ Flag com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$applicationServerConfig();

    Flag<Object> com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreState();

    Flag<Duration> com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreDurationInitialDelay();

    default void createAndStartKafkaStreams() {
        if (!BoxesRunTime.unboxToBoolean(com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreState().apply())) {
            com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$createAndStartKafkaStreams();
            return;
        }
        KafkaStreams kafkaStreams = new KafkaStreams(com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$topology(), configurePreRestoreProperties((Properties) com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$properties().clone()), kafkaStreamsClientSupplier());
        com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$setExceptionHandler(kafkaStreams);
        kafkaStreams.start();
        startWaitForPreRestoredThread(kafkaStreams);
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [com.twitter.finatra.kafkastreams.prerestore.PreRestoreState$$anon$1] */
    private default void startWaitForPreRestoredThread(final KafkaStreams kafkaStreams) {
        new Thread(this, kafkaStreams) { // from class: com.twitter.finatra.kafkastreams.prerestore.PreRestoreState$$anon$1
            private final /* synthetic */ PreRestoreState $outer;
            private final KafkaStreams preRestoreKafkaStreams$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    this.$outer.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$waitForPreRestoreFinished(this.preRestoreKafkaStreams$1);
                    this.$outer.info(() -> {
                        return "Closing pre-restoring server";
                    });
                    this.preRestoreKafkaStreams$1.close(1L, TimeUnit.MINUTES);
                    this.$outer.info(() -> {
                        return "Pre-restore complete.";
                    });
                    this.$outer.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$resetStreamThreadId();
                    this.$outer.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$createAndStartKafkaStreams();
                } catch (Throwable th) {
                    Option unapply = NonFatal$.MODULE$.unapply(th);
                    if (unapply.isEmpty()) {
                        throw th;
                    }
                    this.$outer.error(() -> {
                        return "PreRestore error";
                    }, (Throwable) unapply.get());
                    this.$outer.close(this.$outer.defaultCloseGracePeriod());
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                }
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super("wait-for-pre-restoring-server-thread");
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.preRestoreKafkaStreams$1 = kafkaStreams;
            }
        }.start();
    }

    default void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$waitForPreRestoreFinished(KafkaStreams kafkaStreams) {
        ((Logging) this).info(() -> {
            return new StringBuilder(83).append("Waiting for Total Restore Lag to be less than 1000 after an initial wait period of ").append(this.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreDurationInitialDelay().apply()).toString();
        });
        long currentTimeMillis = DateTimeUtils.currentTimeMillis() + ((TimeLike) com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreDurationInitialDelay().apply()).inMillis();
        double d = Double.MAX_VALUE;
        while (true) {
            if (d < 10000 && DateTimeUtils.currentTimeMillis() >= currentTimeMillis) {
                return;
            }
            d = findTotalRestoreLag(kafkaStreams);
            Thread.sleep(1000L);
        }
    }

    private default double findTotalRestoreLag(KafkaStreams kafkaStreams) {
        double unboxToDouble = BoxesRunTime.unboxToDouble(((TraversableOnce) findRestoreConsumerLagMetrics(kafkaStreams).map(metric -> {
            return BoxesRunTime.boxToDouble($anonfun$findTotalRestoreLag$1(metric));
        }, Seq$.MODULE$.canBuildFrom())).sum(Numeric$DoubleIsFractional$.MODULE$));
        ((Logging) this).info(() -> {
            return new StringBuilder(19).append("Total Restore Lag: ").append(unboxToDouble).toString();
        });
        return unboxToDouble;
    }

    private default Properties configurePreRestoreProperties(Properties properties) {
        properties.put("application.server", new StringBuilder(1).append(Utils.getHost((String) com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$applicationServerConfig().apply())).append(":").append(StaticPartitioning$.MODULE$.PreRestoreSignalingPort()).toString());
        properties.put("poll.ms", "0");
        return properties;
    }

    default void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$resetStreamThreadId() {
        try {
            Field declaredField = StreamThread.class.getDeclaredField("STREAM_THREAD_ID_SEQUENCE");
            declaredField.setAccessible(true);
            ((AtomicInteger) declaredField.get(null)).set(1);
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            ((Logging) this).error(() -> {
                return "Error resetting stream threads";
            }, (Throwable) unapply.get());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
    }

    private default Seq<Metric> findRestoreConsumerLagMetrics(KafkaStreams kafkaStreams) {
        return (Seq) ((TraversableLike) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(getThreads(kafkaStreams))).toSeq().map(streamThread -> {
            return new Tuple2(streamThread, this.getRestoreConsumer(streamThread));
        }, Seq$.MODULE$.canBuildFrom())).flatMap(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return Option$.MODULE$.option2Iterable(this.findConsumerLagMetric((Consumer) tuple2._2()).withFilter(tuple2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$findRestoreConsumerLagMetrics$3(tuple2));
            }).map(tuple22 -> {
                if (tuple22 != null) {
                    return (Metric) tuple22._2();
                }
                throw new MatchError(tuple22);
            }));
        }, Seq$.MODULE$.canBuildFrom());
    }

    private default Option<Tuple2<MetricName, Metric>> findConsumerLagMetric(Consumer<byte[], byte[]> consumer) {
        return ((IterableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(consumer.metrics()).asScala()).find(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$findConsumerLagMetric$1(tuple2));
        });
    }

    private default StreamThread[] getThreads(KafkaStreams kafkaStreams) {
        return (StreamThread[]) ReflectionUtils$.MODULE$.getFinalField(kafkaStreams, "threads");
    }

    private default Consumer<byte[], byte[]> getRestoreConsumer(StreamThread streamThread) {
        return (Consumer) ReflectionUtils$.MODULE$.getFinalField(streamThread, "restoreConsumer");
    }

    static /* synthetic */ double $anonfun$findTotalRestoreLag$1(Metric metric) {
        return BoxesRunTime.unboxToDouble(metric.metricValue());
    }

    static /* synthetic */ boolean $anonfun$findRestoreConsumerLagMetrics$3(Tuple2 tuple2) {
        return tuple2 != null;
    }

    static /* synthetic */ boolean $anonfun$findConsumerLagMetric$1(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String name = ((MetricName) tuple2._1()).name();
        return name != null ? name.equals("records-lag") : "records-lag" == 0;
    }

    static void $init$(PreRestoreState preRestoreState) {
        preRestoreState.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$_setter_$com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreState_$eq(((App) preRestoreState).flag().apply("kafka.prerestore.state", () -> {
            return true;
        }, "Pre-Restore state", Flaggable$.MODULE$.ofBoolean()));
        preRestoreState.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$_setter_$com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreDurationInitialDelay_$eq(((App) preRestoreState).flag().apply("kafka.prerestore.duration", () -> {
            return DurationOps$RichDuration$.MODULE$.minutes$extension(DurationOps$.MODULE$.RichDuration(2L));
        }, "Pre-Restore min delay", Flaggable$.MODULE$.ofDuration()));
    }
}
