Logstash & Filebeat ETL
February 23rd, 2018
Kosho Owa, Principal Solution Architect
Jun Ohtani, Developer Evangelist
The Elastic Stack: Elasticsearch, a RESTful search and analytics engine, sits at the core of the Elastic Stack.
Filebeat
Beats
The Beats family:
- Packetbeat: network data
- Metricbeat: metrics
- Winlogbeat: Windows event logs
- Filebeat: log files
- Auditbeat: audit data
- Heartbeat: uptime monitoring
- +40 community Beats
Filebeat is a lightweight log shipper written in Go.
Its outputs include the console and files, as well as Elasticsearch, Logstash, Kafka, and Redis.
Filebeat
https://www.elastic.co/downloads/beats/filebeat
The Filebeat distribution contains:
- filebeat: the executable
- filebeat.yml / filebeat.reference.yml: configuration and reference configuration
- kibana/: Kibana assets shipped with the modules
- module/ and modules.d/: Filebeat Module definitions and their configuration
- fields.yml: field definitions used to build the index template
Internally: a Prospector finds the files to read, a Harvester reads each file line by line, Processors transform or drop events, and the Output ships them.
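The chain above can be sketched in filebeat.yml; the paths, field names, and hosts below are placeholders, not values from the webinar:

```yaml
filebeat.prospectors:            # renamed filebeat.inputs in later releases
  - type: log
    paths:
      - /var/log/app/*.log       # placeholder path for the Prospector/Harvester
processors:
  - drop_fields:
      fields: ["beat.version"]   # illustrative field for the Processor stage
output.elasticsearch:
  hosts: ["localhost:9200"]      # placeholder Output host
```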
`filebeat -help` lists the CLI options; during development it is convenient to write events to the console first, then switch the output to Elasticsearch.
Filebeat and the "T" in ETL: Filebeat -> Ingest Node -> Index
- Transform with Filebeat processors: drop_event, drop_fields, include_fields, add_cloud_metadata, add_docker_metadata, add_kubernetes_metadata
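A hedged sketch of two of these processors in filebeat.yml (the field names are illustrative, not from the slides):

```yaml
processors:
  - include_fields:          # keep only the listed fields
      fields: ["message", "source"]
  - add_cloud_metadata: ~    # enrich events with cloud provider metadata when available
```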
Filebeat -> Ingest Pipeline -> Index. Beats index events with `?pipeline=ingest_pipeline_id`, so the ingest node transforms each document before it reaches the master/data nodes:
{ field1: value1, field2: value2, field3: value3 } -> Ingest Node -> { a: value4, b: value5, c: value6, d: value7 }
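A minimal sketch of pointing the Elasticsearch output at an ingest pipeline; the host and pipeline id are placeholders:

```yaml
output.elasticsearch:
  hosts: ["localhost:9200"]       # placeholder host
  pipeline: my_ingest_pipeline    # placeholder ingest pipeline id
```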
An ingest pipeline is a set of processors (for example grok, date, set) executed in order on each document:
{ field1: value1, field2: value2, field3: value3 } -> grok -> date -> set -> { a: value4, b: value5, c: value6, d: value7 }
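A pipeline like this can be registered through the ingest API; the sketch below uses a hypothetical pipeline id and placeholder field names:

```
PUT _ingest/pipeline/my_ingest_pipeline
{
  "description": "grok, date, set example",
  "processors": [
    { "grok": { "field": "message",
                "patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{GREEDYDATA:action}"] } },
    { "date": { "field": "timestamp", "formats": ["ISO8601"] } },
    { "set":  { "field": "event_source", "value": "app_log" } }
  ]
}
```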
Ingest Pipeline Filebeat -> Ingest Node -> Index
Filebeat Modules: Filebeat -> Ingest Node (Elasticsearch) -> Kibana
Modules: Apache2, Auditd, Icinga, Kafka, MySQL, Nginx, Osquery, PostgreSQL, Redis, System, Traefik
A Filebeat Module bundles settings across the stack: Beats configuration, Elasticsearch pipeline settings and an index template, and Kibana dashboards, visualizations, and index patterns.
Apache2 Module
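As a sketch, the module is enabled via `filebeat modules enable apache2` and then configured in modules.d/apache2.yml; the log path override is a placeholder:

```yaml
- module: apache2
  access:
    enabled: true
    var.paths: ["/var/log/apache2/access.log*"]   # placeholder override
  error:
    enabled: true
```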
Tips
Tips:
- Log in JSON where you can
- Use Elastic Cloud with the Cloud ID to simplify output configuration
- filebeat test config: validate the configuration
- filebeat test output: check connectivity to the configured output
- filebeat export: export the config, index template, or dashboards
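A hedged sketch of the Elastic Cloud settings in filebeat.yml; both values are placeholders that would be copied from the Cloud console in a real setup:

```yaml
cloud.id: "my-deployment:placeholderBase64String"   # placeholder Cloud ID
cloud.auth: "elastic:changeme"                      # placeholder credentials
```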
https://github.com/johtani/elastic_stack_examples/etl_webinar_20180223
Logstash
Logstash inputs include: CloudWatch, file, ganglia, gelf, github, Google PubSub, graphite, HTTP, IMAP, IRC, JDBC, JMS, JMX, Kafka, Kinesis, Log4j, pipe, Puppet, RabbitMQ, Redis, RSS, S3, Salesforce, SNMP trap, SQLite, SQS, stdin, STOMP, syslog, TCP, Twitter, UDP, UNIX, varnishlog, websocket, WMI, XMPP/Jabber — all feeding Elasticsearch.
Inputs: Beats, TCP, UDP, HTTP, JDBC, HTTP Poller
Filters: structure, transform, normalize, GeoIP enrichment, external lookups, CIDR & DNS lookups
Outputs: Elasticsearch, TCP, UDP, HTTP, File, S3
A Logstash pipeline: Input -> Queue -> Worker(s) running Filter -> Output.
# logstash.conf
input {
  file { path => "/tmp/log.txt" }
}
filter {}
output {
  stdout { codec => rubydebug }
}

$ bin/logstash -f logstash.conf
Sample line: 2011-04-19T03:44:01.103Z 192.168.1.1 client logged on

filter {
  grok {
    match => { "message" => "%{NOTSPACE:timestamp} %{NOTSPACE:client_ip} %{GREEDYDATA:action}" }
  }
}
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{IP:client_ip} %{GREEDYDATA:action}" }
  }
  date {
    match => [ "timestamp", "ISO8601" ]
  }
}
filter {
  translate {
    field       => "client_ip"
    destination => "user"
    dictionary  => [ "192.168.1.1", "User A",
                     "192.168.1.2", "User B" ]
    fallback    => "Unknown user"
  }
}
Parsing JSON, either with a codec at the input:

input { stdin { codec => json } }
filter {}

or with a filter:

input { stdin {} }
filter { json { source => "message" } }
Tips:
- Parsing CSV? Use the CSV filter, not grok
- Multiline events (Ruby on Rails logs, Java stack traces): use the multiline codec
- Netflow: use the Netflow codec or the Netflow "module"
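As a sketch, a multiline codec that folds Java stack-trace continuation lines (lines starting with whitespace) into the previous event; the path is a placeholder:

```
input {
  file {
    path => "/var/log/app/app.log"   # placeholder path
    codec => multiline {
      pattern => "^\s"               # a line starting with whitespace...
      what => "previous"             # ...is appended to the previous event
    }
  }
}
```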
Configuration can be split across files in a directory:

$ cat conf/input.conf
input { stdin {} }
$ cat conf/filter.conf
filter {}
$ cat conf/output.conf
output { stdout { codec => rubydebug } }
$ bin/logstash -f conf
# pipelines.yml
- pipeline.id: my-pipeline_1
  path.config: "/etc/path/to/p1.config"
  pipeline.workers: 3
- pipeline.id: my-other-pipeline
  path.config: "/etc/different/path/p2.cfg"
  queue.type: persisted
filter {
  if "_grokparsefailure" in [tags] {
    drop {}
  }
}
Persistent queues:
  queue.type: persisted
  queue.max_bytes: 4gb
Dead letter queues:
  dead_letter_queue.enable: true
  (consume them with the dead_letter_queue input)
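A hedged sketch of a pipeline that replays dead-letter-queue events; the path below is the common default location and may differ per installation:

```
input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"   # assumed default DLQ path
    commit_offsets => true                          # remember how far we have read
  }
}
output {
  stdout { codec => rubydebug }
}
```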
Filebeat or Logstash (or Ingest Node)? Choose per use case; all three are part of the Elastic Stack and are often combined.
DB DWH