Logstash statt Ingest-Pipelines in Elasticsearch

Logstash statt Ingest-Pipelines in Elasticsearch? Das sind die Vorteile

Der ELK-Stack besteht aus Elasticsearch, Logstash und Kibana und bietet eine umfassende Lösung zur Bewältigung von Datenherausforderungen im Zusammenhang mit ETL (Extract, Transform, Load) sowie der Visualisierung der vorliegenden Daten. Dabei verfügt er über mehrere sofort einsatzbereite Funktionen zur Durchführung aller Arten von Datenmanipulation. Diese Datenmanipulation kann sowohl in Logstash als auch in Elasticsearch durchgeführt werden. Logstash selbst besteht aus drei Teilen: Input, Filter und Output. Der Filterabschnitt der Logstash-Konfigurationsdatei besitzt mehrere Funktionen zur Bearbeitung von Daten vor dem Versand an Elasticsearch. In Elasticsearch verfügen wir über Ingest-Pipelines, die es dem Benutzer ermöglichen, die Daten zu manipulieren, bevor sie in einem Index gespeichert werden.

Logstash in Elasticsearch zur Datenvorverarbeitung: Beispiele

Im Allgemeinen wird es vorgezogen, die oben genannte Datenverarbeitung in Elasticsearch durchführen zu lassen, da die Ingest-Pipelines intuitiver und leistungsfähiger sind. In einigen Fällen ist es jedoch praktischer, die Datenverarbeitung mithilfe von Logstash-Filter-Plugins zu implementieren. Ein Beispiel hierfür soll im Folgenden erläutert werden:

Ein Kunde nutzt Elasticsearch, um die Bestellungen zu verfolgen und die Daten in Kibana zu visualisieren. Jede Aktivität wird als Ereignis in einer Textdatei gespeichert. Das Ereignis enthält die Bestell-ID, den Attributtyp und einen Wert. Diese Datei wird mit Logstash eingelesen und an Elasticsearch gesendet. Der Client verwendet „Upsert“ im Output-Abschnitt von Logstash. Warum? Der Kunde will sicherstellen, dass es immer nur ein Dokument zu einer Bestellung gibt. Daher wird die Bestell-ID als eindeutiger Schlüssel festgelegt. Sobald ein neues Ereignis mit derselben Bestell-ID eingeht, für die in Elasticsearch bereits ein Dokument vorhanden ist, wird das Ereignis „aufgehoben“ und das Dokument in Elasticsearch mit neuen Attributen aktualisiert.

Da über Ingest-Pipelines in Elasticsearch keine Aktualisierungen oder Upserts möglich sind, sondern nur Neuindexierung von Dokumenten, mussten wir für diesen Anwendungsfall die Vorverarbeitung in Logstash durchführen. Hier sind zwei Beispiele dafür, wie wir die Daten basierend auf realen Szenarien vorverarbeiten:

1. Tokenizing

Der Kunde hatte eine Liste mit Zahlungsablehnungsgründen als durch Komma getrennte Zeichenfolge gespeichert, etwa so:

REJECTION_REASON: „denied_by_PP, high_credit_risk, new_user“

Der Kunde wollte die Daten basierend auf dem Ablehnungsgrund aggregieren und in Kibana visualisieren. Da das Feld jedoch als String gespeichert wurde, wurde es als einzelner Bucket-Wert aggregiert. Um ein Bucket je Ablehnungsgrund zu generieren, muss das Feld „REJECTION_REASON“ am Komma aufgeteilt und in eine Reihe von Werten überführt werden. Die von Logstash empfangenen Daten sahen so aus:

Attribute=REJECTION_REASON, processId=123432, orderId=322321, value=“denied_by_PP, high_credit_risk, new_user ”
Attribute=PRODUCT_ID, processId=123432, orderId=322321, value=”398AD”
Attribute=errHandleDuration_initial, processId=123432, orderId=322321, value=35426
Attribute=ORIGIN, processId=123432, orderId=322321, value=“DE ”

Das folgende Skript wurde im Abschnitt „output{}“ der Logstash-Konfiguration hinzugefügt, um die Zeichenfolge nur dann in ein Array von Strings umzuwandeln, wenn das Attribut „REJECTION_REASON“ ist:

if [attribute] == " REJECTION_REASON" {
	mutate {
		gsub => [
			"value", "\s+", ""
		]
	}
	mutate {
		split => { "value" => "," }
	}
	}

Das Script überprüft zunächst, ob die Attributwerte „REJECTION_REASON“ entsprechen, und entfernt dann die Leerzeichen aus der Zeichenfolge. Anschließend wird die Zeichenfolge am Komma in mehrere Werte aufgeteilt. Der resultierende Wert für „REJECTION REASON“ ist dann letztendlich ein Array:

[denied_by_PP, high_credit_risk, new_user

Bei der Aggregation in Elasticsearch und der Verwendung in Visualisierungen werden so für die verschiedenen Ablehnungsgründe denied_by_PP, high_credit_risk und new_user separate Buckets generiert.

2. Conversion

Ein weiteres Problem, mit dem der Kunde konfrontiert war, betraf die Werteinheiten für die Dauer im Zusammenhang mit Bestellungen. In einigen Feldern wurde die Zeiteinheit in Sekunden und in anderen in Millisekunden angegeben. Die Anforderung bestand darin, Werte von Millisekunden in Sekunden umzuwandeln und dort abzurunden, wo das Attribut „errHandleDuration_“ enthält. Der Abschnitt „output{}“ der Logstash-Konfiguration wurde auf diese Weise aktualisiert:

if [attribute] =~ "errHandleDuration_" {
	   ruby {
				code => 'event.set("value", (event.get("value")/1000).round())'
			}
	}

Ja! Logstash ermöglicht das Integrieren von Ruby-Code, was das Schreiben von Code flexibel und bequem macht. Mit seinen vielen verfügbaren Filtern und Optionen im Output-Teil ist Logstash ein leistungsstarkes Werkzeug für die Vorverarbeitung von Daten. Durch die Verwendung von Logstash wird Elasticsearch entlastet, sodass dort mehr Ressourcen für andere Aufgaben zur Verfügung stehen.

Sie haben Fragen zu Logstash? Sie benötigen Unterstützung von SHI aus Augsburg bei der Verarbeitung Ihrer Daten? Nehmen Sie gleich Kontakt auf!
Arsal Jalib

Arsal Jalib

... hat seinen Master in Informatik an der TU Berlin mit seiner Abschlussarbeit zum Thema „Deep Learning“ abgeschlossen. Er war mehrere Jahre als Softwarequalitätsbeauftragter und Softwareentwickler tätig. Derzeit interessiert er sich eher für Such- und Analysethemen und liebt es, neue Dinge zu lernen. Lieblingsdateiformat: .txt und .json