如何用 SQL 统计日志中五分钟内请求数超过 100 的 IP
某客户网站出现爬虫,需要我从一个月的日志中分析出五分钟内请求量超 100 的 IP。提供的是一个 CSV 格式文件,每个请求占一行:
2021-11-09 13:04:06,223.80.x.x 2021-11-09 13:04:08,223.211.x.x ... 2021-12-09 13:00:01,122.x.x.x
统计类的需求优先考虑用 SQL 来完成,为了方便我直接用 SQLite,创建数据库和表:
$ sqlite3 log.db sqlite> CREATE TABLE log(time text, ip text);
把 log.csv 的数据入库:
sqlite> .mode csv sqlite> .import log.csv log
接着步骤如下:
1、算出每条的请求时间计算出5分钟范围,例如 2021-11-09 13:02:44,5 分钟范围则是 2021-11-09 13:00:00 ~ 2021-11-09 13:05:00;
2、汇总统计时间范围。
完成第一步的重点在于如何计算时间范围,为了方便计算,先将日期字符串转为时间戳:
select time, strftime('%s', time, 'utc') from log
日期范围用取模运算计算,计算公式如下,时间戳以秒为单位,5 分钟即 300 秒:
起点时间:时间戳 - 时间戳 mod 300 终点时间:时间戳 + 时间戳 mod 300
这里我在 shell 中演示:
$ date +%s # 当前时间戳 1639378362 $ date -d @1639378362 # 转换为易读格式 2021年 12月 13日 星期一 14:52:42 CST $ echo '1639378362 - 1639378362 % 300' | bc # 计算起点时间 1639378200 $ date -d @1639378200 # 格式转换 2021年 12月 13日 星期一 14:50:00 CST $ echo '1639378362 + 1639378362 % 300' | bc # 计算终点时间 1639378524 $ date -d @1639378524 # 格式转换 2021年 12月 13日 星期一 14:55:24 CST
把这个思路转换为 SQL,“||”是字符串连接符,为了易读:
select (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300) || '~' || (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300 + 300) as range, ip from log;
下一步做一个 group by,将相同时间范围类的统计出来:
select (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300) || '~' || (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300 + 300) as range, ip, count(1) from log group by range, ip;
为了易读,做一次嵌套 SQL,把日期格式化下:
select datetime(substr(range, 0, 11), 'unixepoch', 'localtime') || '~' || datetime(substr(range, 12), 'unixepoch', 'localtime'), ip, total from(select (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300) || '~' || (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300 + 300) as range, ip, count(1) as total from log group by range,ip) where total >= 100;
查询结果:
"2021-11-30 14:45:00~2021-11-30 14:50:00",183.x.x.161,100 "2021-11-30 14:50:00~2021-11-30 14:55:00",183.x.x.161,175 "2021-11-30 14:50:00~2021-11-30 14:55:00",183.x.x.166,132 "2021-11-30 14:50:00~2021-11-30 14:55:00",183.x.x.171,112 "2021-11-30 14:50:00~2021-11-30 14:55:00",183.x.x.174,114 "2021-11-30 14:55:00~2021-11-30 15:00:00",183.x.x.161,177 ...
验证无误后,将结果以 CSV 格式输出到文件中:
sqlite> .output result.csv sqlite> select ...> datetime(substr(range, 0, 11), 'unixepoch', 'localtime') || '~' || datetime(substr(range, 12), 'unixepoch', 'localtime'), ip, total ...> from(select ...> (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300) || '~' || (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300 + 300) as range, ip, count(1) as total ...> from log ...> group by range,ip) ...> where total >= 100; sqlite>