Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 332 additions & 8 deletions docs/quix-cloud/managed-services/dynamic-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,338 @@ This service can leverage a blob storage configured on our platform (see [blob s

The blob storage configuration is automatically injected only when `contentStore` is set to `file`.

## SDK Integration
## API Reference

The **Quix Streams SDK** provides built-in functionality to:
The Dynamic Configuration Manager provides a REST API for managing configurations. The API is available at the service endpoint once deployed.

- Subscribe to configuration change events.
- Download and cache the latest configuration from the API.
- Join configuration values with live data streams at the right time.
### Base URL
```
http://<service-name>:<port>/api/v1
```

### Authentication
All API requests require authentication via the `Authorization` header:
```
Authorization: Bearer <your-token>
```

### Create Configuration

Create a new configuration:

```http
POST /api/v1/configurations
Content-Type: application/json

{
"metadata": {
"type": "device-config",
"target_key": "sensor-001",
"valid_from": "2024-01-01T00:00:00Z",
"category": "sensors"
},
"content": {
"device": {
"name": "Temperature Sensor 001",
"model": "TS-2000",
"location": "Building A, Floor 2"
},
"calibration": {
"offset": 0.5,
"scale": 1.02,
"last_calibrated": "2024-01-01T00:00:00Z"
},
"firmware": {
"version": "2.1.3",
"features": ["temperature", "humidity"]
}
},
"replace": false
}
```

**Request Body:**
- `metadata.type` (string, required): Configuration type identifier
- `metadata.target_key` (string, required): Target key for configuration matching
- `metadata.valid_from` (ISO8601 datetime, optional): When this configuration becomes valid
- `metadata.category` (string, optional): Category for grouping configurations
- `content` (object, optional): The actual configuration data (JSON object)
- `replace` (boolean, optional): If true, creates a new version if configuration already exists (default: false)

**Note:** The configuration `id` is automatically generated from `type` and `target_key` using SHA-1 hash.

### Update Configuration

Update an existing configuration (creates a new version):

```http
PUT /api/v1/configurations/{id}
Content-Type: application/json

{
"metadata": {
"valid_from": "2024-01-15T00:00:00Z",
"category": "sensors"
},
"content": {
"device": {
"name": "Temperature Sensor 001",
"model": "TS-2000",
"location": "Building A, Floor 3"
},
"calibration": {
"offset": 0.3,
"scale": 1.01,
"last_calibrated": "2024-01-15T10:30:00Z"
}
}
}
```

**Request Body:**
- `metadata.valid_from` (ISO8601 datetime, optional): Update when this configuration becomes valid
- `metadata.category` (string, optional): Update the category
- `content` (object, optional): Updated configuration data

**Note:** Only fields you provide will be updated. Omitted fields remain unchanged.

### Upload Binary Configuration

For non-JSON configurations (firmware files, calibration data, etc.), use the file upload endpoint:

```http
POST /api/v1/configurations/{id}/versions/{version}/content
Content-Type: multipart/form-data

file: <binary-file-data>
```

**Note:** Binary content must be uploaded separately after creating the configuration metadata. The service automatically detects and stores binary content with appropriate `content_type`.

### Search Configurations

Search for configurations using various criteria:

```http
GET /api/v1/configurations/search?type=device-config&target_key=sensor-001&limit=10&offset=0
```

**Query Parameters:**
- `type` (string, optional): Filter by configuration type
- `target_key` (string, optional): Filter by target key
- `category` (string, optional): Filter by category
- `limit` (integer, optional): Maximum number of results (default: 20)
- `offset` (integer, optional): Number of results to skip (default: 0)

### Get Configuration

Retrieve a specific configuration:

```http
GET /api/v1/configurations/{id}
```

### Get Configuration Version

Retrieve a specific version of a configuration:

```http
GET /api/v1/configurations/{id}/versions/{version}
```

### Delete Configuration

Delete a configuration (all versions):

```http
DELETE /api/v1/configurations/{id}
```

### Storage Modes

#### MongoDB Mode (Default)
- **Content Type**: JSON only
- **Size Limit**: 16 MB per configuration
- **Use Case**: Structured configuration data
- **Setup**: Configure MongoDB connection parameters

#### File Mode (Blob Storage)
- **Content Type**: Any binary data
- **Size Limit**: Depends on blob storage provider
- **Use Case**: Large files, firmware, binary data
- **Setup**: Configure blob storage credentials

To use file mode, set `contentStore: file` in your deployment configuration.

## Using with Quix Streams join_lookup

The Dynamic Configuration Manager integrates seamlessly with Quix Streams' `join_lookup` feature to enrich streaming data with configuration data in real-time.

### Basic Integration

```python
from quixstreams import Application
from quixstreams.dataframe.joins.lookups import QuixConfigurationService

# Initialize the application
app = Application()

# Create a lookup instance pointing to your configuration topic
lookup = QuixConfigurationService(
topic=app.topic("device-configurations"),
app_config=app.config
)

# Create your main data stream
sdf = app.dataframe(app.topic("sensor-data"))

# Enrich sensor data with device configuration
sdf = sdf.join_lookup(
lookup=lookup,
fields={
"device_name": lookup.json_field(
jsonpath="$.device.name",
type="device-config"
),
"calibration_params": lookup.json_field(
jsonpath="$.calibration",
type="device-config"
),
"firmware_version": lookup.json_field(
jsonpath="$.firmware.version",
type="device-config"
)
},
on="device_id" # The field to match on
)

# Process the enriched data
sdf = sdf.apply(lambda value: {
**value,
"device_info": f"{value['device_name']} (v{value['firmware_version']})"
})

# Output to destination topic
sdf.to_topic(app.topic("enriched-sensor-data"))

if __name__ == "__main__":
app.run()
```

### Advanced Configuration Matching

Use custom key matching logic for complex scenarios:

```python
def custom_key_matcher(value, key):
"""Custom logic to determine configuration key"""
device_type = value.get("device_type", "unknown")
location = value.get("location", "default")
return f"{device_type}-{location}"

# Use custom key matching
sdf = sdf.join_lookup(
lookup=lookup,
fields={
"config": lookup.json_field(
jsonpath="$",
type="location-config"
)
},
on=custom_key_matcher
)
```

### Binary Configuration Support

For non-JSON configurations (firmware files, calibration data, etc.):

```python
sdf = sdf.join_lookup(
lookup=lookup,
fields={
"firmware_binary": lookup.bytes_field(
type="firmware"
),
"calibration_data": lookup.bytes_field(
type="calibration"
)
},
on="device_id"
)
```

### How join_lookup Works with Dynamic Configuration

1. **Configuration Events**: When configurations are updated via the API, lightweight Kafka events are published to your configuration topic.

2. **Real-time Processing**: The `join_lookup` feature listens to these events, fetches the latest configuration content, and caches it locally.

3. **Stream Enrichment**: As your main data stream processes records, `join_lookup` automatically enriches each record with the appropriate configuration data based on the matching key and timestamp.

4. **Version Management**: The system automatically handles configuration versioning, ensuring that each record is enriched with the configuration version that was valid at the time the record was created.

5. **Performance Optimization**: Local caching minimizes API calls and reduces latency for high-throughput applications.

### Advanced Use Cases

#### Custom Target Key Matching

For complex matching logic that goes beyond simple field matching:

```python
class CustomLookup(QuixConfigurationService):
def _config_id(self, type: str, key: str) -> str:
"""Override to implement custom ID generation"""
# Custom logic for generating configuration IDs
if type == "device-config":
# Extract device type and location from key
parts = key.split("-")
device_type = parts[0]
location = parts[1] if len(parts) > 1 else "default"
return f"{device_type}-{location}"
return super()._config_id(type, key)

# Use custom lookup
custom_lookup = CustomLookup(
topic=app.topic("device-configurations"),
app_config=app.config
)
```

#### Multi-Type Configuration Enrichment

Enrich with multiple configuration types:

```python
sdf = sdf.join_lookup(
lookup=lookup,
fields={
# Device configuration
"device_info": lookup.json_field(
jsonpath="$.device",
type="device-config"
),
# Location configuration
"location_info": lookup.json_field(
jsonpath="$.location",
type="location-config"
),
# Calibration data
"calibration": lookup.json_field(
jsonpath="$",
type="calibration-config"
)
},
on="device_id"
)
```

### Benefits

👉 See [SDK
documentation](../../quix-streams/api-reference/dataframe.html#streamingdataframejoin_lookup)
for details on `join_lookup` and related features.
- **Real-time Updates**: Configuration changes are immediately available to your streaming applications
- **Large File Support**: Handle configuration files too large for direct Kafka streaming
- **Version Control**: Automatic versioning ensures data consistency
- **Performance**: Local caching minimizes API calls and latency
- **Flexibility**: Support for both JSON and binary configuration content
- **Scalability**: Efficient handling of high-throughput data streams