AI

A Code Implementation of a Real‑Time In‑Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, Pydantic

By capernaum
Last updated: 2025-04-22 02:40

In this notebook, we demonstrate how to build a fully in-memory “sensor alert” pipeline in Google Colab using FastStream, a high-performance, Python-native stream-processing framework, and its integration with RabbitMQ. By leveraging faststream.rabbit’s RabbitBroker and TestRabbitBroker, we simulate a message broker without needing external infrastructure. We orchestrate four distinct stages: ingestion and validation, normalization, monitoring and alert generation, and archiving, with the messages passed between stages defined as Pydantic models (RawSensorData, NormalizedData, AlertData) to ensure data quality and type safety. Under the hood, Python’s asyncio powers asynchronous message flow, while nest_asyncio enables nested event loops in Colab. We also employ the standard logging module for traceable pipeline execution and pandas for final result inspection, making it easy to visualize archived alerts in a DataFrame.

!pip install -q faststream[rabbit] nest_asyncio

We install FastStream with its RabbitMQ integration, providing the core stream-processing framework and broker connectors, as well as the nest_asyncio package, which enables nested asyncio event loops in environments like Colab. All this is achieved while keeping the output minimal with the -q flag.
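
If you want to confirm which FastStream version the installer resolved (purely optional; the version shown will vary over time), you can query the installed package metadata:

import importlib.metadata

# Print the installed FastStream version
print(importlib.metadata.version("faststream"))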

import nest_asyncio, asyncio, logging
nest_asyncio.apply()

We import the nest_asyncio, asyncio, and logging modules, then call nest_asyncio.apply() to patch Python’s event loop so that nested asynchronous tasks can run inside environments like Colab or Jupyter notebooks without errors. The logging import readies you to instrument your pipeline with detailed runtime logs.
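
To see what the patch buys you, here is a minimal, self-contained sketch (not part of the pipeline): without nest_asyncio.apply(), the inner asyncio.run() would raise "RuntimeError: asyncio.run() cannot be called from a running event loop".

import asyncio, nest_asyncio

nest_asyncio.apply()

async def inner():
    return 42

async def outer():
    # A nested asyncio.run() call, legal only because nest_asyncio patched the loop
    return asyncio.run(inner())

print(asyncio.run(outer()))  # prints 42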

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("sensor_pipeline")

We configure Python’s built‑in logging to emit INFO‑level (and above) messages prefixed with a timestamp and severity, then create a dedicated logger named “sensor_pipeline” for emitting structured logs within your streaming pipeline.
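
A quick smoke test of that configuration (the timestamp in the sample output is illustrative):

logger.info("sensor pipeline initialized")
# Example output (your timestamp will differ):
# 2025-04-22 02:40:00,123 INFO sensor pipeline initialized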

from faststream import FastStream
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from pydantic import BaseModel, Field, validator
import pandas as pd
from typing import List

We bring in FastStream’s core FastStream class alongside its RabbitMQ connectors (RabbitBroker for real brokers and TestRabbitBroker for in‑memory testing), Pydantic’s BaseModel, Field, and validator for declarative data validation, pandas for tabular result inspection, and Python’s List type for annotating our in‑memory archives.

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app    = FastStream(broker)

We instantiate a RabbitBroker pointed at a (local) RabbitMQ server using the AMQP URL, then create a FastStream application bound to that broker, setting up the messaging backbone for your pipeline stages. Note that because the demo later wraps this broker in TestRabbitBroker, nothing actually needs to be listening at this address.

class RawSensorData(BaseModel):
    sensor_id: str       = Field(..., examples=["sensor_1"])
    reading_celsius: float = Field(..., ge=-50, le=150, examples=[23.5])

    @validator("sensor_id")
    def must_start_with_sensor(cls, v):
        if not v.startswith("sensor_"):
            raise ValueError("sensor_id must start with 'sensor_'")
        return v


class NormalizedData(BaseModel):
    sensor_id: str
    reading_kelvin: float


class AlertData(BaseModel):
    sensor_id: str
    reading_kelvin: float
    alert: bool

These Pydantic models define the schema for each stage: RawSensorData enforces input validity (e.g., the reading range and a sensor_ prefix), NormalizedData carries the Kelvin‑converted reading, and AlertData encapsulates the final alert payload (including a boolean flag), ensuring a type‑safe data flow throughout the pipeline.
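
Before wiring these models into the broker, you can sanity‑check the rules by constructing them directly; both attempts below should raise a pydantic ValidationError (a standalone sketch, assuming the models above are defined; the "thermo_1" id and the 200.0 reading are made‑up bad inputs):

from pydantic import ValidationError

try:
    RawSensorData(sensor_id="thermo_1", reading_celsius=23.5)  # wrong prefix
except ValidationError as err:
    print(err)

try:
    RawSensorData(sensor_id="sensor_9", reading_celsius=200.0)  # exceeds the le=150 bound
except ValidationError as err:
    print(err)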

archive: List[AlertData] = []


@broker.subscriber("sensor_input")
@broker.publisher("normalized_input")
async def ingest_and_validate(raw: RawSensorData) -> dict:
    # Stage 1: FastStream has already validated the payload against
    # RawSensorData; log it and pass it along to the normalization queue.
    logger.info(f"Ingested raw data: {raw.json()}")
    return raw.dict()


@broker.subscriber("normalized_input")
@broker.publisher("sensor_alert")
async def normalize(data: dict) -> dict:
    # Stage 2: convert Celsius to Kelvin.
    norm = NormalizedData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_celsius"] + 273.15
    )
    logger.info(f"Normalized to Kelvin: {norm.json()}")
    return norm.dict()


ALERT_THRESHOLD_K = 323.15  # 50 °C

@broker.subscriber("sensor_alert")
@broker.publisher("archive_topic")
async def monitor(data: dict) -> dict:
    # Stage 3: flag any reading that exceeds the threshold.
    alert_flag = data["reading_kelvin"] > ALERT_THRESHOLD_K
    alert = AlertData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_kelvin"],
        alert=alert_flag
    )
    logger.info(f"Monitor result: {alert.json()}")
    return alert.dict()


@broker.subscriber("archive_topic")
async def archive_data(payload: dict):
    # Stage 4: collect the finished alert record in the in-memory archive.
    rec = AlertData(**payload)
    archive.append(rec)
    logger.info(f"Archived: {rec.json()}")
An in-memory archive list collects all finalized alerts, while four asynchronous functions, wired via @broker.subscriber/@broker.publisher, form the pipeline stages. These functions ingest and validate raw sensor inputs, convert Celsius to Kelvin, check against an alert threshold, and finally archive each AlertData record, emitting logs at every step for full traceability.
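
For orientation, here is the message flow those subscriber/publisher pairs create, using the queue names from the code:

# sensor_input     -> ingest_and_validate -> normalized_input
# normalized_input -> normalize           -> sensor_alert
# sensor_alert     -> monitor             -> archive_topic
# archive_topic    -> archive_data        -> archive (Python list)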

async def main():
    readings = [
        {"sensor_id": "sensor_1", "reading_celsius": 45.2},
        {"sensor_id": "sensor_2", "reading_celsius": 75.1},
        {"sensor_id": "sensor_3", "reading_celsius": 50.0},
    ]
    async with TestRabbitBroker(broker) as tb:
        for r in readings:
            await tb.publish(r, "sensor_input")
        await asyncio.sleep(0.1)

    df = pd.DataFrame([a.dict() for a in archive])
    print("\nFinal Archived Alerts:")
    display(df)  # display() is provided by IPython/Colab


asyncio.run(main())

Finally, the main coroutine publishes a set of sample sensor readings into the in-memory TestRabbitBroker, pauses briefly to allow each pipeline stage to run, and then collates the resulting AlertData records from the archive into a pandas DataFrame for easy display and verification of the end-to-end alert flow. At the end, asyncio.run(main()) kicks off the entire async demo in Colab.
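
Working through the arithmetic: 45.2 °C is 318.35 K, 75.1 °C is 348.25 K, and 50.0 °C is exactly 323.15 K, the threshold itself. Since monitor uses a strict > comparison, only sensor_2 is flagged, and the final DataFrame should look roughly like this (exact pandas formatting may differ):

Final Archived Alerts:
  sensor_id  reading_kelvin  alert
0  sensor_1          318.35  False
1  sensor_2          348.25   True
2  sensor_3          323.15  False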

In conclusion, this tutorial demonstrates how FastStream, combined with RabbitMQ abstractions and in-memory testing via TestRabbitBroker, can accelerate the development of real-time data pipelines without the overhead of deploying external brokers. With Pydantic handling schema validation, asyncio managing concurrency, and pandas enabling quick data analysis, this pattern provides a robust foundation for sensor monitoring, ETL tasks, or event‑driven workflows. You can transition from this in‑memory demo to production by swapping in a live broker URL (RabbitMQ, Kafka, NATS, or Redis) and launching the application with FastStream’s CLI, unlocking scalable, maintainable stream processing in any Python environment.
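
As a rough sketch of that swap (the AMQP URL, credentials, and module name below are placeholders, not values from this tutorial), you would point the broker at your real server and run the app with the faststream CLI:

# pipeline.py -- hypothetical module containing the code above
broker = RabbitBroker("amqp://user:password@rabbit.example.com:5672/")  # placeholder URL
app = FastStream(broker)

# From a shell, start the service with FastStream's CLI:
#   faststream run pipeline:app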

