Визуализация атак на базе ELK (elasticsearch, kibana, logstash)

В процессе обслуживания большого количества серверов возникает необходимость централизации управления всем зоопарком машин, а так же централизованный сбор логов и аналитика логов на предмет выявления аномалий, ошибок и общей статистики.

В качестве централизованного сбора логов используется rsyslog, а для структурирования и визуализации elasticsearch + kibana. Все бы ничего, но когда количество подключенных машин разрастается, то данных настолько много, что уходит (уходило) большое количество времени на их обработку и анализ. Наряду с другими интересными штуками всегда хотелось организовать свой центр безопасности. Этакая мультимониторная статистика с картами, графиками и прочим.

В данной статье хочу описать свой опыт создания монитора статистики по атакам на ссш. Не будем рассматривать в этом плане защиту, потому как программисты и прочие могут не уметь подключаться на нестандартные порты или пользоваться сертификатами.

Так как у нас уже есть развернутый elastic с kibana, то будем на его базе и городить нашу систему.

Итак, у нас уже есть установленный докер и docker-compose, значит будем поднимать сервисы на нем.

elastic:

elasticsearch:
  build: elasticsearch:2.3.4
  container_name: elastic
  <span class="hljs-built_in">command</span>: elasticsearch -Des.network.host=0.0.0.0
  net: host
  ports:
    - <span class="hljs-string">"9200:9200"</span>
    - <span class="hljs-string">"9300:9300"</span>
  volumes:
    - <span class="hljs-string">"/srv/docker/elastic/etc:/usr/share/elasticsearch/config"</span>
    - <span class="hljs-string">"/srv/docker/elastic/db:/usr/share/elasticsearch/data"</span>
    - <span class="hljs-string">"/etc/localtime:/etc/localtime:ro"</span>
  restart: always
  environment:
    - ES_HEAP_SIZE=2g

/srv/docker/elastic/elasticsearch.yml:

cluster.name: Prod
node.name: <span class="hljs-string">"central-syslog"</span>
http.port: 9200
network.host: _non_loopback_
discovery.zen.ping.multicast.enabled: <span class="hljs-literal">false</span>
discovery.zen.ping.unicast.hosts: [
 ]
transport.publish_host: 0.0.0.0
<span class="hljs-comment">#transport.publish_port: 9300</span>
http.cors.enabled : <span class="hljs-literal">true</span>
http.cors.allow-origin : <span class="hljs-string">"*"</span>
http.cors.allow-methods : OPTIONS, HEAD, GET, POST, PUT, DELETE
http.cors.allow-headers : X-Requested-With,X-Auth-Token,Content-Type, Content-Length
script.engine.groovy.inline.aggs: on

/srv/docker/elastic/logging.yml:

logger:
  action: DEBUG
  com.amazonaws: WARN
appender:
  console:
    <span class="hljs-built_in">type</span>: console
    layout:
      <span class="hljs-built_in">type</span>: consolePattern
      conversionPattern: <span class="hljs-string">"[%d{ISO8601}][%-5p][%-25c] %m%n"</span>

kibana:

kibana:
  image: kibana
  restart: always
  container_name: kibana
  environment:
    SERVICE_NAME: <span class="hljs-string">'kibana'</span>
    ELASTICSEARCH_URL:  <span class="hljs-string">"http://x.x.x.x:9200"</span>
  ports:
    - <span class="hljs-string">"4009:5601"</span>
  volumes:
    - <span class="hljs-string">"/etc/localtime:/etc/localtime:ro"</span>

logstash:

logstash:
 image: logstash:latest
 restart: always
 container_name: logstash
 hostname: logstash
 ports:
  - <span class="hljs-string">"1025:1025"</span>
  - <span class="hljs-string">"1026:1026"</span>
 volumes:
  - <span class="hljs-string">"/srv/docker/logstash/logstash.conf:/etc/logstash.conf:ro"</span>
  - <span class="hljs-string">"/srv/docker/logstash/ssh-map.json:/etc/ssh-map.json:ro"</span>
 <span class="hljs-built_in">command</span>: <span class="hljs-string">"logstash -f /etc/logstash.conf"</span>

