Elasticsearch junto con Kibana nos permite visualizar de forma muy rápida y sencilla datos recolectados sin necesidad de programar ninguna interfaz web ni similar, por ese motivo vamos a integrar los logs de libmodsecurity con Elasticsearch. Podremos ver la lista de alertas, filtrar por diferentes campos y visualizar los ataques geolocalizándolos en un mapa.
La infraestructura ELK(Elasticsearch Logstash Kibana) se divide en cuatro capas:
- Filebeat: Colector de logs, este puede ser filebeat o cualquier otro, encargado de leer los ficheros de logs y enviarlos a Logstash.
- Logstash: Recibe los logs del colector, los procesa y los inserta en Elasticsearch.
- Elasticsearch: Base de datos donde se almacenan los registros procesados por Logstash.
- Kibana: Interfaz web para la visualización de los datos almacenados en Elasticsearch.
NOTA: Estrictamente hablando el servidor de Logstash puede ser prescindible ya que Filebeat es capaz de insertar registros directamente en Elastichsearch pero mediante Logstash podremos hacer manipulaciones de datos mas complejas que solo utilizando Filebeat.
Nuestro escenario se compondrá de un balanceador de carga, un servidor web que a su vez servirá la interfaz de Kibana y correrá Filebeat, un Logstash para procesar los datos recibidios y un Elastichsearch.
Empezamos compilando el haproxy:
Realizamos una configuración básica donde se enviará todo el tráfico al servidor web:
defaults
timeout connect 5000
timeout client 50000
timeout server 50000
listen statistics
bind *:9999
mode http
stats enable
stats hide-version
stats realm "HaStats"
stats uri /stats
stats auth admin:XXXXXXXXX
listen http-in
bind *:80
default_backend webnodes
backend webnodes
mode http
option forwardfor
server webserver KIBANASERVER:80
Compilamos Elasticsearch:
NOTA: Puede aparecer el error:
[1]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
En tal caso ajustamos el parámetro del kernel vm.max_map_count:
vi /etc/sysctl.conf
vm.max_map_count = 262144
Configuramos Elasticsearch para que se bindee a todas las interfaces de red y opere en modo mononodo:
network.host: 0.0.0.0
discovery.seed_hosts: []
Arrancamos el servicio:
Lo añadimos al runlevel por defecto:
Filtramos el tráfico de red para que solo se tenga acceso al Elasticsearch desde Logstash y Kibana:
iptables -I INPUT 1 -p tcp –dport 9200 -s LOGSTASHSERVER -j ACCEPT
iptables -I INPUT 1 -p tcp –dport 9200 -s KIBANASERVER -j ACCEPT
Guardamos las reglas para que en caso de reinicio se carguen de forma automática:
rc-update add sshd default
Ahora en el servidor web compilamos Nginx, libmodsecurity y realizamos la configuración indicada en este artÃculo anterior.
En el servidor web tendremos también Kibana corriendo asà que lo compilamos junto con una herramienta muy útil para parsear JSONs:
Indicamos a Kibana a que servidor Elasticsearch debe conectar, según el sistema de arranque que utilicemos lo haremos de un modo u otro, en mi caso es OpenRC:
To set a customized Elasticsearch instance:
 *  OpenRC: set ES_INSTANCE in /etc/conf.d/kibana
 *  systemd: set elasticsearch.url in /etc/kibana/kibana.yml
