Neuigkeiten von trion.
Immer gut informiert.

Günstiger (Daten-) Strom

Bei unseren Schulungen ist es immer wieder eine interessante Herausforderung, prägnante Übungsaufgaben zu erstellen, die von den Teilnehmer erfolgreich in einer passenden Zeitspanne umgesetzt werden können.

Für Aufgaben im Kontext Stream- / Messaging verwenden wir dazu (unter anderen) die Twitter API (Version 2) ist. Diese beleuchten wir im Folgenden zusammen mit typischen Werkzeugen anhand einiger Beispiele.

Filtered Streams

Streams API Aufrufe zeichnen sich dadurch aus, permanent Nachrichten zu liefern. Der Endpoint in der Twitter-API ist grundsätzlich sehr einfach aufbaut:

GET /2/tweets/search/stream

Der direkte Aufruf in der Shell mittels cURL bzw. des modereren und bequemer zu nutzenden httpie wird allerdings zu einem 401 (Unauthorized) führen.

$ curl https://api.twitter.com/2/tweets/search/stream
$ http https://api.twitter.com/2/tweets/search/stream

Dafür gibt es (mindestens) drei Gründe:

  1. Der Strom ist günstig; nicht kostenlos. Sie erkaufen dies durch ihre Daten.

  2. Der Stream liefert die Ergebnisse einer Suchanfrage zurück. Diese ist nicht Teil der Endpoint-Spezifikation, sondern wird ein Form eines Nutzer-bezogenen Regelwerkes umgesetzt. Dazu bedarf es Authentifizierung und Autorisierung basierend auf OAuth.

  3. Twitter möchte (und muss) natürlich den Umfang der Zugriffe limitieren. Dazu gibt es ein dreistufiges Zugriffsmodell.

Für den Desktop bringt im Vergleich zu cURL und httpie natürlich Postman einen deutlichen Mehrwert.

Beispielsweise kann man einzelne cURL-Anfragen oder gleich eine Kollektion für die gesamte Twitter-API (via JSON) importieren.

Zugriff

Für den Zugriff ist ein Twitter Entwickerkonto einzurichten. Im anschliessend verfügbaren Developer-Dashboard kann man eine App eintragen und für diese Key und Tokens generieren lassen; insbesondere einen Bearer-Token (Inhaber-Token).

Regeln

Mit dem Bearer-Token hat kann man die besprochenen Regeln anlegen, z.B. für den Zugriff auf Bitcoin-bezogene Tweets ohne assoziiertes Video.

Der Einfachheit halber zeigen wir dies anhand von Python. Das Bearer-Token muss dazu als Umgebungsvariable gesetzt werden.

import requests
from os import environ

bearer_token = environ.get("BEARER_TOKEN")

def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2FilteredStreamPython"
    return r

def set_rules():
    btc_rules = [
        {"value":
            "#BTC #Bitcoin -has:videos",
            "tag":
            "bitcoin tweets"}
    ]
    payload = {"add": btc_rules}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload,
    )

Suchanfrage

Nachdem das Regelwerk definiert ist, kann man Suchanfragen absetzen. Beispiel: Suchergebnisse zusammen mit diversen Metadaten (Follower des Autors, Anzahl der Retweets, etc.) mittels cURL:

$ curl -H "Authorization: Bearer $BEARER_TOKEN" \
    https://api.twitter.com/2/tweets/search/stream? \
        tweet.fields=public_metrics&user.fields=public_metrics"

Verarbeitung und Analyse

Beispiele für die Verarbeitung und Analyse von Streams im Rahmen eines Projektes oder einer Schulung sind vielfältig. Beispiele:

  • Ein Kafka-Producer könnte den Stream einlesen, passend aufbereiten / filtern und anhand der Twitter Hashes (#Bitcoin, #Eth, #Ripple, etc.) die Nachrichten auf Partitionen seines Topics aufteilen.

  • Der Twitter-Stream könnte mit Kafka Streams aggregiert, gefiltert, aufbereitet, mit anderen Streams zusammengeführt und von elasticsearch konsumiert werden.

  • Ein MQTT-Client könnte den Stream auf passende MQTT-Topics aufteilen und zu seinem Broker publizieren. Andere MQTT-Clients könnten diese Topics dann abonnieren und passend verarbeiten.

Als konkretes Beispiel werden im Folgenden die Ergebnisse des Twitter-Streams (JSON) mittels Python in eine MongoDB (NoSQL Dokumenten-Datenbank) geschrieben.

Diese wird zusammen mit einer Verwaltungsoberfläche mit Docker Compose gestartet.

version: '3.1'

services:
  mongo:
    image: mongo
    restart: always
    ports:
      - "27017:27017"
    volumes:
      - configdb:/data/configdb
      - db:/data/db
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: s3cret

  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - "8081:8081"
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: s3cret
      ME_CONFIG_MONGODB_URL: mongodb://root:s3cret@mongo:27017/

volumes:
  configdb:
  db:
$ docker-compose up -d

Anschliessend wird der Stream in die Datenbank geschrieben. Auch hier der Einfachheit halber mittels Python und der Library pymongo.

def handle_stream(db):
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream" +
        "?tweet.fields=created_at,public_metrics,lang" +
        "&expansions=author_id" +
        "&user.fields=created_at,public_metrics",
        auth=bearer_oauth, stream=True
    )
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(
            "Cannot get stream (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    for response_line in response.iter_lines():
        if response_line:
            json_response = json.loads(response_line)
            result = db.tweets.insert_one(json_response)


def main():
    client = MongoClient("mongodb://root:s3cret@localhost:27017/")
    db = client.btc_data
    handle_stream(db)

Im Code wird ein MongoClient Objekt instantiert und die JSON Dokumente werden sukzessive in die Mongo-Collection (also in die "Tabelle") btc_data geschrieben.


Wir hoffen, dass wir Ihnen eine Idee bezüglich des praxisbezogenen Charakters unserer Schulungen gegeben haben.




Zu den Themen Kafka, Docker, MQTT (und vielen anderen) bieten wir sowohl Beratung, Entwicklungsunterstützung als auch passende Schulungen an:

Auch für Ihren individuellen Bedarf können wir Workshops und Schulungen anbieten. Sprechen Sie uns gerne an.

Feedback oder Fragen zu einem Artikel - per Twitter @triondevelop oder E-Mail freuen wir uns auf eine Kontaktaufnahme!

Zur Desktop Version des Artikels