Итак. Мы запустили эластик и кибану, но осталось подготовить логстеш к обработке логов от внешних серверов. Я реализовал вот такую схему:

rsyslog → logstash → elastic → kibana.

Можно было бы использовать встроенный в rsyslog коннектор напрямую в эластик, но нам нужны данные по полям и с geoip для статистики.

На серверах, подключаемых к мониторингу вносим в конфиг rsyslog (обычно это /etc/rsyslog.d/50-default.conf) вот такую запись:

auth,authpriv.*                 @@x.x.x.x:1026

Этой записью мы отправляем все события об авторизации на наш удаленный сервер (logstash).

Далее, полученные логстешем логи нам нужно обработать и оформить. Для этого создаем маппинг полей, чтобы на выходе нам было удобно работать (/srv/docker/logstash/ssh-map.json):

{
        <span class="hljs-string">"template"</span>: <span class="hljs-string">"logstash-*"</span>,
        <span class="hljs-string">"mappings"</span>: {
                <span class="hljs-string">"ssh"</span>: {
                        <span class="hljs-string">"properties"</span>: {
                                <span class="hljs-string">"@timestamp"</span>: {
                                        <span class="hljs-string">"type"</span>: <span class="hljs-string">"date"</span>,
                                        <span class="hljs-string">"format"</span>: <span class="hljs-string">"strict_date_optional_time||epoch_millis"</span>
                                },
                                <span class="hljs-string">"@version"</span>: {
                                        <span class="hljs-string">"type"</span>: <span class="hljs-string">"string"</span>
                                },
                                <span class="hljs-string">"username"</span>: {
                                        <span class="hljs-string">"type"</span>: <span class="hljs-string">"string"</span>
                                },
                                <span class="hljs-string">"src_ip"</span>: {
                                        <span class="hljs-string">"type"</span>: <span class="hljs-string">"string"</span>
                                },
                                <span class="hljs-string">"port"</span>: {
                                        <span class="hljs-string">"type"</span>: <span class="hljs-string">"long"</span>
                                },
                        }
                }
        }
}

В ходе создания маппинга столкнулся с одним багом логстеша, а именно присвоению полю значения geo_point (при создании своего индекса выставляется значение у geoip.location — float), по которому в дальнейшем будет строиться heatmap на карте. Баг этот зарегистрирован и в качестве workaround мне пришлось использовать шаблон стандартных индексов logstash-*.

Итак, маппинг у нас есть. Теперь нужно подготовить конфиг logstash, чтобы он фильтровал входящие данные и в нужном формате отдавал в эластик (/srv/docker/logstash/logstash.conf):

input {
   tcp {
        port => 1026
        <span class="hljs-built_in">type</span> => <span class="hljs-string">"security"</span>
   }
}

filter {
  grok {
    match => [<span class="hljs-string">"message"</span>, <span class="hljs-string">"Failed password for (invalid user |)%{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"</span>]
    add_tag => <span class="hljs-string">"ssh_brute_force_attack"</span>
  }
  grok {
    match => [<span class="hljs-string">"message"</span>, <span class="hljs-string">"Accepted password for %{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"</span>]
    add_tag => <span class="hljs-string">"ssh_sucessful_login"</span>
  }
  geoip {
    <span class="hljs-built_in">source</span> => <span class="hljs-string">"src_ip"</span>
  }
}
output {
   <span class="hljs-keyword">if</span> <span class="hljs-string">"ssh_brute_force_attack"</span> <span class="hljs-keyword">in</span> [tags] {
         elasticsearch {
                hosts => [<span class="hljs-string">"x.x.x.x:9200"</span>]
                index => <span class="hljs-string">"logstash-%{+YYYY.MM.dd}"</span>
                manage_template => <span class="hljs-literal">true</span>
                template_name => <span class="hljs-string">"ssh"</span>
                template => <span class="hljs-string">"/etc/ssh-map.json"</span>
                template_overwrite => <span class="hljs-literal">true</span>
         } 
   }
}

Конфиг удобочитаемый, понятный, поэтому комментировать считаю излишним.

