api/alfred: Check for new router data initially by system time

If we receive data from more than one gateway, there happens to
be a mix of older and newer data (since synchronization between
gateways seems to be not working).

To deal with that, we now only accept data where the router's
system time is newer than the value stored in the DB. To account
for time synchronization issues, we also accept data which is more
than one hour older.

This patch removes other checks for old data which are now obsolete.

Signed-off-by: Adrian Schmutzler <freifunk@adrianschmutzler.de>
This commit is contained in:
Adrian Schmutzler 2018-02-14 16:38:39 +01:00
parent d3ea76b648
commit 5d7e00422e
1 changed files with 88 additions and 79 deletions

View File

@ -75,9 +75,16 @@ def import_nodewatcher_xml(mysql, mac, xml, banned, netifdict):
if findrouter:
router_id = findrouter["router"]
olddata = mysql.findone("SELECT sys_uptime, firmware, hostname, hood, status, lat, lng, contact, description, position_comment, w2_active, w2_busy, w5_active, w5_busy FROM router WHERE id = %s LIMIT 1",(router_id,))
olddata = mysql.findone("SELECT sys_uptime, sys_time, firmware, hostname, hood, status, lat, lng, contact, description, position_comment, w2_active, w2_busy, w5_active, w5_busy FROM router WHERE id = %s LIMIT 1",(router_id,))
if olddata:
uptime = olddata["sys_uptime"]
# Filter old data (Alfred keeps data for 10 min.; old and new can mix if gateways do not sync)
# We only use data where system time is bigger than before (last entry) or more than 1 hour smaller (to catch cases without timeserver)
newtime = router_update["sys_time"].timestamp()
oldtime = olddata["sys_time"].timestamp()
if not (newtime > oldtime or newtime < (oldtime - 3600)):
return
# keep hood up to date
if not router_update["hood"]:
@ -157,7 +164,7 @@ def import_nodewatcher_xml(mysql, mac, xml, banned, netifdict):
status_text = %s, contact = %s, lng = %s, lat = %s, neighbors = %s, reset = %s
WHERE id = %s
""",(
ru["status"],ru["hostname"],ru["last_contact"],ru["sys_time"],ru["sys_uptime"],ru["memory"]["free"],ru["memory"]["buffering"],ru["memory"]["caching"],
ru["status"],ru["hostname"],ru["last_contact"],ru["sys_time"].strftime('%Y-%m-%d %H:%M:%S'),ru["sys_uptime"],ru["memory"]["free"],ru["memory"]["buffering"],ru["memory"]["caching"],
ru["sys_loadavg"],ru["processes"]["runnable"],ru["processes"]["total"],ru["clients"],ru["clients_eth"],ru["clients_w2"],ru["clients_w5"],
ru["w2_active"],ru["w2_busy"],ru["w5_active"],ru["w5_busy"],ru["w2_airtime"],ru["w5_airtime"],ru["has_wan_uplink"],ru["tc_enabled"],ru["tc_in"],ru["tc_out"],
ru["cpu"],ru["chipset"],ru["hardware"],ru["os"],
@ -215,7 +222,7 @@ def import_nodewatcher_xml(mysql, mac, xml, banned, netifdict):
status_text, contact, lng, lat, neighbors)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",(
ru["status"],ru["hostname"],created,ru["last_contact"],ru["sys_time"],ru["sys_uptime"],ru["memory"]["free"],ru["memory"]["buffering"],ru["memory"]["caching"],
ru["status"],ru["hostname"],created,ru["last_contact"],ru["sys_time"].strftime('%Y-%m-%d %H:%M:%S'),ru["sys_uptime"],ru["memory"]["free"],ru["memory"]["buffering"],ru["memory"]["caching"],
ru["sys_loadavg"],ru["processes"]["runnable"],ru["processes"]["total"],ru["clients"],ru["clients_eth"],ru["clients_w2"],ru["clients_w5"],
None,None,None,None,None,None,ru["has_wan_uplink"],ru["tc_enabled"],ru["tc_in"],ru["tc_out"],
ru["cpu"],ru["chipset"],ru["hardware"],ru["os"],
@ -320,7 +327,8 @@ def import_nodewatcher_xml(mysql, mac, xml, banned, netifdict):
if olddata:
# fire events
with suppress(KeyError, TypeError, UnboundLocalError):
if (olddata["sys_uptime"] - 300) > router_update["sys_uptime"]:
#if (olddata["sys_uptime"] - 300) > router_update["sys_uptime"]:
if olddata["sys_uptime"] > router_update["sys_uptime"]:
events_append(mysql,router_id,"reboot","")
with suppress(KeyError, TypeError, UnboundLocalError):
@ -522,87 +530,88 @@ def set_status(mysql,router_id,status):
router_id,))
def new_router_stats(mysql, router_id, uptime, router_update, netifdict):
if (uptime + CONFIG["router_stat_mindiff_secs"]) < router_update["sys_uptime"]:
time = mysql.utctimestamp()
#if not (uptime + CONFIG["router_stat_mindiff_secs"]) < router_update["sys_uptime"]:
# return
time = mysql.utctimestamp()
stattime = mysql.findone("SELECT time FROM router_stats WHERE router = %s ORDER BY time DESC LIMIT 1",(router_id,),"time")
if not stattime or (stattime + CONFIG["router_stat_mindiff_default"]) < time:
mysql.execute("""
INSERT INTO router_stats (time, router, sys_memfree, sys_membuff, sys_memcache, loadavg, sys_procrun, sys_proctot,
clients, clients_eth, clients_w2, clients_w5, airtime_w2, airtime_w5)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",(
time,
router_id,
router_update["memory"]['free'],
router_update["memory"]['buffering'],
router_update["memory"]['caching'],
router_update["sys_loadavg"],
router_update["processes"]['runnable'],
router_update["processes"]['total'],
router_update["clients"],
router_update["clients_eth"],
router_update["clients_w2"],
router_update["clients_w5"],
router_update["w2_airtime"],
router_update["w5_airtime"],
))
netiftime = mysql.findone("SELECT time FROM router_stats_netif WHERE router = %s ORDER BY time DESC LIMIT 1",(router_id,),"time")
if not netiftime or (netiftime + CONFIG["router_stat_mindiff_netif"]) < time:
ndata = []
nkeys = []
for netif in router_update["netifs"]:
# sanitize name
name = netif["name"].replace(".", "").replace("$", "")
with suppress(KeyError):
if name in netifdict.keys():
ndata.append((time,router_id,netifdict[name],netif["traffic"]["rx"],netif["traffic"]["tx"],))
else:
nkeys.append((name,))
stattime = mysql.findone("SELECT time FROM router_stats WHERE router = %s ORDER BY time DESC LIMIT 1",(router_id,),"time")
if not stattime or (stattime + CONFIG["router_stat_mindiff_default"]) < time:
mysql.execute("""
INSERT INTO router_stats (time, router, sys_memfree, sys_membuff, sys_memcache, loadavg, sys_procrun, sys_proctot,
clients, clients_eth, clients_w2, clients_w5, airtime_w2, airtime_w5)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",(
time,
router_id,
router_update["memory"]['free'],
router_update["memory"]['buffering'],
router_update["memory"]['caching'],
router_update["sys_loadavg"],
router_update["processes"]['runnable'],
router_update["processes"]['total'],
router_update["clients"],
router_update["clients_eth"],
router_update["clients_w2"],
router_update["clients_w5"],
router_update["w2_airtime"],
router_update["w5_airtime"],
))
netiftime = mysql.findone("SELECT time FROM router_stats_netif WHERE router = %s ORDER BY time DESC LIMIT 1",(router_id,),"time")
if not netiftime or (netiftime + CONFIG["router_stat_mindiff_netif"]) < time:
# 99.9 % of the routers will NOT enter this, so the doubled code is not a problem
if nkeys:
mysql.executemany("""
INSERT INTO netifs (name)
VALUES (%s)
ON DUPLICATE KEY UPDATE name=name
""",nkeys)
netifdict = mysql.fetchdict("SELECT id, name FROM netifs",(),"name","id")
ndata = []
nkeys = []
for netif in router_update["netifs"]:
# sanitize name
name = netif["name"].replace(".", "").replace("$", "")
with suppress(KeyError):
if name in netifdict.keys():
ndata.append((time,router_id,netifdict[name],netif["traffic"]["rx"],netif["traffic"]["tx"],))
else:
nkeys.append((name,))
# 99.9 % of the routers will NOT enter this, so the doubled code is not a problem
if nkeys:
mysql.executemany("""
INSERT INTO netifs (name)
VALUES (%s)
ON DUPLICATE KEY UPDATE name=name
""",nkeys)
netifdict = mysql.fetchdict("SELECT id, name FROM netifs",(),"name","id")
ndata = []
for netif in router_update["netifs"]:
# sanitize name
name = netif["name"].replace(".", "").replace("$", "")
with suppress(KeyError):
ndata.append((time,router_id,netifdict[name],netif["traffic"]["rx"],netif["traffic"]["tx"],))
mysql.executemany("""
INSERT INTO router_stats_netif (time, router, netif, rx, tx)
VALUES (%s, %s, %s, %s, %s)
""",ndata)
ndata.append((time,router_id,netifdict[name],netif["traffic"]["rx"],netif["traffic"]["tx"],))
# reuse timestamp from router_stats to avoid additional queries
if not stattime or (stattime + CONFIG["router_stat_mindiff_default"]) < time:
nbdata = []
for neighbour in router_update["neighbours"]:
with suppress(KeyError):
nbdata.append((time,router_id,neighbour["mac"],neighbour["quality"],))
mysql.executemany("""
INSERT INTO router_stats_neighbor (time, router, mac, quality)
VALUES (%s, %s, %s, %s)
""",nbdata)
# reuse timestamp from router_stats to avoid additional queries
if not stattime or (stattime + CONFIG["router_stat_mindiff_default"]) < time:
gwdata = []
for gw in router_update["gws"]:
with suppress(KeyError):
gwdata.append((time,router_id,gw["mac"],gw["quality"],))
mysql.executemany("""
INSERT INTO router_stats_gw (time, router, mac, quality)
VALUES (%s, %s, %s, %s)
""",gwdata)
mysql.executemany("""
INSERT INTO router_stats_netif (time, router, netif, rx, tx)
VALUES (%s, %s, %s, %s, %s)
""",ndata)
# reuse timestamp from router_stats to avoid additional queries
if not stattime or (stattime + CONFIG["router_stat_mindiff_default"]) < time:
nbdata = []
for neighbour in router_update["neighbours"]:
with suppress(KeyError):
nbdata.append((time,router_id,neighbour["mac"],neighbour["quality"],))
mysql.executemany("""
INSERT INTO router_stats_neighbor (time, router, mac, quality)
VALUES (%s, %s, %s, %s)
""",nbdata)
# reuse timestamp from router_stats to avoid additional queries
if not stattime or (stattime + CONFIG["router_stat_mindiff_default"]) < time:
gwdata = []
for gw in router_update["gws"]:
with suppress(KeyError):
gwdata.append((time,router_id,gw["mac"],gw["quality"],))
mysql.executemany("""
INSERT INTO router_stats_gw (time, router, mac, quality)
VALUES (%s, %s, %s, %s)
""",gwdata)
def calculate_network_io(mysql, router_id, uptime, router_update):
"""
@ -679,7 +688,7 @@ def parse_nodewatcher_xml(xml):
"status_text": evalxpath(tree,"/data/system_data/status_text/text()"),
"contact": evalxpath(tree,"/data/system_data/contact/text()"),
# system
"sys_time": datetime.datetime.fromtimestamp(evalxpathint(tree,"/data/system_data/local_time/text()")).strftime('%Y-%m-%d %H:%M:%S'),
"sys_time": datetime.datetime.fromtimestamp(evalxpathint(tree,"/data/system_data/local_time/text()")),
"sys_uptime": int(evalxpathfloat(tree,"/data/system_data/uptime/text()")),
"sys_loadavg": evalxpathfloat(tree,"/data/system_data/loadavg/text()"),
"memory": {