
Get started with Records

This guide demonstrates how to ingest, transform, and query high-volume OT event data using the Records service. It walks through an example pipeline that:

  1. Ingests raw data from source systems (OPC UA, PI AF)
  2. Transforms data into a standardized schema
  3. Queries records for analysis and visualization

For conceptual background, see Records and Streams.

Prerequisites

  1. Configure access control for your user or service account.
  2. Set up a Python environment with the cognite-sdk package installed (a client configuration sketch follows the capabilities example below).
Example: Configure capabilities
from cognite.client import CogniteClient
from cognite.client.data_classes import GroupWrite
from cognite.client.data_classes.capabilities import (
    DataModelsAcl,
    StreamRecordsAcl,
    StreamsAcl,
)

client = CogniteClient()

my_group = client.iam.groups.create(
    GroupWrite(
        name="Records Access Group",
        capabilities=[
            DataModelsAcl(
                actions=[
                    DataModelsAcl.Action.Read,
                    DataModelsAcl.Action.Write,
                ],
                scope=DataModelsAcl.Scope.All(),
            ),
            StreamRecordsAcl(
                actions=[
                    StreamRecordsAcl.Action.Read,
                    StreamRecordsAcl.Action.Write,
                ],
                scope=StreamRecordsAcl.Scope.All(),
            ),
            StreamsAcl(
                actions=[
                    StreamsAcl.Action.Read,
                    StreamsAcl.Action.Create,
                    StreamsAcl.Action.Delete,
                ],
                scope=StreamsAcl.Scope.All(),
            ),
        ],
    )
)
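
The examples in this guide call `CogniteClient()` without arguments, which assumes the client is already configured through environment variables or a configuration file. As a minimal sketch, you can also configure the client explicitly with OAuth client credentials; the tenant, cluster, project, and credential values below are placeholders, not values from this guide.

Example: Configure the CogniteClient (sketch)
from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import OAuthClientCredentials

# Placeholder values -- replace with your own project, cluster, and app registration.
credentials = OAuthClientCredentials(
    token_url="https://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/token",
    client_id="<client-id>",
    client_secret="<client-secret>",
    scopes=["https://<cluster>.cognitedata.com/.default"],
)

client = CogniteClient(
    ClientConfig(
        client_name="records-getting-started",
        project="<project>",
        base_url="https://<cluster>.cognitedata.com",
        credentials=credentials,
    )
)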

Example: SCADA alarm management pipeline

This example demonstrates a complete workflow for managing SCADA alarm events using Records. You'll create a mutable stream for live alarms, ingest and update alarm data, query it in various ways, and then archive acknowledged alarms to an immutable stream.

Step 1: Create a space and define the alarm schema

Create a space and container to define your alarm record structure.

Create a space

Spaces function as namespaces to organize your schemas and data. In this example, you'll create a single space for SCADA alarms that will be used by both streams and the container.

Example: Create a space
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import SpaceApply

client = CogniteClient()

space_id = "scada-alarms"
space = client.data_modeling.spaces.apply(
    SpaceApply(
        space=space_id,
        description="SCADA alarm events",
        name="SCADA Alarms",
    )
)

Create a container

The container defines the properties of your alarm records. This example includes various property types that are common in SCADA systems: alarm identifiers, severity levels, numeric values, priorities, acknowledgment status, and timestamps.

Example: Create SCADA alarm container
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import (
    ContainerApply,
    ContainerProperty,
    Text,
    Float64,
    Int64,
    Boolean,
    Timestamp,
)

client = CogniteClient()

container_id = "alarm_events"
container = client.data_modeling.containers.apply(
    ContainerApply(
        space="scada-alarms",
        external_id=container_id,
        name="SCADA Alarm Events",
        description="Container for SCADA alarm event data",
        used_for="record",
        properties={
            # Text property - alarm identifier
            "alarm_id": ContainerProperty(
                type=Text(is_list=False),
                nullable=False,
                name="Alarm ID",
                description="Unique identifier for the alarm",
            ),
            # Text property - alarm severity
            "severity": ContainerProperty(
                type=Text(is_list=False),
                nullable=False,
                name="Severity",
                description="Alarm severity level (CRITICAL, HIGH, MEDIUM, LOW)",
            ),
            # Float property - alarm value
            "value": ContainerProperty(
                type=Float64(is_list=False),
                nullable=False,
                name="Value",
                description="Numeric value associated with the alarm condition",
            ),
            # Int property - priority score
            "priority": ContainerProperty(
                type=Int64(is_list=False),
                nullable=False,
                name="Priority",
                description="Priority score (1-100)",
            ),
            # Boolean property - acknowledged status
            "is_acknowledged": ContainerProperty(
                type=Boolean(is_list=False),
                nullable=False,
                name="Acknowledged Status",
                description="True if alarm has been acknowledged",
            ),
            # Timestamp property - when alarm occurred
            "timestamp": ContainerProperty(
                type=Timestamp(is_list=False),
                nullable=False,
                name="Timestamp",
                description="Time when the alarm occurred",
            ),
        },
    )
)

Step 2: Create streams for different lifecycles

With the schema defined, you can now create streams to hold the records. When creating a stream, you must select a Stream Settings Template. These templates are pre-configured profiles that determine the stream's behavior, performance, and lifecycle policies.

In this example, you'll create two streams:

  1. Mutable stream: For active alarms that need updates (e.g., acknowledgment status changes)
  2. Immutable stream: For archived alarms that never change

Choose your stream template based on data lifecycle and throughput requirements, not by record type or source system. For detailed specifications of each template's limits, see the Streams API documentation.

Example: Create a mutable stream
from cognite.client import CogniteClient
import time

client = CogniteClient()

mutable_stream_id = "alarm_data_live_stream"
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams",
    json={
        "items": [
            {
                "externalId": mutable_stream_id,
                "settings": {"template": {"name": "BasicLiveData"}},
            }
        ]
    },
    headers={"cdf-version": "beta"},
)
Example: Create an immutable stream
from cognite.client import CogniteClient

client = CogniteClient()

immutable_stream_id = "alarm_data_archive_stream"
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams",
    json={
        "items": [
            {
                "externalId": immutable_stream_id,
                "settings": {"template": {"name": "ImmutableTestStream"}},
            }
        ]
    },
    headers={"cdf-version": "beta"},
)

Step 3: Ingest records

Data is ingested directly into a stream, typically via a Cognite Extractor (such as the OPC UA Extractor or a Hosted Extractor) that reads from a source system. When writing records to the API, you must specify the target stream identifier and reference the container by its space and externalId to identify the schema for the data being ingested.

Caution

The Records service does not support onboarding data using the traditional CDF pipeline of source system → RAW → Transformations. The Records service has a highly optimized ingestion path that bypasses these services. Using RAW and Transformations for Records data introduces unnecessary latency, cost, and complexity. Always ingest data directly to a Records stream using one of the extractors with Records support.

Extractors with Records support

The following extractors currently have support for writing directly to Records:

  • OPC UA Extractor (Immutable stream support)
  • PI AF Extractor
  • Hosted REST Extractor

Example: Ingest alarm records

Let's ingest some SCADA alarm records to the mutable stream. We'll start with 2 hardcoded records, then add 100 random records to simulate a realistic alarm dataset. In a production system, this would typically be done by an extractor, but for this example we'll use the API directly.

Example: Ingest hardcoded alarm records
from cognite.client import CogniteClient
import uuid
import time

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Create two hardcoded alarm records
hardcoded_records = [
    {
        "alarm_id": "ALARM-PRESS-001",
        "severity": "CRITICAL",
        "value": 125.8,
        "priority": 95,
        "is_acknowledged": False,
        "timestamp": "2025-10-22T10:00:00.000Z",
    },
    {
        "alarm_id": "ALARM-TEMP-042",
        "severity": "MEDIUM",
        "value": 88.3,
        "priority": 60,
        "is_acknowledged": False,
        "timestamp": "2025-10-22T10:00:00.000Z",
    },
]

# Prepare records for ingestion
items = []
for record in hardcoded_records:
    ext_id = str(uuid.uuid4())
    items.append(
        {
            "space": space_id,
            "externalId": ext_id,
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": record,
                }
            ],
        }
    )

# Ingest hardcoded records
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
    json={"items": items},
    headers={"cdf-version": "beta"},
)
Example: Ingest random alarm records for realistic dataset
from cognite.client import CogniteClient
from datetime import datetime, timedelta
import random
import uuid
import time

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Generate 100 random alarm records
alarm_ids = [
    "ALARM-PRESS-001",
    "ALARM-TEMP-042",
    "ALARM-FLOW-013",
    "ALARM-VIBR-088",
    "ALARM-LEVEL-055",
]
severities = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]
base_time = datetime.now() - timedelta(hours=2)

random_records = [
    {
        "alarm_id": random.choice(alarm_ids),
        "severity": random.choice(severities),
        "value": round(random.uniform(50.0, 150.0), 2),
        "priority": random.randint(20, 100),
        "is_acknowledged": random.choice([True, False]),
        "timestamp": (base_time + timedelta(minutes=i)).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
    }
    for i in range(100)
]

# Prepare random records for ingestion
items = []
for record in random_records:
    items.append(
        {
            "space": space_id,
            "externalId": str(uuid.uuid4()),
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": record,
                }
            ],
        }
    )

# Ingest random records
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records",
    json={"items": items},
    headers={"cdf-version": "beta"},
)

Step 4: Update records (mutable streams only)

One of the key benefits of mutable streams is the ability to update records. This is useful for scenarios like acknowledging alarms, updating status fields, or correcting data. Use the upsert endpoint to update existing records.

Example: Update an alarm to acknowledged status
from cognite.client import CogniteClient
import time

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Assuming you have the external_id of a record you want to update
# In practice, you would get this from a previous query or store it during ingestion
record_external_id = "your-record-external-id" # Replace with actual ID

# Update the record to set is_acknowledged to True
updated_record = {
    "alarm_id": "ALARM-TEMP-042",
    "severity": "MEDIUM",
    "value": 88.3,
    "priority": 60,
    "is_acknowledged": True,
    "timestamp": "2025-10-22T10:00:00.000Z",
}

response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/upsert",
    json={
        "items": [
            {
                "space": space_id,
                "externalId": record_external_id,
                "sources": [
                    {
                        "source": {
                            "type": "container",
                            "space": space_id,
                            "externalId": container_id,
                        },
                        "properties": updated_record,
                    }
                ],
            }
        ]
    },
    headers={"cdf-version": "alpha"},
)

Step 5: Query records

The Records API provides three primary endpoints for consuming data, each tailored for a different use case. The query syntax for filtering is similar for all endpoints.

Filter - Interactive queries

Use the filter endpoint for interactive applications, dashboards, and any scenario where you expect a relatively small, bounded result set.

  • Endpoint: /streams/{streamId}/records/filter
  • Behavior: Returns an unpaginated result set.
  • Limits: Maximum 1,000 records per request (default: 10).
  • Sorting: Supports custom sorting on any property.

This endpoint is ideal for queries like "show me the top 10 unacknowledged alarms sorted by priority."

Example: Filter unacknowledged alarms by priority
from cognite.client import CogniteClient
from datetime import datetime, timedelta
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Filter for unacknowledged alarms, sorted by priority descending
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/filter",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "filter": {
            "equals": {
                "property": [space_id, container_id, "is_acknowledged"],
                "value": False,
            }
        },
        "sort": [
            {
                "property": [space_id, container_id, "priority"],
                "direction": "descending",
            }
        ],
        "limit": 10,
    },
    headers={"cdf-version": "beta"},
)

results = response.json()
items = results.get("items", [])

# Display as table
table_data = []
for item in items[:5]:  # Show first 5
    props = item["properties"][space_id][container_id]
    table_data.append([
        props["alarm_id"],
        props["severity"],
        props["value"],
        props["priority"],
        props["timestamp"],
    ])

print(f"Found {len(items)} unacknowledged alarms (showing first 5):")
print(tabulate(
    table_data,
    headers=["Alarm ID", "Severity", "Value", "Priority", "Timestamp"],
    tablefmt="github",
))

Expected output:

Found 10 unacknowledged alarms (showing first 5):
| Alarm ID | Severity | Value | Priority | Timestamp |
|-----------------|------------|---------|------------|--------------------------|
| ALARM-TEMP-042 | CRITICAL | 98.28 | 100 | 2025-10-24T12:04:54.000Z |
| ALARM-TEMP-042 | LOW | 71.82 | 100 | 2025-10-24T12:44:54.000Z |
| ALARM-LEVEL-055 | LOW | 69.19 | 99 | 2025-10-24T12:55:54.000Z |
| ALARM-FLOW-013 | LOW | 56.17 | 97 | 2025-10-24T12:50:54.000Z |
| ALARM-FLOW-013 | HIGH | 131.5 | 97 | 2025-10-24T11:45:54.000Z |
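
The example above uses a single equals leaf, but filters can be composed. As a hedged sketch (assuming the Records filter syntax supports the same boolean and range leaves as the CDF data modeling filters; verify the supported leaves against the Records API reference), a compound filter for unacknowledged, high-priority alarms might look like this:

Example: Compound filter (sketch)
space_id = "scada-alarms"
container_id = "alarm_events"

# Hedged sketch: an "and" of an equals leaf and a range leaf.
compound_filter = {
    "and": [
        {
            "equals": {
                "property": [space_id, container_id, "is_acknowledged"],
                "value": False,
            }
        },
        {
            "range": {
                "property": [space_id, container_id, "priority"],
                "gte": 80,
            }
        },
    ]
}
# Pass this dict as the "filter" value in the request body shown above.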

Sync - Large datasets

Use the sync endpoint for batch processing, data exports, or any application that needs to retrieve a large number of records efficiently.

  • Endpoint: /streams/{streamId}/records/sync
  • Behavior: Returns a paginated result set. You iterate through the data by making subsequent requests with the cursor returned in the previous response.
  • Limits: Maximum 1,000 records per page. The sync process is complete when the API response has hasNext set to false.
  • Sorting: The sync endpoint does not support custom sorting.

This endpoint is designed for workflows like "export all recent alarm events for analysis" or implementing incremental data pipelines.

The read throughput and the maximum time range you can query are governed by the Stream Settings Template you chose in Step 2.

Example: Sync recent alarm records
from cognite.client import CogniteClient
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Sync records from the last 2 days
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
    json={
        "sources": [
            {
                "source": {
                    "type": "container",
                    "space": space_id,
                    "externalId": container_id,
                },
                "properties": ["*"],
            }
        ],
        "initializeCursor": "2d-ago",
        "limit": 10,
    },
    headers={"cdf-version": "beta"},
)

results = response.json()
items = results.get("items", [])

# Display as table
table_data = []
for item in items[:5]:  # Show first 5
    props = item["properties"][space_id][container_id]
    ack_status = "ACK" if props["is_acknowledged"] else "UNACK"
    table_data.append([
        props["alarm_id"],
        props["severity"],
        ack_status,
        props["value"],
        props["timestamp"],
    ])

print(f"Sync returned {len(items)} records (showing first 5):")
print(tabulate(
    table_data,
    headers=["Alarm ID", "Severity", "Status", "Value", "Timestamp"],
    tablefmt="github",
))

if "nextCursor" in results:
    cursor = results["nextCursor"]
    cursor_preview = f"{cursor[:5]}..." if len(cursor) > 5 else cursor
    print(f"\nNext cursor for incremental sync: {cursor_preview}")

Expected output:

Sync returned 10 records (showing first 5):
| Alarm ID | Severity | Status | Value | Timestamp |
|-----------------|------------|----------|---------|--------------------------|
| ALARM-PRESS-001 | CRITICAL | UNACK | 125.8 | 2025-10-22T10:00:00.000Z |
| ALARM-TEMP-042 | LOW | UNACK | 98.78 | 2025-10-24T11:27:54.000Z |
| ALARM-VIBR-088 | CRITICAL | ACK | 122.78 | 2025-10-24T11:28:54.000Z |
| ALARM-VIBR-088 | MEDIUM | ACK | 140.56 | 2025-10-24T11:29:54.000Z |
| ALARM-LEVEL-055 | CRITICAL | ACK | 50.59 | 2025-10-24T11:30:54.000Z |

Next cursor for incremental sync: 08c0e...
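
To retrieve a full result set, keep calling the sync endpoint with the cursor returned in each response until hasNext is false, as the archive example in Step 6 also does. A minimal sketch of that loop, reusing the request body above (the 1,000-record limit is the documented page maximum):

Example: Paginate through sync results (sketch)
from cognite.client import CogniteClient

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

cursor = None
all_items = []

while True:
    body = {
        "sources": [
            {
                "source": {"type": "container", "space": space_id, "externalId": container_id},
                "properties": ["*"],
            }
        ],
        "limit": 1000,
    }
    # The first request initializes the cursor; later requests continue from it.
    if cursor:
        body["cursor"] = cursor
    else:
        body["initializeCursor"] = "2d-ago"

    response = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
        json=body,
        headers={"cdf-version": "beta"},
    )
    results = response.json()
    all_items.extend(results.get("items", []))
    cursor = results["nextCursor"]

    # Stop when no more data is immediately available.
    if not results["hasNext"]:
        break

print(f"Retrieved {len(all_items)} records in total")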

Aggregate - Statistical analysis

Use the aggregate endpoint to perform statistical analysis on your records, such as counts, averages, sums, and other aggregations.

  • Endpoint: /streams/{streamId}/records/aggregate
  • Behavior: Returns aggregated results based on the specified aggregation functions and groupings.
  • Use Cases: Ideal for generating reports like "count of alarms by severity" or "average alarm value and priority statistics."

This endpoint is optimized for analytical queries that require summarizing large datasets without retrieving individual records.

Example: Overall alarm statistics
from cognite.client import CogniteClient
from datetime import datetime, timedelta

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Calculate overall statistics: count, average value, min/max values, average priority
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "aggregates": {
            "total_count": {"count": {}},
            "avg_value": {"avg": {"property": [space_id, container_id, "value"]}},
            "min_value": {"min": {"property": [space_id, container_id, "value"]}},
            "max_value": {"max": {"property": [space_id, container_id, "value"]}},
            "avg_priority": {"avg": {"property": [space_id, container_id, "priority"]}},
        },
    },
    headers={"cdf-version": "beta"},
)

results = response.json()
aggs = results["aggregates"]

print("Overall Statistics:")
print(f" Total Records: {aggs['total_count']['count']}")
print(f" Average Value: {aggs['avg_value']['avg']:.2f}")
print(f" Min Value: {aggs['min_value']['min']:.2f}")
print(f" Max Value: {aggs['max_value']['max']:.2f}")
print(f" Average Priority: {aggs['avg_priority']['avg']:.1f}")

Expected output:

Overall Statistics:
Total Records: 102
Average Value: 95.87
Min Value: 50.15
Max Value: 149.39
Average Priority: 63.0
Example: Per-alarm statistics with nested aggregations
from cognite.client import CogniteClient
from datetime import datetime, timedelta
from tabulate import tabulate

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"

# Group by alarm_id and calculate statistics for each alarm
response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "aggregates": {
            "by_alarm": {
                "uniqueValues": {
                    "property": [space_id, container_id, "alarm_id"],
                    "aggregates": {
                        "avg_value": {
                            "avg": {"property": [space_id, container_id, "value"]}
                        },
                        "by_severity": {
                            "uniqueValues": {
                                "property": [space_id, container_id, "severity"]
                            }
                        },
                    },
                }
            }
        },
    },
    headers={"cdf-version": "beta"},
)

results = response.json()
buckets = results["aggregates"]["by_alarm"]["uniqueValueBuckets"]

print("Per-Alarm Statistics:")
table_data = []
for bucket in buckets:
    alarm = bucket["value"]
    count = bucket["count"]
    avg_value = bucket["aggregates"]["avg_value"]["avg"]

    # Get most common severity from nested uniqueValues buckets
    severity_buckets = bucket["aggregates"]["by_severity"]["uniqueValueBuckets"]
    most_common_severity = (
        max(severity_buckets, key=lambda x: x["count"])["value"]
        if severity_buckets
        else "N/A"
    )

    table_data.append([alarm, count, f"{avg_value:.2f}", most_common_severity])

print(tabulate(
    table_data,
    headers=["Alarm ID", "Occurrences", "Avg Value", "Most Common Severity"],
    tablefmt="github",
))

Expected output:

Per-Alarm Statistics:
| Alarm ID | Occurrences | Avg Value | Most Common Severity |
|-----------------|---------------|-------------|------------------------|
| ALARM-FLOW-013 | 26 | 91.50 | LOW |
| ALARM-TEMP-042 | 24 | 92.15 | CRITICAL |
| ALARM-PRESS-001 | 19 | 104.62 | CRITICAL |
| ALARM-VIBR-088 | 17 | 106.93 | LOW |
| ALARM-LEVEL-055 | 16 | 86.39 | CRITICAL |

Step 6: Archive acknowledged alarms (stream-to-stream pattern)

A powerful pattern for Records is moving data between streams based on lifecycle stages. In this example, we'll archive acknowledged alarms by moving them from the mutable stream to the immutable stream. This is a common pattern for:

  • Hot/cold storage: Active data in mutable streams, archived data in immutable streams
  • Data lifecycle management: Moving processed records to long-term storage
  • Performance optimization: Keeping active streams lean by moving old data to archives

The key to reliable stream-to-stream operations is using the sync endpoint with cursor-based pagination. The cursor tracks your position in the stream, allowing you to resume from where you left off if processing fails.

Example: Move acknowledged alarms with pagination
from cognite.client import CogniteClient
import time

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"
immutable_stream_id = "alarm_data_archive_stream"

# In a production system, you would persist this cursor between runs
stored_cursor = None

# Pagination settings
BATCH_SIZE = 10
total_records_moved = 0
batch_number = 0

print("Starting pagination loop to archive acknowledged alarms...")

# Keep paginating until there are no more acknowledged records
while True:
    batch_number += 1
    print(f"\n--- Batch {batch_number} ---")

    # Sync acknowledged records from MUTABLE stream
    sync_response = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/sync",
        json={
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": ["*"],
                }
            ],
            # Use stored cursor if available, otherwise initialize from 2 days ago
            **(
                {"cursor": stored_cursor}
                if stored_cursor
                else {"initializeCursor": "2d-ago"}
            ),
            "filter": {
                "equals": {
                    "property": [space_id, container_id, "is_acknowledged"],
                    "value": True,
                }
            },
            "limit": BATCH_SIZE,
        },
        headers={"cdf-version": "beta"},
    )

    sync_results = sync_response.json()
    acknowledged_records = sync_results.get("items", [])
    next_cursor = sync_results["nextCursor"]
    has_next = sync_results["hasNext"]

    print(f"Found {len(acknowledged_records)} acknowledged records in this batch")
    print(f"More data immediately available: {has_next}")

    # If no records found, we're done
    if len(acknowledged_records) == 0:
        print("No more acknowledged records to move")
        stored_cursor = next_cursor  # Update cursor even when no records found
        break

    # Prepare records for ingestion into immutable stream
    print(f"Ingesting {len(acknowledged_records)} records to IMMUTABLE stream...")

    immutable_items = []
    records_to_delete = []

    for record in acknowledged_records:
        # Extract properties and identifiers
        record_space = record["space"]
        record_external_id = record["externalId"]
        props = record["properties"][space_id][container_id]

        # Store record identifier for deletion
        records_to_delete.append({
            "space": record_space,
            "externalId": record_external_id,
        })

        # Prepare for immutable stream
        immutable_items.append({
            "space": record_space,
            "externalId": record_external_id,
            "sources": [
                {
                    "source": {
                        "type": "container",
                        "space": space_id,
                        "externalId": container_id,
                    },
                    "properties": props,
                }
            ],
        })

    # Ingest into immutable stream
    immutable_res = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{immutable_stream_id}/records",
        json={"items": immutable_items},
        headers={"cdf-version": "beta"},
    )

    # Check if ingest was successful (200/201: sync success, 202: async processing)
    if immutable_res.status_code not in [200, 201, 202]:
        print(f"Warning: Immutable ingest returned status {immutable_res.status_code}")
        print(f"Response: {immutable_res.text}")
        break
    else:
        print(f"✓ Ingested {len(immutable_items)} records to IMMUTABLE stream")

    # Delete acknowledged records from MUTABLE stream
    print(f"Deleting {len(records_to_delete)} records from MUTABLE stream...")

    delete_res = client.post(
        url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/delete",
        json={"items": records_to_delete},
        headers={"cdf-version": "alpha"},
    )
    print(f"✓ Deleted {len(records_to_delete)} records from MUTABLE stream")

    # IMPORTANT: Only update cursor after BOTH operations succeed
    stored_cursor = next_cursor
    total_records_moved += len(acknowledged_records)

    cursor_preview = f"{stored_cursor[:20]}..." if len(stored_cursor) > 20 else stored_cursor
    print(f"✓ Cursor updated: {cursor_preview}")
    print(f"Total records moved so far: {total_records_moved}")

print(f"\n{'=' * 70}")
print("PAGINATION COMPLETE")
print(f"{'=' * 70}")
print(f"Total batches processed: {batch_number}")
print(f"Total records moved: {total_records_moved}")
print(f"{'=' * 70}")

Expected output:

Starting pagination loop to archive acknowledged alarms...

--- Batch 1 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0b89cecbeb1dab818...
Total records moved so far: 10

--- Batch 2 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0aae0c3c0b1dab818...
Total records moved so far: 20

--- Batch 3 ---
Found 10 acknowledged records in this batch
More data immediately available: True
Ingesting 10 records to IMMUTABLE stream...
✓ Ingested 10 records to IMMUTABLE stream
Deleting 10 records from MUTABLE stream...
✓ Deleted 10 records from MUTABLE stream
✓ Cursor updated: 08c0e5b4cdc4b1dab818...
Total records moved so far: 30

[... batches 4-6 similar, each moving 10 records ...]

--- Batch 7 ---
Found 4 acknowledged records in this batch
More data immediately available: False
Ingesting 4 records to IMMUTABLE stream...
✓ Ingested 4 records to IMMUTABLE stream
Deleting 4 records from MUTABLE stream...
✓ Deleted 4 records from MUTABLE stream
✓ Cursor updated: 0880beeae1c9b1dab818...
Total records moved so far: 64

--- Batch 8 ---
Found 0 acknowledged records in this batch
More data immediately available: False
No more acknowledged records to move

======================================================================
PAGINATION COMPLETE
======================================================================
Total batches processed: 8
Total records moved: 64
======================================================================
Example: Verify final distribution across streams
from cognite.client import CogniteClient
from datetime import datetime, timedelta

client = CogniteClient()

space_id = "scada-alarms"
container_id = "alarm_events"
mutable_stream_id = "alarm_data_live_stream"
immutable_stream_id = "alarm_data_archive_stream"

# Query mutable stream for distribution
mutable_response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{mutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "aggregates": {
            "total_count": {"count": {}},
            "by_acknowledged": {
                "uniqueValues": {
                    "property": [space_id, container_id, "is_acknowledged"]
                }
            },
        },
    },
    headers={"cdf-version": "beta"},
)

mutable_results = mutable_response.json()
print("MUTABLE Stream Statistics:")
print(f" Total Records: {mutable_results['aggregates']['total_count']['count']}")

mutable_buckets = mutable_results["aggregates"]["by_acknowledged"]["uniqueValueBuckets"]
for bucket in mutable_buckets:
    is_ack = bucket["value"] == "true" if isinstance(bucket["value"], str) else bucket["value"]
    ack_status = "Acknowledged (True)" if is_ack else "Unacknowledged (False)"
    print(f" {ack_status}: {bucket['count']} records")

# Query immutable stream for distribution
immutable_response = client.post(
    url=f"/api/v1/projects/{client.config.project}/streams/{immutable_stream_id}/records/aggregate",
    json={
        "lastUpdatedTime": {
            "gt": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.00Z"),
        },
        "aggregates": {
            "total_count": {"count": {}},
            "by_acknowledged": {
                "uniqueValues": {
                    "property": [space_id, container_id, "is_acknowledged"]
                }
            },
        },
    },
    headers={"cdf-version": "beta"},
)

immutable_results = immutable_response.json()
print("\nIMMUTABLE Stream Statistics:")
print(f" Total Records: {immutable_results['aggregates']['total_count']['count']}")

immutable_buckets = immutable_results["aggregates"]["by_acknowledged"]["uniqueValueBuckets"]
for bucket in immutable_buckets:
    is_ack = bucket["value"] == "true" if isinstance(bucket["value"], str) else bucket["value"]
    ack_status = "Acknowledged (True)" if is_ack else "Unacknowledged (False)"
    print(f" {ack_status}: {bucket['count']} records")

Expected output:

MUTABLE Stream Statistics:
Total Records: 38
Unacknowledged (False): 38 records

IMMUTABLE Stream Statistics:
Total Records: 64
Acknowledged (True): 64 records

This confirms that all 64 acknowledged alarms have been successfully moved from the mutable stream (which now contains only the remaining 38 unacknowledged alarms) to the immutable archive stream. Together, this accounts for the original 102 total records (38 + 64 = 102).

Advanced: Stream-to-stream transformations

A powerful architectural pattern for Records is to create data pipelines that move and transform data between streams. This allows you to separate the concerns of raw data ingestion from the creation of clean, standardized datasets for consumption.

A common workflow is:

  1. Ingest to staging: Configure an extractor to write raw data from a source system into an ImmutableDataStaging stream. This stream should contain an exact, untransformed copy of the source data, preserving the source system's original schema.

  2. Transform and standardize: Use the Hosted REST Extractor to create a stream-to-stream pipeline.

    • Configure the extractor to continuously read from the staging stream's /sync endpoint.

    • Use the extractor's built-in mapping capabilities to perform lightweight transformations.

    • This is where you would implement the "Promoted Properties" pattern, mapping source fields to your standardized UnifiedEvent container.

    • Write the transformed, standardized records to, for instance, a permanent ImmutableArchive stream.

  3. Consume from archive: Point your applications, dashboards, and analytical jobs to the clean, standardized ImmutableArchive or ImmutableNormalizedData stream for reliable, high-performance consumption.

This composable architecture lets you ingest data once in its raw format for fidelity and data trust, and then derive streams with standardized, use-case-specific record schemas from that source stream, building a robust and maintainable data processing ecosystem within CDF for your high-volume OT event and log data.

Example transformation mapping

The mapping defines how to transform each input record from the source stream into the output format for the destination stream. The mapping receives each record as input and must produce an output object with type: "immutable_record" along with the target space, stream, and container sources.

Example: Transformation mapping for Hosted REST Extractor
{
    "space": "my_space",
    "externalId": concat("record_", input.id, "_", now()),
    "stream": "my_stream_id",
    "sources": [
        {
            "source": {
                "space": "schema_space",
                "externalId": "my_container"
            },
            "properties": {
                "value": input.temperature,
                "timestamp": to_unix_timestamp(input.datetime, "%Y-%m-%dT%H:%M:%S"),
                "sensorId": input.sensor.id,
                "location": input.sensor.location
            }
        }
    ],
    "type": "immutable_record"
}
Note

The Hosted REST Extractor expects the space, container and stream to have been created before the mapping can be successfully applied by the extractor job.