ES_INSTANCE=http://ELASTICSEARCHSERVER:9200
Bindeamos Kibana a todas la interfaces:
server.host: "0.0.0.0"
Arrancamos Kibana:
Metemos el servicio en el runlevel default:
Comprobamos que Kibana arranca y puede conectar al Elasticsearch:
http://KIBANASERVER:5601
Volvemos a bindearlo a localhost comentando la lÃÂnea:
#server.host: "0.0.0.0"
Reiniciamos el servicio para que aplique la nueva configuración:
Kibana no dispone de autenticación de forma nativa asà que utilizaremos Nginx para ello, para generar el fichero de hashes es necesario instalar las apache-tools:
Generamos el fichero de hashes para el usuario kibanauser:
La configuración de Nginx quedarÃa del siguiente modo:
user nginx nginx;
worker_processes 1;
error_log /var/log/nginx/error_log info;
events {
worker_connections 1024;
use epoll;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main
'$remote_addr - $remote_user [$time_local] '
'"$request" $status $bytes_sent '
'"$http_referer" "$http_user_agent" '
'"$gzip_ratio"';
client_header_timeout 10m;
client_body_timeout 10m;
send_timeout 10m;
connection_pool_size 256;
client_header_buffer_size 1k;
large_client_header_buffers 4 2k;
request_pool_size 4k;
gzip off;
output_buffers 1 32k;
postpone_output 1460;
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 75 20;
ignore_invalid_headers on;
index index.php;
server {
listen 0.0.0.0;
server_name modsecurityTest.alfaexploit.com;
access_log /var/log/nginx/modsecurityTest.access_log main;
error_log /var/log/nginx/modsecurityTest.error_log info;
root /var/www/modsecurityTest/;
location ~* \.php$ {
fastcgi_index index.php;
fastcgi_pass 127.0.0.1:9000;
include fastcgi_params;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
fastcgi_param SCRIPT_NAME $fastcgi_script_name;
}
}
server {
listen 8080 default_server;
listen [::]:8080 default_server;
auth_basic "Restricted Access";
auth_basic_user_file /etc/nginx/htpasswd.users;
index index.html index.htm index.nginx-debian.html;
server_name _;
location / {
proxy_pass http://localhost:5601;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_cache_bypass $http_upgrade;
}
}
}
Reiniciamos el servicio:
Comprobamos la autenticación funciona:
http://KIBANASERVER:8080
kibanauser
PASSWORD
Ahora dejamos de lado el servidor web para centrarnos en el de Logstash, compilamos el software:
En la configuración de Logstash spliteamos los mensajes para hacer que cada [transaction][messages][message] sea un evento independiente, si el mensaje es nulo eliminamos el evento, en caso contrario alertmessage: [transaction][messages][message]
input {
beats {
port => 5044
host => "0.0.0.0"
}
}
filter {
# Split by transaction-messages: The split filter clones an event by splitting one of its fields and placing each value resulting from the split into a clone of the original event
split {
field => "[transaction][messages]"
}
# Assign attackerip field depending of direct traffic or load balanced
if ([transaction][request][headers][X-Forwarded-For]) {
mutate {
add_field => {
"attackerip" => "%{[transaction][request][headers][X-Forwarded-For]}"
}
}
} else {
mutate {
add_field => {
"attackerip" => "%{[transaction][client_ip]}"
}
}
}
# Delete not matched rules messages
if [transaction][messages][message] == "" {
drop {}
} else {
# Delete anomaly score checks and internal messages
if [transaction][messages][details][data] == "" {
drop {}
} else {
mutate {
add_field => {
"alertmessage" => "%{[transaction][messages][message]}"
}
add_field => {
"alertdata" => "%{[transaction][messages][details][data]}"
}
remove_field => [ "[transaction][messages]" ]
}
}
}
# Geoip: Geoip filter is not supposed to try to populate if the source is a private IP address. It only works on public address.
geoip { source => "attackerip" }
}
output {
elasticsearch {
hosts => "ELASTICSEARCHSERVER:9200"
}
}
Arrancamos el servicio:
Lo añadimos al runlevel por defecto:
Filtramos el tráfico para que solo el servidor web pueda enviarnos logs a procesar:
iptables -I INPUT 1 -p tcp –dport 5044 -s KIBANASERVER -j ACCEPT
Guardamos las reglas para que en caso de reinicio se carguen de forma automática:
Una forma de debugear problemas en Logstash es enviar la salida a un fichero de texto en formato JSON de este modo podremos ver si los filtros se aplican correctamente antes de enviar los datos al Elasticsearch:
output {
file {
codec => "json"
path => "/tmp/grok.log"
}
}
Podemos visualizar la salida haciéndola pasar por jq:
Otra opción es arrancarlo de forma manual en foreground de este modo podremos ver posibles errores:
Tan solo queda instalar Filebeat en el servidor web para que parsee los logs y los envÃe al Logstash, para ello nos aseguramos de que libmodsecurity guarde los ficheros de logs de modo concurrente en el directorio indicado:
SecAuditLogType Concurrent
SecAuditLogStorageDir /opt/modsecurity/var/audit
Aplicamos la configuración:
Instalamos Filebeat:
Configuramos Filebeat para que lea los ficheros generados por libmodsecurity y los envÃe al Logstash:
filebeat.inputs:
- type: log
enabled: true
paths:
- /opt/modsecurity/var/audit/*/*/*
json.keys_under_root: true
encoding: utf-8
document_type: mod_security
close_eof: true
scan_frequency: 5s
clean_*: true
output.logstash:
hosts: ["LOGSTASHSERVER:5044"]
Arrancamos el servicio:
Lo añadimos al runlevel por defecto:
Comprobamos que esté parseando los ficheros indicados:
2019-11-22T14:42:47.905+0100 INFO log/harvester.go:253 Harvester started for file: /opt/modsecurity/var/audit/20191122/20191122-1339/20191122-133905-157442634521.237997
Si todo funciona correctamente ya debemos de poder visualizar los datos obtenidos en Kibana, pero primero debemos generar un Ãndice en Elasticsearch, para ello accedemos a Kibana -> Discover y aplicamos el index pattern logstash-*
En las siguientes capturas podemos ver unas muestras de logs y del mapa de geolocalización:
Filebeat se puede debugear arrancándolo de forma manual e indicándole que muestre todos los mensajes que vaya a publicar:
filebeat -e -d "*" -c /etc/filebeat/filebeat.yml