
227 lines
5.8 KiB
Raw Normal View History

2015-10-07 16:19:07 +02:00
from flask import Blueprint, session
2015-10-07 16:19:07 +02:00
from dateutil import tz
2015-11-03 16:03:14 +01:00
from bson.json_util import dumps as bson2json
import os
import sys
2015-11-05 15:47:40 +01:00
import json
import datetime
import re
import hashlib
from ffmap.misc import int2mac, int2shortmac, inttoipv4, bintoipv6
from ipaddress import ip_address
2015-10-07 16:19:07 +02:00
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__) + '/' + '../..'))
from ffmap.misc import *
2015-10-07 16:19:07 +02:00
filters = Blueprint("filters", __name__)
def sumdict(d):
return sum(d.values())
def longip(d):
if len(d) > 32:
return d.replace('::','::... ...::')
return d
def int2macfilter(d):
return int2mac(d)
def int2shortmacfilter(d):
return int2shortmac(d)
def int2ipv4filter(d):
return inttoipv4(d)
def bin2ipv6filter(d):
return bintoipv6(d)
def ip2intfilter(d):
return int(ip_address(d))
def ipnet2intfilter(d):
return int(ip_address(d.split("/")[0]))
2015-10-07 16:19:07 +02:00
def utc2local(dt):
return dt.astimezone(tz.tzlocal())
2015-10-12 18:09:39 +02:00
def format_dt(dt):
return dt.strftime("%Y-%m-%d %H:%M:%S")
2015-11-18 14:48:14 +01:00
def format_dt_date(dt):
return dt.strftime("%Y-%m-%d")
2015-11-03 16:03:14 +01:00
def dt2jstimestamp(dt):
return int(dt.timestamp())*1000
def format_dt_ago(dt):
diff = utcnow() - dt
s = diff.seconds
if diff.days > 1:
return '%i days ago' % diff.days
elif diff.days == 1:
return '1 day ago'
elif s <= 1:
return 'just now'
elif s < 60:
return '%i seconds ago' % s
elif s < 120:
return '1 minute ago'
elif s < 3600:
return '%i minutes ago' % (s/60)
elif s < 7200:
return '1 hour ago'
return '%i hours ago' % (s/3600)
2015-11-15 20:47:43 +01:00
def format_dt_diff(ts):
diff = datetime.timedelta(seconds=ts)
s = diff.seconds
if diff.days > 1:
return '%i days' % diff.days
elif diff.days == 1:
return '1 day'
elif s <= 1:
return '< 1 sec'
elif s < 60:
return '%i sec' % s
elif s < 120:
return '1 min'
elif s < 3600:
return '%i min' % (s/60)
elif s < 7200:
return '1 hour'
return '%i hours' % (s/3600)
2015-11-03 16:03:14 +01:00
def bson_to_json(bsn):
return bson2json(bsn)
2015-11-05 15:47:40 +01:00
def statbson_to_json(bsn):
for point in bsn:
point["time"] = {"$date": int(point["time"].timestamp()*1000)}
return json.dumps(bsn)
def nbsp(txt):
return txt.replace(" ", "&nbsp;")
2015-10-12 18:09:39 +02:00
def humanize_bytes(num, suffix='B'):
for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
2015-10-20 21:45:26 +02:00
if abs(num) < 1024.0 and unit != '':
return "%3.1f %s%s" % (num, unit, suffix)
2015-10-12 18:09:39 +02:00
num /= 1024.0
return "%.1f %s%s" % (num, 'Yi', suffix)
def bytes_to_bits(num, suffix='b'):
num *= 8.0
for unit in ['','k','M','G','T','P','E','Z']:
if abs(num) < 1000.0 and unit != '':
return "%3.1f %s%s" % (num, unit, suffix)
num /= 1000.0
return "%.1f %s%s" % (num, 'Y', suffix)
def mac_to_ipv6_linklocal(mac):
if not mac:
return ''
# Remove the most common delimiters; dots, dashes, etc.
mac_bare = re.sub('[%s]+' % re.escape(' .:-'), '', mac)
return macint_to_ipv6_linklocal(int(mac_bare, 16))
def macint_to_ipv6_linklocal(mac_value):
if not mac_value:
return ''
# Split out the bytes that slot into the IPv6 address
# XOR the most significant byte with 0x02, inverting the
# Universal / Local bit
high2 = mac_value >> 32 & 0xffff ^ 0x0200
high1 = mac_value >> 24 & 0xff
low1 = mac_value >> 16 & 0xff
low2 = mac_value & 0xffff
return 'fe80::{:x}:{:x}ff:fe{:x}:{:x}'.format(high2, high1, low1, low2)
def status2css(status):
status_map = {
"offline": "danger",
"unknown": "warning",
"online": "success",
"reboot": "info",
"created": "primary",
2015-11-19 22:37:06 +01:00
"netmon": "primary",
"update": "primary",
"orphaned": "default",
"admin": "warning",
return "label label-%s" % status_map.get(status, "default")
def anon_email(email, replacement_char='.'):
if 'user' in session:
return email
def anon_str(s, full=False):
if full:
return replacement_char * len(s)
hide_pos = int(len(s)/2)
return s[:hide_pos] + replacement_char + s[(hide_pos+1):]
prefix, tld = email.rsplit('.', 1)
user, domain = prefix.split('@')
return '%s@%s.%s' % (anon_str(user), anon_str(domain), anon_str(tld, True))
return email
def anon_email_regex(email):
return anon_email(email, '*').replace('.', '\.').replace('*', '.').replace('+', '\+').replace('_', '\_')
def gravatar_url(email):
return "" % hashlib.md5(email.encode("UTF-8").lower()).hexdigest()
def webui_addr(router_netifs):
for br_mesh in filter(lambda n: n["netif"] == "br-mesh", router_netifs):
for ipv6 in br_mesh["ipv6_addrs"]:
ipv6 = bintoipv6(ipv6)
if not ipv6:
return None
if ipv6.startswith("fd43"):
# This selects the first ULA address, if present
return ipv6
if ipv6.startswith("fdff") and len(ipv6) > 10:
# This selects the first fdff address, if present (and skips fdff::1)
return ipv6
return None
def format_airtime(airtime):
return "%.0f %%" % (airtime*100)