Итак. Логи ссш попадают в логстеш, он их обрабатывает и отправляет в индексы эластика. Осталось только настроить визуализацию:

— Открываем веб интерфейс по адресу x.x.x.x:4009/
— Переходим в Settings и добавляем работу с нашими индексами (logstash-*)

Далее нам нужно в kibana создать поисковые запросы, визуализацию и дашборд.

Во вкладке Discover после добавления индексов в kibana мы видим наши записи — все настроили верно.

В левой колонке мы видим список полей для фильтрации, с ними и будем работать.

Первым фильтром пойдет список атакуемых серверов:

— около поля host нажимаем add
— сохраняем поиск как ssh-brute-servers-under-attack (имя вариативно)

Вторым фильтром будет список атакующих стран:

— около поля geoip.country_name нажимаем add
— сохраняем как ssh-brute-countries (имя вариативно)

Третьим фильтром будет общее количество атак:

— переходим на вкладку Discovery
— сохраняем как ssh-brute-all

Итак, на финальном экране у нас будет отображаться четыре разных параметра:

1. Суммарное количество атак
2. Атакующие страны
3. Атакуемые серверы
4. Карта с указателями на атакующие хосты

Суммарное количество атак:

— переходим на вкладку Visualize
— выбираем тип визуализации Metric
— From saved search — ssh-brute-all
— Открываем Metric и меняем значение поля на — Суммарное количество атак
— Сохраняем визуализацию

Атакующие страны:

— переходим на вкладку Visualize
— выбираем тип визуализации Data table
— From saved search — ssh-brute-countries
— Открываем Metric и меняем значение поля на — Количество атак
— Теперь нам нужно соотнести поля и посчитать в таблице «уники». Нажимаем split rows
— Aggregation — terms
— Field — geoip.country_name.raw
— Custom label — Страна

Если все ввели верно, то загорится зеленая кнопка play, после нажатия на которую увидим примерно такую картину:

image

— Сохраняем визуализацию

Атакуемые серверы:
— переходим на вкладку Visualize
— выбираем тип визуализации Data table
— From saved search — ssh-brute-servers-under-attack
— Открываем Metric и меняем значение поля на — Количество атак
— Теперь нам нужно соотнести поля и посчитать в таблице «уники». Нажимаем split rows
— Aggregation — terms
— Field — host.raw
— Custom label — Сервер

Если все ввели верно, то загорится зеленая кнопка play, после нажатия на которую увидим примерно такую картину:

image

— Сохраняем визуализацию

Карта с указателями на атакующие хосты (самое интересное)

— переходим на вкладку Visualize
— выбираем тип визуализации Tile map
— From new search — Select an index pattern — logstash-*
— Открываем Geo Coordinates. Если все шаги были верными, автоматически поле Field заполнится geoip.location
— Переходим в Options
— Меняем хостинг карт (так как у MapRequest изменились условия и нужно получать токен и дополнительно что-то делать). Ставим галку в — WMS compliant map server
— Приводим все поля к параметрам:

WMS Url — basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer
WMS layers* — 0
WMS version* — 1.3.0
WMS format* — image/png
WMS attribution — Maps provided by USGS
WMS styles* — пусто

В итоге у нас должна отобразиться карта атак:

image

— Сохраняем визуализацию

Теперь у нас всё есть для того, чтобы сделать свой дашборд.

Переходим на вкладку Dashboard, нажимаем на Add visualization (плюс в круге справа сверху) и добавляем свои сохраненные визуализации на экран и сохраняем экран. Поддерживается Drag’n’Drop. В итоге у меня получился вот такой экран:

image

Теперь для подключения новых хостов к системе достаточно будет указывать у них передачу логов авторизации на сервер logstash и при возникновении событий хосты и информация будут добавляться на экран и в эластик.

При желании вы всегда сможете добавить логи брутфорса контрольных панелей, собрать более детализированную статистику и добавить на экран по аналогии с этой статьей.

Поділіться своєю любов'ю

Залишити відповідь

0 комментариев
Новіші
Старіші Найпопулярніші
Вбудовані Відгуки
Переглянути всі коментарі
0
Ми любимо ваші думки, будь ласка, прокоментуйте.x