From 3d1a843b64e7de8fb3cfc35835f5b303a1487aee Mon Sep 17 00:00:00 2001 From: lemmi Date: Thu, 16 Dec 2021 08:53:39 +0100 Subject: [PATCH] add garbage collection for stale routes --- main.go | 167 ++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 144 insertions(+), 23 deletions(-) diff --git a/main.go b/main.go index c663379..35bfc30 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "bytes" "compress/gzip" + "container/heap" "fmt" "html/template" "io" @@ -30,11 +31,14 @@ var ( type route struct { Dst netaddr.IPPrefix - Counter uint UnreachableDuration time.Duration UnreachableSince time.Time ReachableSince time.Time + Counter uint Hostname string + + LastUpdate time.Time + gqIdx int } func (r route) DurationUntilRounded(t time.Time) time.Duration { @@ -59,20 +63,59 @@ func (r route) Noise(now time.Time) bool { r.Downtime(now) < 0.5 } +func (r route) LastUpdateUntil(t time.Time) time.Duration { + return t.Sub(r.LastUpdate).Round(time.Second) +} + func (r route) String() string { now := time.Now() return fmt.Sprintf("%44s %5d %2.3f%% %s", r.Dst, r.Counter, r.Downtime(now), r.DurationUntil(now)) } +type gcQueue []*route + +var _ heap.Interface = &gcQueue{} + +func (gcq gcQueue) Len() int { + return len(gcq) +} +func (gcq gcQueue) Less(i, j int) bool { + return gcq[i].LastUpdate.Before(gcq[j].LastUpdate) +} +func (gcq gcQueue) Swap(i, j int) { + gcq[i], gcq[j] = gcq[j], gcq[i] + gcq[i].gqIdx = i + gcq[j].gqIdx = j +} +func (gcq *gcQueue) Push(x interface{}) { + r := x.(*route) + r.gqIdx = len(*gcq) + *gcq = append(*gcq, r) +} +func (gcq *gcQueue) Pop() interface{} { + last := len(*gcq) - 1 + ret := (*gcq)[last] + ret.gqIdx = -1 + (*gcq)[last] = nil + *gcq = (*gcq)[:last] + return ret +} + type routeStats struct { sync.Mutex - stats map[netaddr.IPPrefix]*route + stats map[netaddr.IPPrefix]*route + gcstats map[netaddr.IPPrefix]*route + gq gcQueue } func newRouteStats() *routeStats { - return &routeStats{ - stats: make(map[netaddr.IPPrefix]*route), + rs := &routeStats{ + stats: make(map[netaddr.IPPrefix]*route), + gcstats: make(map[netaddr.IPPrefix]*route), + gq: gcQueue{}, } + heap.Init(&rs.gq) + return rs } func (rs *routeStats) add(prefix netaddr.IPPrefix) { @@ -81,27 +124,43 @@ func (rs *routeStats) add(prefix netaddr.IPPrefix) { r := rs.stats[prefix] if r == nil { - r = &route{Dst: prefix} - go func(r *route, rs *routeStats) { - names, err := net.LookupAddr(r.Dst.IP().String()) - if err != nil || len(names) == 0 { - return - } - if strings.HasSuffix(names[0], ".rdns.f3netze.de.") { - return - } - rs.Lock() - r.Hostname = strings.TrimRight(names[0], ".") - rs.Unlock() - }(r, rs) + r = rs.gcstats[prefix] + if r == nil { + r = &route{Dst: prefix} + go func(r *route, rs *routeStats) { + names, err := net.LookupAddr(r.Dst.IP().String()) + if err != nil || len(names) == 0 { + return + } + if strings.HasSuffix(names[0], ".rdns.f3netze.de.") { + return + } + rs.Lock() + r.Hostname = strings.TrimRight(names[0], ".") + rs.Unlock() + }(r, rs) + } else { + delete(rs.gcstats, r.Dst) + r.LastUpdate = time.Time{} + } } + now := time.Now() if r.UnreachableSince.IsZero() { - r.UnreachableSince = time.Now() + r.UnreachableSince = now r.ReachableSince = time.Time{} r.Counter++ } + oldLastUpdate := r.LastUpdate + r.LastUpdate = now + + if oldLastUpdate.IsZero() { + heap.Push(&rs.gq, r) + } else { + heap.Fix(&rs.gq, r.gqIdx) + } + rs.stats[prefix] = r } @@ -114,15 +173,53 @@ func (rs *routeStats) del(prefix netaddr.IPPrefix) { return } + now := time.Now() + if !r.UnreachableSince.IsZero() { r.UnreachableDuration += time.Since(r.UnreachableSince) r.UnreachableSince = time.Time{} - r.ReachableSince = time.Now() + r.ReachableSince = now } + r.LastUpdate = now + heap.Fix(&rs.gq, r.gqIdx) + rs.stats[prefix] = r } +func (rs *routeStats) runGC() { + rs.Lock() + defer rs.Unlock() + + now := time.Now() + for len(rs.gq) > 0 { + first := rs.gq[0] + if first.LastUpdate.Add(3 * time.Minute).Before(now) { + heap.Pop(&rs.gq) + delete(rs.stats, first.Dst) + rs.gcstats[first.Dst] = first + } else { + break + } + } +} + +func (rs *routeStats) getCollected() []route { + ret := make([]route, 0, len(rs.gcstats)) + + rs.Lock() + defer rs.Unlock() + + for _, v := range rs.gcstats { + ret = append(ret, *v) + } + sort.Slice(ret, func(i, j int) bool { + return ret[i].LastUpdate.Before(ret[j].LastUpdate) + }) + + return ret +} + func (rs *routeStats) getAll() []route { ret := make([]route, 0, len(rs.stats)) @@ -182,13 +279,14 @@ func monitor(done <-chan struct{}, rs *routeStats) error { } else { rs.del(t.Prefix) } - } } + + rs.runGC() } } -func render(w io.Writer, rs []route) error { +func render(w io.Writer, rs []route, gc []route) error { const tmplHTML = ` @@ -251,6 +349,7 @@ func render(w io.Writer, rs []route) error { Net Hostname Downtime + Last @@ -258,10 +357,11 @@ func render(w io.Writer, rs []route) error { Counts rel abs + Update -
+
{{- $now := .Now -}} {{- range .R }} {{.Counter}} {{.Downtime $now | printf "%.3f%%"}} {{.DurationUntilRounded $now}} + {{.LastUpdateUntil $now}} {{- end}} + {{with .GC}} + +   + + + Garbage Collected + + +
+ {{- range . }} + + {{.Dst.String}} + + {{.LastUpdate.Format "2006-01-02 15:04:05"}} + + {{- end}} + + {{- end}} @@ -285,10 +404,12 @@ func render(w io.Writer, rs []route) error { Now time.Time Start time.Time R []route + GC []route }{ time.Now(), startTime, rs, + gc, } t := template.Must(template.New("main").Parse(tmplHTML)) return t.Execute(w, data) @@ -347,7 +468,7 @@ func cachedRender(rs *routeStats) http.HandlerFunc { content.Reset() compressed.Reset() - err = render(&content, rs.getLongest()) + err = render(&content, rs.getLongest(), rs.getCollected()) err = compress(&compressed, bytes.NewReader(content.Bytes()), err) timestamp = time.Now()