Offramps
Specify how tremor connects to the outside world in order to publish to external systems.
For example, the Elastic offramp pushes data to ElasticSearch via its bulk upload REST/HTTP API endpoint.
All offramps
are specified in the following form:
offramp:
- id: <unique offramp id>
type: <offramp name>
codec: <codec of the data>
postprocessors: # can be omitted
- <postprocessor 1>
- <postprocessor 2>
- ...
preprocessors: # only for linked transport, can be omitted
- <preprocessor 1>
- <preprocessor 2>
- ...
linked: <true or false> # enable linked transport, default: false
codec_map:
"<mime-type>": "<codec handling the given mime-type>"
config:
<key>: <value>
Delivery Properties
Each offramp is able to report events as being definitely sent off or as failed. It can also report itself as not functional anymore to the connected pipelines. How each offramp implements those abilities is described in the table below.
The column Delivery acknowledgements
describes under what circumstances the offramp considers an event delivered and acknowledges it to the connected pipelines and operators, onramps etc. therein.
Acknowledgements, Failures or missing Acknowledgements take effect e.g. when using the operators or onramps that support those mechanisms (e.g. the WAL operator or the Kafka onramp).
The column Disconnect events
describes under which circumstances this offramp is not considered functional anymore.
Offramp | Disconnect events | Delivery acknowledgements |
---|---|---|
amqp | never | always |
blackhole | never | always |
cb | never | always |
debug | never | always |
dns | never | always |
elastic | connection loss | on 200 replies |
exit | never | always |
file | never | always |
gcs | connection loss | on successful send |
gpub | connection loss | on successful send |
kafka | see librdkafka | see librdkafka |
kv | never | always |
nats | connection loss | always |
newrelic | never | never |
otel | connection loss | on successful delivery |
Postgres | never | always |
rest | connection loss | on non 4xx/5xx replies |
stderr | never | always |
stdout | never | always |
tcp | connection loss | on send |
udp | local socket loss | on send |
ws | connection loss | on send |
System Offramps
Each tremor runtime comes with some pre-configured offramps
that can be used.
system::stdout
The offramp /offramp/system::stdout/system
can be used to print to STDOUT. Data will be formatted as JSON.
system::stderr
The offramp /offramp/system::stderr/system
can be used to print to STDERR. Data will be formatted as JSON.
Supported Offramps
amqp
The amqp offramp publishes AMQP messages to an AMQP broker. It uses lapin for an AMQP 0.9.1 protocol implementation.
Example:
offramp:
- id: amqp
type: amqp
config:
amqp_addr: "amqp://guest:guest@127.0.0.1:5672/"
exchange: ""
routing_key: "hello"
publish_options:
immediate: true
mandatory: false
Supported configuration options are:
amqp_addr
- an AMQP URI as string, required. For more details see AMQP 0.9.1 URI spec.exchange
- The exchange to publish messages to. Format: String, optional, defaults to the empty string and then publish to the default exchange.routing_key
- Specifies the routing key for the message. The routing key is used for routing messages depending on the exchange configuration. Format: String, optional, defaults to the empty string.publish_options
- Required config that controls what happens when a message cannot be routed to a queueimmediate
- This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag istrue
, the server will return an undeliverable message with a Return method. If this flag isfalse
, the server will queue the message, but with no guarantee that it will ever be consumed. Default:false
.mandatory
- This flag tells the server how to react if the message cannot be routed to a queue. If this flag istrue
, the server will return an unroutable message with a Return method. If this flag is false, the server silently drops the message. Default:false
.
Events are published (after being serialized and possibly chunked codec and postprocessors) as AMQP Messages to the specified exchange
(empty string is the default exchange) using the specified routing key.
This offramp uses the AMQP 0.9.1 basic-publish operation.
blackhole
The blackhole offramp is used for benchmarking it takes measurements of the end to end times of each event traversing the pipeline and at the end prints an HDR ( High Dynamic Range ) histogram.
Supported configuration options are:
warmup_secs
- Number of seconds after startup in which latency won't be measured to allow for a warmup delay.stop_after_secs
- Stop tremor after a given number of seconds and print the histogram.significant_figures
- Significant figures for the HDR histogram. (the first digits of each measurement that are kept as precise values)
Example:
offramp:
- id: bh
type: blackhole
config:
warmup_secs: 10
stop_after_secs: 40
cb
cb
is short for circuit breaker. This offramp can be used to trigger certain circuit breaker events manually by sending the intended circuit breaker event in the event metadata or the event data.
Supported payloads are:
ack
fail
trigger
orclose
open
orrestore
No matter how many
ack
andfail
strings thecb
key contains, only ever oneack
orfail
CB event will be emitted, to stay within the CB protocol. The same is true fortrigger
/close
andopen
/restore
strings, only one of those two will be emitted, never more.Example config:
offramp:
- id: cb_tester
type: cb
Example payloads:
{
"cb": "ack",
"some_other_field": true
}
{
"cb": ["fail", "close"]
}
Such an event or metadata will result in two CB insight events be sent back, one fail
event, and one close
event.
debug
The debug offramp is used to get an overview of how many events are put in which classification.
This operator does not support configuration.
Used metadata variables:
$class
- Class of the event to count by. (optional)
Example:
offramp:
- id: dbg
type: debug
dns
The dns
linked offramp allows performing DNS queries against the system resolver.
::note No codecs, configuration, or processors are supported. :::
Example:
- id: dns
type: dns
The event needs the following structure:
{
"lookup": "tremor.rs"
}
{
"lookup": {
"name": "tremor.rs",
"type": "CNAME"
}
}
where type can be one of (please consult your DNS manual for the meaning of each):
A
AAAA
ANAME
CNAME
TXT
PTR
CAA
HINFO
HTTPS
MX
NAPTR
NULL
NS
OPENPGPKEY
SOA
SRV
SSHFP
SVCB
TLSA
If type is not specified A
records will be looked up
Responses are an Array of objects denoting the type of record found as a key, followed by the entry as a string and a ttl
for the record (please consult your DNS manual for the return value of different record types):
[
{"A": "1.2.3.4", "ttl": 60},
{"CNAME": "www.tremor.rs", "ttl": 120}
]
elastic
The elastic offramp writes to one or more ElasticSearch nodes. This is currently tested with ES v6 and v7.
Supported configuration options are:
nodes
- A list of elastic search nodes to contact. These are the nodes to which tremor will send traffic to.concurrency
- Maximum number of parallel requests (default: 4).
Events will be sent to the connected ElasticSearch cluster via the ES Bulk API using the index
action.
It is recommended to batch events sent to this sink using the generic::batch operator to reduce the overhead
introduced by the ES Bulk API.
The configuration options codec
and postprocessors
are not used, as elastic will always serialize event payloads as JSON.
If the number of parallel requests surpass concurrency
, an error event will be emitted to the err
port, which can be used for appropriate error handling.
The following metadata variables can be specified on a per event basis:
$elastic["_index"]
- The index to write to (required).$elastic["_type"]
- The document type for elastic (optional), deprecated in ES 7.$elastic["_id"]
- The document id for elastic (optional).$elastic.pipeline
- The ElasticSearch pipeline to use (optional).$elastic.action
- The bulk action to perform, one ofdelete
,create
,update
orindex
. If noaction
is provided it defaults toindex
.delete
andupdate
require$elastic._id
to be set or elastic search will have error.
Linked Transport
If used as a linked transport, the sink will emit events in case of errors sending data to ES or if the incoming event is malformed via the err
port.
Also, upon a returned ES Bulk request, it will emit an event for each bulk item, denoting success or failure.
Success Responses
Events denoting success are sent via the out
port and have the following format:
{
"success": true,
"source": {
"event_id": "1:2:3",
"origin": "file:///tmp/input.json.xz"
},
"payload": {}
}
The event metadata will contain the following:
{
"elastic": {
"_id": "ES document id",
"_type": "ES document type",
"_index": "ES index",
"version": "ES document version"
}
}
Error Responses
Events denoting bulk item failure are sent via the err
port and have the following format:
{
"success": false,
"source": {
"event_id": "2:3:4",
"origin": null
},
"error": {},
"payload": {}
}
error
will contain the error description object returned from ES.
The event metadata for failed events looks as follows:
{
"elastic": {
"_id": "ES document id",
"_type": "ES document type",
"_index": "ES index",
}
}
Error Responses that are not scoped to bulk items, but the whole operation of turning the event into a Elasticsearch Bulk Request and sending that request to Elasticsearch
will have the same payload format as Bulk item errors, but the $elastic
metadata is missing:
{
"success": false,
"source": {
"event_id": "2:3:4",
"origin": "tremor-rest://example.org/"
},
"error": {},
"payload": {}
}
For both error and success responses payload
is the event payload that was sent to ES.
Example Configuration:
offramp:
- id: es
type: elastic
config:
nodes:
- http://elastic:9200
Example Configuration for linked transport (including binding):
offramp:
- id: es-linked
type: elastic
linked: true
config:
nodes:
- http://elastic1:9200
- http://elastic2:9200
concurrency: 8
binding:
id: "es-linked-binding"
links:
"/onramp/example/{instance}/out": ["/pipeline/to_elastic/{instance}/in"]
"/pipeline/to_elastic/{instance}/in": ["/offramp/es-linked/{instance}/in"]
# handle success and error messages with different pipelines
"/offramp/es-linked/{instance}/out": ["/pipeline/handle-es-success/{instance}/in"]
"/offramp/es-linked/{instance}/err": ["/pipeline/handle-es-error/{instance}/in"]
# more links...
...
exit
The exit offramp terminates the runtime with a system exit status.
The offramp accepts events via its standard input port and responds to events with a record structure containing a numeric exit field.
To indicate successful termination, an exit
status of zero may be used:
{ "exit": 0 }
To indicate non-successful termination, a non-zero exit
status may be used:
{ "exit": 1 }
Exit codes should follow standard UNIX/Linux guidelines when being integrated
with bash
or other shell-based environments, as follows:
Code | Meaning |
---|---|
0 | Success |
1 | General errors |
2 | Misuse of builtins |
126 | Command invoked cannot run due to credentials/auth constraints |
127 | Command not understood, not well-formed or illegal |
To delay the exit (to allow flushing of other offramps
) the delay
key can be used to delay the exit by a number of milliseconds:
{
"exit": 1,
"delay": 1000
}
Example:
offramp:
- id: terminate
type: exit
file
The file offramp writes events to a file, one event per line. The file is overwritten if it exists.
The default codec is json
.
Supported configuration options are:
file
- The file to write to.
Example:
offramp:
- id: in
type: file
config:
file: /my/path/to/a/file.json
gcs
Google Cloud Storage offramp.
This offramp can issue basic operations to list buckets and objects and to create, insert and delete objects from the Google Cloud Platform cloud storage service.
This offramp is experimental.
This offramp assumes that the environment variable GOOGLE_APPLICATION_CREDENTIALS
been exported to the execution environment and it has been configured to point to a valid non-expired service account token json file.
Example:
offramp:
- id: gcs
type: gcs
codec: json
linked: true
postprocessors:
- gzip
preprocessors:
- gzip
If the use case for the offramp requires metadata from the Google Cloud Storage service on the supported operations for this offramp, then set linked to true and configure the output port out
.
If the use case does not require metadata, then set linked to false.
list_buckets
Lists all the buckets in the Google Cloud Storage project.
Request:
{
"command": "list_buckets",
"project_id": "<project-id>"
}
Response:
[
{
"cmd": "list_buckets",
"data": {
"items": [
{
"location": "EU",
"id": "<bucket-id>",
"locationType": "multi-region",
"storageClass": "STANDARD",
"metageneration": "4",
"selfLink": "https://www.googleapis.com/storage/v1/b/<bucket-id>",
"kind": "storage#bucket",
"name": "<bucket-name>",
"timeCreated": "<time of creation>",
"iamConfiguration": {
"uniformBucketLevelAccess": {
"enabled": true,
"lockedTime": "2021-06-17T09:12:22.122Z"
},
"bucketPolicyOnly": {
"enabled": true,
"lockedTime": "2021-06-17T09:12:22.122Z"
}
},
"etag": "CAQ=",
"defaultEventBasedHold": false,
"retentionPolicy": {
"effectiveTime": "2021-03-19T09:12:22.122Z",
"retentionPeriod": "60",
"isLocked": false
},
"updated": "2021-04-13T17:05:29.404Z",
"satisfiesPZS": false,
"projectNumber": "<project-number>"
}
],
"kind": "storage#buckets"
}
}
]
Where
<project-id>
- The project id of the Google Cloud Storage project where the buckets are.<bucket-id>
- The unique identifier of the bucket.<project-number>
- The project number for the Google Cloud Storage project.
list_objects
Lists all the objects in the specified bucket.
Request:
{
"command": "list_objects",
"bucket": "<bucket-name>"
}
Response:
[
{
"cmd": "list_objects",
"data": {
"items": [
{
"mediaLink": "",
"bucket": "<bucket-name>",
"id": "<object-id>",
"size": "1943550",
"crc32c": "0QXXmA==",
"storageClass": "STANDARD",
"generation": "1616145246990906",
"metageneration": "1",
"timeStorageClassUpdated": "2021-03-19T09:14:07.012Z",
"selfLink": "",
"kind": "storage#object",
"name": "<object-name>",
"md5Hash": "ucrP5b1N+6JHxn1TKavn/A==",
"timeCreated": "2021-03-19T09:14:07.012Z",
"etag": "CLrk6JqCvO8CEAE=",
"updated": "2021-03-19T09:14:07.012Z",
"retentionExpirationTime": "2021-03-19T09:15:07.012Z",
"contentType": "<content-type>"
}
],
"kind": "storage#objects"
}
}
]
Where
<bucket-name>
- The name of the Google Cloud Storage bucket where the object is.<object-id>
- The unique object identifier.<object-name>
- The name of the object in the Google Cloud Stoarge bucket.<content-type>
- The type of the object uploaded.
create_bucket
Creates a bucket in the project specified in the command.
Request:
{
"command": "create_bucket" ,
"project_id": "<project-id>",
"bucket": "<bucket>"
}
Response:
[
{
"cmd": "create_bucket",
"data": {
"projectNumber": "<project-number>",
"location": "US",
"id": "<bucket-id>",
"etag": "CAE=",
"locationType": "multi-region",
"storageClass": "STANDARD",
"metageneration": "1",
"updated": "2021-04-22T07:37:23.702Z",
"selfLink": "https://www.googleapis.com/storage/v1/b/<bucket-name>",
"kind": "storage#bucket",
"name": "tremor",
"iamConfiguration": {
"uniformBucketLevelAccess": {
"enabled": false
},
"bucketPolicyOnly": {
"enabled": false
}
},
"timeCreated": "2021-04-22T07:37:23.702Z"
}
}
]
Where
<project-id>
- The project id of the Google Cloud Storage project where the bucket is.<bucket-id>
- The unique identifier of the bucket.<project-number>
- The project number for the Google Cloud Storage project.
remove_bucket
Removes the bucket from the specified project in Google Cloud Storage project.
Request:
{
"command": "remove_bucket",
"bucket": "<bucket-name>"
}
Response:
Where
<bucket-name>
- The name of the Google Cloud Storage bucket where the object is.
upload_object
Uploads the object to a Google Cloud Storage bucket.
Request:
{
"command": "upload_object",
"bucket": "<bucket-name>",
"object": "<object-name>",
"body": `<object>`
}
Response:
[
{
"cmd": "upload_object",
"data": {
"mediaLink": "https://storage.googleapis.com/download/storage/v1/b/<bucket-name>/o/<object-name>?generation=1619077199260663&alt=media",
"bucket": "<bucket-name>",
"id": "<object-id>",
"size": "47",
"crc32c": "a7mrxw==",
"storageClass": "STANDARD",
"generation": "1619077199260663",
"metageneration": "1",
"timeStorageClassUpdated": "2021-04-22T07:39:59.278Z",
"selfLink": "https://www.googleapis.com/storage/v1/b/<bucket-name>/o/<object-name>",
"kind": "storage#object",
"name": "april",
"md5Hash": "qhr9326j+3PeIK9koXMHCg==",
"timeCreated": "2021-04-22T07:39:59.278Z",
"etag": "CPfXzcqskfACEAE=",
"updated": "2021-04-22T07:39:59.278Z",
"retentionExpirationTime": "2021-04-22T07:40:59.278Z",
"contentType": "application/json"
}
}
]
Where
<object>
- The object data to be uploaded to a Google Cloud Storage bucket.<bucket-name>
- The name of the Google Cloud Storage bucket where the object is.<object-id>
- The unique object identifier.<object-name>
- The name of the object in the Google Cloud Stoarge bucket.
fetch_object
Returns the metadata for the object fetched.
Request:
{
"command": "fetch",
"bucket": "<bucket-name>",
"object": "<object-name>"
}
Response:
[
{
"cmd": "fetch",
"data": {
"mediaLink": "https://storage.googleapis.com/download/storage/v1/b/<bucket-name>/o/<object-name>?generation=1619077199260663&alt=media",
"bucket": "<bucket-name>",
"id": "<object-id>",
"size": "47",
"crc32c": "a7mrxw==",
"storageClass": "STANDARD",
"generation": "1619077199260663",
"metageneration": "1",
"timeStorageClassUpdated": "2021-04-22T07:39:59.278Z",
"selfLink": "https://www.googleapis.com/storage/v1/b/<bucket-name>/o/<object-name>",
"kind": "storage#object",
"name": "april",
"md5Hash": "qhr9326j+3PeIK9koXMHCg==",
"timeCreated": "2021-04-22T07:39:59.278Z",
"etag": "CPfXzcqskfACEAE=",
"updated": "2021-04-22T07:39:59.278Z",
"retentionExpirationTime": "2021-04-22T07:40:59.278Z",
"contentType": "application/json"
}
}
]
Where
<bucket-name>
- The name of the Google Cloud Storage bucket where the object is.<object-id>
- The unique object identifier.<object-name>
- The name of the object in the Google Cloud Stoarge bucket.
download_object
Downloads the object.
Request:
{
"command": "download_object",
"bucket": "<bucket-name>",
"object": "<object-name>"
}
Response:
[
{
"cmd": "download_object",
"data": `<object>`
}
]
Where
<bucket-name>
- The name of the Google Cloud Storage bucket where the object is.<object-name>
- The name of the object in the Google Cloud Stoarge bucket.<object>
- The object downloaded.
remove_object
Removes the object from the specified bucket.
Request:
{
"command": "remove_object" ,
"bucket":"<bucket-name>",
"object": "<object-name>"
}
Response:
Where
<bucket-name>
- The name of the Google Cloud Storage bucket where the object is.<object-name>
- The name of the object in the Google Cloud Stoarge bucket.
gpub
Google Cloud Pubsub - Publisher
This offramp can issue basic operation of creating a subscription and sending a message to a topic.
This offramp is experimental.
This offramp assumes that the environment variable GOOGLE_APPLICATION_CREDENTIALS
has been exported to the execution environment and it has been configured to point to a valid non-expired service account token json file.
Supported configuration options are:
pem
- The pem file from GCP for authentication.
Example:
offramp:
- id: gpub
type: gpub
codec: json
postprocessors:
- gzip
linked: true
config:
pem: gcp.pem
If the use case for the offramp requires metadata from the Google Cloud Pub/Sub service on the supported operations for this offramp, then set linked to true and configure the output port out
. If the use case does not require metadata, then set linked to false.
create_subscription
Create a subsrciption to a pub/sub topic.
Request:
{
"command": "create_subscription",
"project": "<project-id>",
"topic": "<topic-name>",
"subscription": "<subscription-name>",
"message_ordering": `<message-ordering>`
}
Response:
{
"subscription": "projects/<project-id>/subscriptions/<subscription-name>",
"topic": "projects/<project-id>/topics/<topic-name>",
"ack_deadline_seconds": `<ack_deadline_seconds>`,
"retain_acked_messages": `<retain_acked_messages>`,
"enable_message_ordering": `<enable_message_ordering>`,
"message_retention_duration": `<message_retention_duration>`,
"filter": "<filter>",
"detached": `<detached>`
}
Where
<project-id>
- The project id of the Google Cloud Pub/sub project where the topic is.<topic-name>
- The Google cloud Pub/Sub topic name to which the subscription is being created.<subscription-name>
- Set a unique name for the subscription to be created.<message-ordering>
- Can be set to true or false. To receive the messages in order, set the message ordering property on the subscription you receive messages from. Receiving messages in order might increase latency.<ack_deadline_seconds>
- The approximate amount of time (on a best-effort basis) Pub/Sub waits for the subscriber to acknowledge receipt before resending the message. In the interval after the message is delivered and before it is acknowledged, it is considered to be outstanding. During that time period, the message will not be redelivered (on a best-effort basis).<retain_acked_messages>
- Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the message_retention_duration window.<enable_message_ordering>
- If true, messages published with the same ordering_key in PubsubMessage will be delivered to the subscribers in the order in which they are received by the Pub/Sub system. Otherwise, they may be delivered in any order.<message_retention_duration>
- How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If retain_acked_messages is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a Seek can be done. Defaults to 7 days. Cannot be more than 7 days or less than 10 minutes.<filter>
- An expression written in the Pub/Sub filter language. If non-empty, then only PubsubMessages whose attributes field matches the filter are delivered on this subscription. If empty, then no messages are filtered out.<detached>
- Indicates whether the subscription is detached from its topic. Detached subscriptions don't receive messages from their topic and don't retain any backlog.
send_message
Send a message to a pubsub topic.
Request:
{
"command": "send_message",
"project": "<project-id>",
"topic": "<topic-name>",
"data": `<data>`,
"ordering_key": "<ordering-key>"
}
Response:
{
"message-id": "<message-id>",
"command": "send_message"
}
Where
<project-id>
- The project id of the Google Cloud Pubsub project where the topic is.<topic-name>
- The Google cloud PubSub topic name to which the message is being sent.<data>
- The data that is to be sent as message.<ordering-key>
- If non-empty, identifies related messages for which publish order should be respected. If a Subscription has message_ordering set to true, messages published with the same non-empty ordering_key value will be delivered to subscribers in the order in which they are received by the pub/sub system. All PubsubMessages published in a given PublishRequest must specify the same ordering_key value.<message-id>
- The message id assigned by the Google Cloud pub/sub api.
Kafka
The Kafka offramp connects sends events to Kafka topics. It uses librdkafka
to handle connections and can use the full set of librdkaka 1.6.1 configuration options.
The default codec is json
.
Supported configuration options are:
topic
- The topic to send to.brokers
- Broker servers to connect to. (Kafka nodes)hostname
- Hostname to identify the client with. (default: the systems hostname)key
- Key to use for messages (default: none)rdkafka_options
- An optional map of option to value, where both sides need to be strings.
Used metadata variables:
$kafka
- Record consisting of the following meta information:$headers
: A record denoting the headers for the message.$key
: Same as configkey
(optional. overrides related config param when present)
Example:
offramp:
- id: kafka-out
type: kafka
config:
brokers:
- kafka:9092
topic: demo
kv
The kv
offramp is intended to allow for a decoupled way of persisting and retrieving state in a non blocking way.
The kv
store is persisted to the directory configured in config.dir
. If it doesnt exist, it will be created.
Example:
- id: kv
type: kv
linked: true # this needs to be true
codec: json
config:
dir: "temp/kv" # directory to store data in
One response event is sent for every command in the handled event. On success to the out
port, in the error case (malformed data or error during kv operation) to the err
port.
Each response event will have the following metadata structure:
{
"kv": {
"op": "<string>|null" // contains the command key that was used to trigger this response, can be `null` for error events.
}
}
Events sent to the KV offramp contain commands. The following are supported:
#### get
Fetches the data for a given key.
**Request**:
```js
{"get": {"key": "<string|binary>"}}
Response:
Key was found, the format of decoded depends on the codec (does NOT have to be a string):
{
"ok": {
"key": "<binary>",
"value": "<decoded>"
}
}
Key was not found:
{
"ok": {
"key": "<binary>",
"value": null
}
}
put
Writes a value to a key, returns the old value if there was any.
Request:
{
"put": {
"key": "<string|binary>",
"value": "<to encode>" // the format of value depends on the codec (does NOT have to be a string)
}
}
Response:
Key was used before, this is the old value, the format of decoded depends on the codec (does NOT have to be a string):
{
"ok": {
"key": "<binary>",
"value": "<decoded>"
}
}
Key was not used before:
{
"ok": {
"key": "<binary>",
"value": null
}
}
delete
Deletes a key, returns the old value if there was any.
Request:
{"delete": {"key": "<string|binary>"}}
Response:
Key was used before, this is the old value, the format of decoded depends on the codec (does NOT have to be a string):
{
"ok": {
"key": "<binary>",
"value": "<decoded>"
}
}
Key was not used before:
{
"ok": {
"key": "<binary>",
"value": null
}
}
scan
Reads a range of keys
Request:
{"scan": {
"start": "<string|binary>", // optional, if not set will start with the first key
"end": "<string|binary>", // optional, if not set will read to the end key
}}
Response:
{
"ok": [
{
"key": "<binary>", // keys are ALWAYS encoded as binary since we don't know if it's a string or binary
"value": "<decoded>" // the value, the format of decoded depends on the codec (does NOT have to be a string)
} // repeated, may be empty
]
}
cas
Compare And Swap operation. Those operations require old values to match what it is compared to
Request:
{"cas": {
"key": "<string|binary>", // The key to operate on
"old": "<to encode|not-set>", // The old value, if not set means "this value wasn't present"
"new": "<to encode|not-set>", // The new value, if not set it means it gets deleted
}}
Response: On success:
{
"ok": null,
"key": "<binary>"
}
On failure:
{
"error": {
"current": "<decoded>", // the value that is currently stored, the format of decoded depends on the codec (does NOT have to be a string)
"proposed": "<decoded>" // the value that was proposed/expected to be there, the format of decoded depends on the codec (does NOT have to be a string)
},
"key": "<binary>"
}
nats
The nats
offramp connects to Nats server(s) and publishes a message to specified subject for every event.
The default codec is json
.
Supported configuration operations are:
hosts
- List of hosts to connect to.subject
- Subject for the message to be published.reply
- Optional string specifying the reply subject.headers
- Option key-value pairs, specifying message headers, where the key is a string and the value is a list of strings.options
- Optional struct, which can be used to customize the connection to the server (seenats.rs
configuration options for more info):token
: String; authenticate using a token.username
: String; authenticate using a username and password.password
: String; authenticate using a username and password.credentials_path
: String; path to a.creds
file for authentication.cert_path
: String; path to the client certificate file.key_path
: String; path to private key file.name
: String; name this configuration.echo
: Boolean; if true, published messages will not be delivered.max_reconnects
: Integer; max number of reconnection attempts.reconnect_buffer_size
: Integer; max amount of bytes to buffer when accepting outgoing traffic in disconnected mode.tls
: Boolean; if true, sets tls for all server connections.root_cert
: String; path to a root certificate.
Used metadata variables are:
$nats
: Record consisting of the following metadata:$reply
: Overridesconfig.reply
if present.$headers
: Overridesconfig.headers
if present.
Example:
offramp:
- id: nats-out
type: nats
config:
hosts:
- "127.0.0.1:4444"
subject: demo
reply: ghost
headers:
snot:
- badger
- ferris
options:
name: nats-demo
reconnect_buffer_size: 1
newrelic
Send events to New Relic platform, using its log apis (variable by region).
This offramp encodes events as json, as this is required by the newrelic
log api. Postprocessors are not used.
Supported configuration options are:
license_key
- New Relic's license (or insert only) keycompress_logs
- Whether logs should be compressed before sending to New Relic (avoids extra egress costs but at the cost of more cpu usage by tremor) (default: false)region
- Region to use to send logs. Available choices: USA, Europe (default: USA)
Example:
offramp:
- id: newrelic
type: newrelic
config:
license_key: keystring
compress_logs: true
region: europe
otel
CNCF OpenTelemetry offramp. Publishes to the specified host or IP and destination TCP port via gRPC messages
conforming to the CNCF OpenTelemetry protocol specification v1. Forwards tremor value variants of logs
, trace
and metrics
messages from tremor query pipelines downstream to remote OpenTelemetry endpoints.
This offramp is experimental.
Supported configuration options are:
host
- String - The host or IP to listen onport
- integer - The TCP port to listen on- 'logs' - boolean - Is logging enabled for this instance. Defaults to
true
. Receivedlogs
events are dropped whenfalse
. - 'metrics' - boolean - Is metrics enabled for this instance. Defaults to
true
. Defaults totrue
. Receivedmetrics
events are dropped whenfalse
. - 'trace' - boolean - Is trace enabled for this instance. Defaults to
true
. Defaults totrue
. Receivedtrace
events are dopped whenfalse
.
Pipelines that leverage the OpenTelemetry integration can use utility modules in the cncf::otel
module to
simplify working with the tremor value mapping of the event data. The connector translates the tremor value level
data to protocol buffers automatically for distribution to downstream OpenTelemetry systems.
The connector can be used with the qos::wal
operator for transient in-memory or persistent disk-based guaranteed delivery. If either
tremor or the downstream system fails or becomes uncontactable users can configure ( bytes and/or number of messages retained ) retention
for lossless recovery. For events marked as transactional that are explicitly acknowledged, fail
insights are propagated for events that
are not succesfully transmitted downstream. Non-transactional events ( those not marked as transactional ) are delivered on a best effort
basis. Regardless of the transaction configuration, when paired with qos operators upstream pipelines, the sink will coordinate failover
and recovery to the configured retention, replaying the retained messages upon recovery of network accessibility of the downstream endpoints.
For best effort delivery - the qos::wal
can be omitted and events distributed when downstream endpoints are inaccessible will be
lost.
Example:
offramp:
- id: otlp
type: otel
codec: json
config:
port: 4317
host: 10.0.2.1
PostgreSQL
PostgreSQL offramp.
Supported configuration options are:
host
- PostgreSQL database hostnameport
- PostgresSQL database portuser
- Username for authenticationpassword
- Password for authenticationdbname
- Database nametable
- Database table name
Data that comes in will be used to run an INSERT
statement. It is required
that data comes in objects representing columns. The object key must represent field
name in the database and must contain following fields:
fieldType
- a PostgreSQL field type (e.g.VARCHAR
,INT4
,TIMESTAMPTZ
, etc.)name
- field name as represented by database table schemavalue
- the value of the field
codec
and postprocessors
config values are ignored as they cannot apply to this offramp, since the event is transformed into a SQL query.
Example:
id: db
type: postgres
config:
host: localhost
port: 5432
user: postgres
password: example
dbname: sales
table: transactions
rest
The rest offramp is used to send events to the specified endpoint.
Supported configuration options are:
endpoint
- Endpoint URL. Can be provided as string or as struct. The struct form is composed of the following standard URL fields:scheme
- String, required, typicallyhttp
username
- String, optionalpassword
- String, optionalhost
- String, required, hostname or ip addressport
- Number, optional, defaults to80
path
- String, optional, defaults to/
query
- String, optionalfragment
- String, optional
method
- HTTP method to use (default:POST
)headers
- A map of headers to set for the requests, where both sides are stringsconcurrency
- Number of parallel in-flight requests (default:4
)auth
- Provide a token that will authenticate the offramp client. Supports empty string andgcp
(default:""
)
Used metadata variables:
Setting these metadata variables here allows users to dynamically change the behaviour of the rest offramp:
$endpoint
- same format as configendpoint
(optional. overrides related config param when present)$request
- A record capturing the HTTP request attributes. Available fields within:method
- same as configmethod
(optional. overrides related config param when present)headers
- A map from header name (string) to header value (string or array of strings)(optional. overrides related config param when present)
These variables are aligned with the similar variables generated by the rest onramp. Note that $request.url
is not utilized here -- instead the same can be configured by setting $endpoint
(since enabling the former here can lead to request loops when events sourced from rest onramp are fed to the rest offramp, without overriding it from the pipeline. also, $endpoint
can be separately configured as a URL string, which is convenient for offramp use).
The rest offramp encodes the event as request body using the Content-Type
header if present, using the customizable builtin codec_map
to determine a matching coded. It falls back to use the configured codec if no Content-Type
header is available.
When used as Linked Transport the same handling is applied to the incoming HTTP response, giving precedence to the Content-Type
header and only falling back to the configured codec
.
If the number of parallel requests surpass concurrency
, an error event will be emitted to the err
port, which can be used for appropriate error handling.
Set metadata variables:
These metadata variables are used for HTTP response events emitted through the
OUT
port:
$response
- A record capturing the HTTP response attributes. Available fields within:status
- Numeric HTTP status codeheaders
- A record that maps header name (lowercase string) to value (array of strings)
$request
- A record with the related HTTP request attributes. Available fields within:method
- HTTP method usedheaders
- A record containing all the request headersendpoint
- A record containing all the fields that configendpoint
does
When used as a linked offramp, batched events are rejected by the offramp.
Example 1:
offramp:
- id: rest-offramp
type: rest
codec: json
postprocessors:
- gzip
linked: true
config:
endpoint:
host: httpbin.org
port: 80
path: /anything
query: "q=search"
headers:
"Accept": "application/json"
"Transfer-Encoding": "gzip"
Example 2:
offramp:
- id: rest-offramp-2
type: rest
codec: json
codec_map:
"text/html": "string"
"application/vnd.stuff": "string"
config:
endpoint:
host: httpbin.org
path: /patch
method: PATCH
rest offramp example for InfluxDB
The structure is given for context.
offramp:
- id: influxdb
type: rest
codec: influx
postprocessors:
- lines
config:
endpoint:
host: influx
path: /write
query: db=metrics
headers:
"Client": "Tremor"
stderr
A custom stderr offramp can be configured by using this offramp type. But beware that this will share the single stderr stream with system::stderr
.
The default codec is json.
The stderr offramp will write a \n
right after each event, and optionally prefix every event with a configurable prefix
.
If the event data (after codec and postprocessing) is not a valid UTF8 string (e.g. if it is binary data) if will by default output the bytes with debug formatting.
If raw
is set to true, the event data will be put on stderr as is.
Supported configuration options:
prefix
- A prefix written before each event (optional string).raw
- Write event data bytes as is to stderr.
Example:
offramp:
- id: raw_stuff
type: stderr
codec: json
postprocessors:
- snappy
config:
raw: true
stdout
A custom stdout offramp can be configured by using this offramp type. But beware that this will share the single stdout stream with system::stdout
.
The default codec is json.
The stdout offramp will write a \n
right after each event, and optionally prefix every event with a configurable prefix
.
If the event data (after codec and postprocessing) is not a valid UTF8 string (e.g. if it is binary data) if will by default output the bytes with debug formatting.
If raw
is set to true, the event data will be put on stdout as is.
Supported configuration options:
prefix
- A prefix written before each event (optional string).raw
- Write event data bytes as is to stdout.
Example:
offramp:
- id: like_a_python_repl
type: stdout
config:
prefix: ">>> "
tcp
This connects on a specified port for distributing outbound TCP data. TLS is supported via rustls.
The offramp can leverage postprocessors to frame data after codecs are applied and events are forwarded to external TCP protocol distribution endpoints.
The default codec is json
.
Supported configuration options are:
host
- The host to advertise asport
- The TCP port to listen onis_non_blocking
- Is the socket configured as non-blocking ( default: false )ttl
- Set the socket's time-to-live ( default: 64 )is_no_delay
- Set the socket's Nagle ( delay ) algorithm to off ( default: true )tls
- If set totrue
or a detailed TLS config with the keys below, will wrap the TCP socket with a TLS session. ( default:false
)cafile
- If provided, only server certificates signed by the Certificate Authority represented by the root certificate incafile
are accepted. If not provided, all root certificated distributed by Mozilla via webpki-roots are loaded.domain
- if provided, this will be the domain to verify the TLS certificate against. It might differ tohost
, especially ifhost
is given as IP address. If not provided, thehost
config is used as domain.
offramp:
- id: tcp
type: tcp
codec: json
postprocessors:
- gzip
- base64
- lines
config:
host: "localhost"
port: 9000
TLS Support
TLS support is enabled by providing either tls: true
or a detailed config.
When providing tls: true
, the TCP offramp is verifying the server it connects to and its certificate against the host
config, and a set of common root-certificates provided by Mozilla via webpki-roots:
offramp:
- id: tcp
type: tcp
codec: json
postprocessors:
- length-prefixed
config:
host: "example.com"
port: 443
tls: true
This behaviour can be tuned by providing a detailed config. If tls.cafile
is provided, the TCP offramp will only accept server certificates signed (directly or indirectly via a server-provided certificate chain) by the given root-certificate. This is useful for testing against self-signed certificates. In this case the self signed certificate needs to provided via tls.cafile
. If tls.domain
is provided, this value will be used instead of the host
config to verify the domain name provided in the server certificate. This is especially useful if an IP address is used as host
config.
Client certificates are not supported.
Example:
offramp:
- id: tls
type: tcp
codec: json
config:
host: "127.0.0.1"
port: 65535
tls:
cafile: "path/to/custom_ca.pem"
domain: "localhost"
For testing purposes, self-signed certificates need the Subject Alternative Name extension, otherwise setting up the onramp will fail with a BadDER
Error. Adding this when generating a self-signed certificate can be done by passing -addext 'subjectAltName = DNS:<DOMAIN>'
via openssl
command line or by adding it to openssl.cfg
as described in the openssl manual.
udp
The UDP offramp sends data to a given host and port as UDP datagram.
The default codec is json
.
When the UDP onramp gets a batch of messages it will send each element of the batch as a distinct UDP datagram.
Supported configuration options are:
bind.host
- the local host to send data frombind.port
- the local port to send data fromhost
- the destination host to send data toport
- the destination port to send data to.
:::warn
Setting bound
to false
makes the UDP offramp potentially extremely slow as it forces a lookup of the destination on each event!
:::
Used metadata variables:
$udp.host
: This overwrites the configured destination host for this event. Expects a string.$udp.port
: This overwrites the configured destination port for this event. Expects an integer.
:::warn
Be careful to set $udp.host
to an IP, not a DNS name or the OS will resolve it on every event, which will be extremely slow!
:::
Example:
offramp:
- id: udp-out
type: udp
postprocessors:
- base64
config:
bind:
host: "10.11.12.13"
port: 1234
host: "20.21.22.23"
port: 2345
ws
Sends events over a WebSocket connection. Each event is a WebSocket message.
The default codec is json
.
Supported configuration options are:
url
- WebSocket endpoint to send data to.binary
- If data should be send as binary instead of text (default:false
).
Used metadata variables:
$url
- same as configurl
(optional. overrides related config param when present)$binary
- same as configbinary
(optional. overrides related config param when present)
Set metadata variables (for reply with linked transports):
$binary
-true
if the WebSocket message reply came as binary (false
otherwise)
When used as a linked offramp, batched events are rejected by the offramp.
Example:
onramp:
- id: ws
type: ws
config:
url: "ws://localhost:1234"