add garbage collection for stale routes

This commit is contained in:
lemmi 2021-12-16 08:53:39 +01:00
parent 193dc695d1
commit 3d1a843b64
1 changed files with 144 additions and 23 deletions

167
main.go
View File

@ -3,6 +3,7 @@ package main
import (
"bytes"
"compress/gzip"
"container/heap"
"fmt"
"html/template"
"io"
@ -30,11 +31,14 @@ var (
type route struct {
Dst netaddr.IPPrefix
Counter uint
UnreachableDuration time.Duration
UnreachableSince time.Time
ReachableSince time.Time
Counter uint
Hostname string
LastUpdate time.Time
gqIdx int
}
func (r route) DurationUntilRounded(t time.Time) time.Duration {
@ -59,20 +63,59 @@ func (r route) Noise(now time.Time) bool {
r.Downtime(now) < 0.5
}
func (r route) LastUpdateUntil(t time.Time) time.Duration {
return t.Sub(r.LastUpdate).Round(time.Second)
}
func (r route) String() string {
now := time.Now()
return fmt.Sprintf("%44s %5d %2.3f%% %s", r.Dst, r.Counter, r.Downtime(now), r.DurationUntil(now))
}
type gcQueue []*route
var _ heap.Interface = &gcQueue{}
func (gcq gcQueue) Len() int {
return len(gcq)
}
func (gcq gcQueue) Less(i, j int) bool {
return gcq[i].LastUpdate.Before(gcq[j].LastUpdate)
}
func (gcq gcQueue) Swap(i, j int) {
gcq[i], gcq[j] = gcq[j], gcq[i]
gcq[i].gqIdx = i
gcq[j].gqIdx = j
}
func (gcq *gcQueue) Push(x interface{}) {
r := x.(*route)
r.gqIdx = len(*gcq)
*gcq = append(*gcq, r)
}
func (gcq *gcQueue) Pop() interface{} {
last := len(*gcq) - 1
ret := (*gcq)[last]
ret.gqIdx = -1
(*gcq)[last] = nil
*gcq = (*gcq)[:last]
return ret
}
type routeStats struct {
sync.Mutex
stats map[netaddr.IPPrefix]*route
stats map[netaddr.IPPrefix]*route
gcstats map[netaddr.IPPrefix]*route
gq gcQueue
}
func newRouteStats() *routeStats {
return &routeStats{
stats: make(map[netaddr.IPPrefix]*route),
rs := &routeStats{
stats: make(map[netaddr.IPPrefix]*route),
gcstats: make(map[netaddr.IPPrefix]*route),
gq: gcQueue{},
}
heap.Init(&rs.gq)
return rs
}
func (rs *routeStats) add(prefix netaddr.IPPrefix) {
@ -81,27 +124,43 @@ func (rs *routeStats) add(prefix netaddr.IPPrefix) {
r := rs.stats[prefix]
if r == nil {
r = &route{Dst: prefix}
go func(r *route, rs *routeStats) {
names, err := net.LookupAddr(r.Dst.IP().String())
if err != nil || len(names) == 0 {
return
}
if strings.HasSuffix(names[0], ".rdns.f3netze.de.") {
return
}
rs.Lock()
r.Hostname = strings.TrimRight(names[0], ".")
rs.Unlock()
}(r, rs)
r = rs.gcstats[prefix]
if r == nil {
r = &route{Dst: prefix}
go func(r *route, rs *routeStats) {
names, err := net.LookupAddr(r.Dst.IP().String())
if err != nil || len(names) == 0 {
return
}
if strings.HasSuffix(names[0], ".rdns.f3netze.de.") {
return
}
rs.Lock()
r.Hostname = strings.TrimRight(names[0], ".")
rs.Unlock()
}(r, rs)
} else {
delete(rs.gcstats, r.Dst)
r.LastUpdate = time.Time{}
}
}
now := time.Now()
if r.UnreachableSince.IsZero() {
r.UnreachableSince = time.Now()
r.UnreachableSince = now
r.ReachableSince = time.Time{}
r.Counter++
}
oldLastUpdate := r.LastUpdate
r.LastUpdate = now
if oldLastUpdate.IsZero() {
heap.Push(&rs.gq, r)
} else {
heap.Fix(&rs.gq, r.gqIdx)
}
rs.stats[prefix] = r
}
@ -114,15 +173,53 @@ func (rs *routeStats) del(prefix netaddr.IPPrefix) {
return
}
now := time.Now()
if !r.UnreachableSince.IsZero() {
r.UnreachableDuration += time.Since(r.UnreachableSince)
r.UnreachableSince = time.Time{}
r.ReachableSince = time.Now()
r.ReachableSince = now
}
r.LastUpdate = now
heap.Fix(&rs.gq, r.gqIdx)
rs.stats[prefix] = r
}
func (rs *routeStats) runGC() {
rs.Lock()
defer rs.Unlock()
now := time.Now()
for len(rs.gq) > 0 {
first := rs.gq[0]
if first.LastUpdate.Add(3 * time.Minute).Before(now) {
heap.Pop(&rs.gq)
delete(rs.stats, first.Dst)
rs.gcstats[first.Dst] = first
} else {
break
}
}
}
func (rs *routeStats) getCollected() []route {
ret := make([]route, 0, len(rs.gcstats))
rs.Lock()
defer rs.Unlock()
for _, v := range rs.gcstats {
ret = append(ret, *v)
}
sort.Slice(ret, func(i, j int) bool {
return ret[i].LastUpdate.Before(ret[j].LastUpdate)
})
return ret
}
func (rs *routeStats) getAll() []route {
ret := make([]route, 0, len(rs.stats))
@ -182,13 +279,14 @@ func monitor(done <-chan struct{}, rs *routeStats) error {
} else {
rs.del(t.Prefix)
}
}
}
rs.runGC()
}
}
func render(w io.Writer, rs []route) error {
func render(w io.Writer, rs []route, gc []route) error {
const tmplHTML = `<!doctype html>
<html lang="en">
<head>
@ -251,6 +349,7 @@ func render(w io.Writer, rs []route) error {
<th>Net</th>
<th>Hostname</th>
<th colspan="3">Downtime</th>
<th>Last</th>
</tr>
<tr>
<th></th>
@ -258,10 +357,11 @@ func render(w io.Writer, rs []route) error {
<th>Counts</th>
<th>rel</th>
<th>abs</th>
<th>Update</th>
</tr>
</thead>
<tbody>
<tr><td colspan="5"><hr/></td></tr>
<tr><td colspan="6"><hr/></td></tr>
{{- $now := .Now -}}
{{- range .R }}
<tr class="
@ -273,9 +373,28 @@ func render(w io.Writer, rs []route) error {
<td class="counter">{{.Counter}}</td>
<td class="downtime">{{.Downtime $now | printf "%.3f%%"}}</td>
<td class="duration">{{.DurationUntilRounded $now}}</td>
<td class="duration">{{.LastUpdateUntil $now}}</td>
</tr>
{{- end}}
</tbody>
{{with .GC}}
<tbody>
<tr><td colspan="6">&nbsp;</td></tr>
</tbody>
<thead>
<tr><th colspan="6">Garbage Collected</th></tr>
</thead>
<tbody>
<tr><td colspan="6"><hr/></td></tr>
{{- range . }}
<tr>
<td class="dst">{{.Dst.String}}</td>
<td colspan="3"></td>
<td colspan="2">{{.LastUpdate.Format "2006-01-02 15:04:05"}}</td>
</tr>
{{- end}}
</tbody>
{{- end}}
</table>
</body>
</html>
@ -285,10 +404,12 @@ func render(w io.Writer, rs []route) error {
Now time.Time
Start time.Time
R []route
GC []route
}{
time.Now(),
startTime,
rs,
gc,
}
t := template.Must(template.New("main").Parse(tmplHTML))
return t.Execute(w, data)
@ -347,7 +468,7 @@ func cachedRender(rs *routeStats) http.HandlerFunc {
content.Reset()
compressed.Reset()
err = render(&content, rs.getLongest())
err = render(&content, rs.getLongest(), rs.getCollected())
err = compress(&compressed, bytes.NewReader(content.Bytes()), err)
timestamp = time.Now()