Customize sending logs with OpenTelemetry

The OpenTelemetry Collector forwards logs to a backend like Obics. Depending on the data source, the log structure contains different fields. It's very likely you'll want to rename fields to match your schema, remove unnecessary fields, or enrich the data with new fields. In many cases, like when reading from a file, the log data follows a structured text template, e.g., [time] [level] [message]. In these cases, the collector lets you parse the text and extract the relevant fields.
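For example, here's a minimal sketch of parsing such a template with the filelog receiver's regex_parser operator. The file path and the exact regex are hypothetical and would need to match your actual log format:

receivers:
  filelog:
    include: [/var/log/myapp.log]  # hypothetical path
    operators:
      # Parse "[time] [level] message" lines into named fields (hypothetical format).
      - type: regex_parser
        regex: '^\[(?P<time>[^\]]+)\] \[(?P<level>\w+)\] (?P<message>.*)$'
        timestamp:
          parse_from: attributes.time
          layout: '%Y-%m-%dT%H:%M:%S'
        severity:
          parse_from: attributes.level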

Here's an example of a logs payload that was produced by an AWS Lambda stdout stream, sent to AWS CloudWatch, and fetched with the awscloudwatch receiver:

{
  "resourceLogs": [
    {
      "resource": {
        "attributes": [
          {
            "key": "aws.region",
            "value": {
              "stringValue": "eu-central-1"
            }
          },
          {
            "key": "cloudwatch.log.group.name",
            "value": {
              "stringValue": "/aws/lambda/MyService-dev-api"
            }
          },
          {
            "key": "cloudwatch.log.stream",
            "value": {
              "stringValue": "2025/01/10/[$LATEST]cfff80b307c84e5ca7d46a9c7083b91c"
            }
          }
        ]
      },
      "scopeLogs": [
        {
          "scope": {},
          "logRecords": [
            {
              "timeUnixNano": "1736508251008000000",
              "observedTimeUnixNano": "1736508274402696500",
              "body": {
                "stringValue": "START RequestId: 73b39ac7-d4df-4c67-b6d0-8972da96596b Version: $LATEST\n"
              },
              "attributes": [
                {
                  "key": "id",
                  "value": {
                    "stringValue": "38725428040875466125135870248317517657032867829547466752"
                  }
                }
              ],
              "traceId": "",
              "spanId": ""
            },
            {
              "timeUnixNano": "1736508251009000000",
              "observedTimeUnixNano": "1736508274402696500",
              "body": {
                "stringValue": "2025-01-10T11:24:11.009Z\t73b39ac7-d4df-4c67-b6d0-8972da96596b\tERROR\tAPI Key fe03c7d8 is invalid G8Xz3\n"
              },
              "attributes": [
                {
                  "key": "id",
                  "value": {
                    "stringValue": "38725428040897766870334400871459053375305516191053447170"
                  }
                }
              ],
              "traceId": "",
              "spanId": ""
            },
            {
              "timeUnixNano": "1736508251011000000",
              "observedTimeUnixNano": "1736508274402696500",
              "body": {
                "stringValue": "END RequestId: 73b39ac7-d4df-4c67-b6d0-8972da96596b\n"
              },
              "attributes": [
                {
                  "key": "id",
                  "value": {
                    "stringValue": "38725428040942368360731462117742124811850812914065408004"
                  }
                }
              ],
              "traceId": "",
              "spanId": ""
            },
            {
              "timeUnixNano": "1736508251011000000",
              "observedTimeUnixNano": "1736508274402696500",
              "body": {
                "stringValue": "REPORT RequestId: 73b39ac7-d4df-4c67-b6d0-8972da96596b\tDuration: 2.86 ms\tBilled Duration: 3 ms\tMemory Size: 1024 MB\tMax Memory Used: 438 MB\t\n"
              },
              "attributes": [
                {
                  "key": "id",
                  "value": {
                    "stringValue": "38725428040942368360731462117742124811850812914065408005"
                  }
                }
              ],
              "traceId": "",
              "spanId": ""
            }
          ]
        }
      ]
    }
  ]
}

If this payload is forwarded to Obics unchanged, it will result in the following log entries:

Entry 1:
time: [time received in collector]
message: "START RequestId: 73b39ac7-d4df-4c67-b6d0-8972da96596b Version: $LATEST"
level: [default level]
id: 38725428040875466125135870248317517657032867829547466752

Entry 2:
time: [time received in collector]
message: "2025-01-10T11:24:11.009Z\t73b39ac7-d4df-4c67-b6d0-8972da96596b\tERROR\tAPI Key fe03c7d8 is invalid G8Xz3"
level: [default level]
id: 38725428040897766870334400871459053375305516191053447170

...

This data can be improved in several ways:

  1. The body of each log uses a text template pattern that contains [time], [level], [RequestId], [actual message]. Instead of taking that string as-is, you can extract the data into the relevant fields.
  2. It's best to extract the time that's written in the body, since it will most likely be the time the message was produced rather than the time it was observed.
  3. There are 3 types of patterns here, each writing the RequestId in a different way. This field is important for grouping logs from the same Lambda invocation.
  4. Some fields, like id, might be useless to you, and you wouldn't want a column named id created in your Logs table.
  5. You might want to enrich this data with some context like host name, service name, region, etc. The payload contains metadata attributes at the resource level, and you can inject them into each log item. Note that the same collector instance can receive messages from multiple sources, and metadata like cloudwatch.log.group.name can be useful to distinguish them.

The OTEL collector can process log messages, albeit with a less-than-great developer experience. Here's an example of a collector configuration that takes care of the problems just mentioned:

receivers:
  awscloudwatch:
    # ... 
            
processors:
  batch: {}
  transform/logs: # We name this processor "transform/logs" for clarity, but you can name it anything.
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          # Save the original body so later statements can still inspect it.
          - set(attributes["original_body"], body)
          # 2nd and 3rd patterns (at least 3 tabs): extract timestamp, span_id, and level.
          - set(attributes["timestamp"], Split(body, "\t")[0]) where Split(body, "\t")[3] != nil
          - set(attributes["span_id"], Split(body, "\t")[1]) where Split(body, "\t")[3] != nil
          - set(attributes["level"], Split(body, "\t")[2]) where Split(body, "\t")[3] != nil
          # 1st and 3rd patterns: the RequestId is the 3rd space-separated token of the first tab-separated token.
          - set(attributes["span_id"], Split(Split(body, "\t")[0], " ")[2]) where Split(body, " ")[1] == "RequestId:"
          # 2nd pattern: keep only the actual message as the body (this also matches the 3rd pattern, reverted next).
          - set(body, Split(body, "\t")[3]) where Split(body, "\t")[3] != nil
          - set(body, attributes["original_body"]) where Split(attributes["original_body"], "\t")[4] != nil
          # 3rd pattern: the tokens extracted above aren't a real timestamp/level, so drop them.
          - delete_key(attributes, "timestamp") where Split(attributes["original_body"], "\t")[4] != nil
          - delete_key(attributes, "level") where Split(attributes["original_body"], "\t")[4] != nil
          # Cleanup and enrichment for all patterns.
          - delete_key(attributes, "id")
          - delete_key(attributes, "original_body")
          - set(attributes["region"], resource.attributes["aws.region"])
          - set(attributes["service"], resource.attributes["cloudwatch.log.group.name"])

exporters:
  debug: {} # Referenced by the pipeline below; prints records for troubleshooting.
  otlphttp:
    endpoint: "https://ingest.obics.io/api/otel"
    headers:
      x-api-key: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    timeout: 30s
    encoding: json

service:
  pipelines:
    logs:
      receivers: [awscloudwatch]
      processors: [batch, transform/logs]
      exporters: [otlphttp, debug]

To understand what goes on here, note that there are 3 different body patterns that we're processing:

  1. START RequestId: 73b39ac7-d4df-4c67-b6d0-8972da96596b Version: $LATEST\n - Can be recognized by having no \t characters (the END RequestId: ... lines also have no \t characters and are handled the same way).
  2. 2025-01-10T11:24:11.009Z\t73b39ac7-d4df-4c67-b6d0-8972da96596b\tERROR\tAPI Key fe03c7d8 is invalid G8Xz3\n - Can be recognized by having exactly 3 \t characters.
  3. REPORT RequestId: 73b39ac7-d4df-4c67-b6d0-8972da96596b\tDuration: 2.86 ms\tBilled Duration: 3 ms\tMemory Size: 1024 MB\tMax Memory Used: 438 MB\t\n - Can be recognized by having exactly 5 \t characters.
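
To make the where clauses concrete, here's how Split(body, "\t") tokenizes the 2nd pattern:

[0] 2025-01-10T11:24:11.009Z
[1] 73b39ac7-d4df-4c67-b6d0-8972da96596b
[2] ERROR
[3] API Key fe03c7d8 is invalid G8Xz3\n

Index [3] exists here but [4] doesn't, which is why Split(body, "\t")[3] != nil matches the 2nd and 3rd patterns while Split(body, "\t")[4] != nil matches only the 3rd. The error_mode: ignore setting means a statement whose where clause fails (for example, by indexing out of range) is simply skipped.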

Let's go over each transform statement:

  • set(attributes["original_body"], body) - Save the original body value temporarily.
  • set(attributes["timestamp"], Split(body, "\t")[0]) where Split(body, "\t")[3] != nil - Sets the timestamp field to the 1st token in the body. Catches the 2nd and 3rd patterns, but is only relevant to the 2nd pattern; a later statement deletes the timestamp field for the 3rd pattern.
  • set(attributes["span_id"], Split(body, "\t")[1]) where Split(body, "\t")[3] != nil - Sets the span_id field to the 2nd token in the body. Catches the 2nd and 3rd patterns, but is only relevant to the 2nd pattern; for the 3rd pattern, span_id is overwritten with the RequestId two statements later.
  • set(attributes["level"], Split(body, "\t")[2]) where Split(body, "\t")[3] != nil - Sets the level field to the 3rd token in the body. Catches the 2nd and 3rd patterns, but is only relevant to the 2nd pattern; a later statement deletes the level field for the 3rd pattern.
  • set(attributes["span_id"], Split(Split(body, "\t")[0], " ")[2]) where Split(body, " ")[1] == "RequestId:" - Extracts the RequestId from the 1st and 3rd patterns and sets the span_id field to its value. Splitting the first tab-separated token keeps the 3rd pattern's Duration text out of the value.
  • set(body, Split(body, "\t")[3]) where Split(body, "\t")[3] != nil - Sets the body to the 4th token (the actual message) for the 2nd pattern. It also matches the 3rd pattern, which the next statement reverts.
  • set(body, attributes["original_body"]) where Split(attributes["original_body"], "\t")[4] != nil - Restores the original body for the 3rd pattern, since the previous statement clobbered it.
  • delete_key(attributes, "timestamp") where Split(attributes["original_body"], "\t")[4] != nil - Deletes the timestamp field for the 3rd pattern.
  • delete_key(attributes, "level") where Split(attributes["original_body"], "\t")[4] != nil - Deletes the level field for the 3rd pattern.
  • delete_key(attributes, "id") - Deletes the id field for all patterns.
  • delete_key(attributes, "original_body") - Deletes the temporary original_body field for all patterns.
  • set(attributes["region"], resource.attributes["aws.region"]) - Enriches all logs with a new field region with the AWS region.
  • set(attributes["service"], resource.attributes["cloudwatch.log.group.name"]) - Enriches all logs with a new field service with the CloudWatch log group name.
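
With these transforms applied, the 2nd log record from the payload above would end up in Obics roughly as follows (assuming, as in the entries shown earlier, that the timestamp and level attributes map to the time and level columns):

time: 2025-01-10T11:24:11.009Z
message: "API Key fe03c7d8 is invalid G8Xz3"
level: ERROR
span_id: 73b39ac7-d4df-4c67-b6d0-8972da96596b
region: eu-central-1
service: /aws/lambda/MyService-dev-api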

You might think this code could have been simpler, but the OTEL transform processor is quite limited. For example, I couldn't find a way to combine two conditions with an and operator in the where clause.

Note that in this particular case, I was able to reuse the existing span_id column and populate it with the RequestId. You could say that for a Lambda function, they serve the same purpose conceptually.

Specifying the target table in Obics

Logs are ingested into the Logs table by default. You can change the target table by including the obics-table-name header. For example:

# ...

exporters:
  otlphttp:
    endpoint: "https://ingest.obics.io/api/otel"
    headers:
      x-api-key: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
      obics-table-name: "DifferentLogs"
    timeout: 30s
    encoding: json

# ...

With this configuration, the logs will be ingested into the DifferentLogs table.
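
If you need to route different sources to different tables, one possible sketch is to define one otlphttp exporter per table and give each its own pipeline. The AppLogs and LambdaLogs table names and the filelog receiver below are hypothetical:

# ...

exporters:
  otlphttp/app:
    endpoint: "https://ingest.obics.io/api/otel"
    headers:
      x-api-key: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
      obics-table-name: "AppLogs"    # hypothetical table
  otlphttp/lambda:
    endpoint: "https://ingest.obics.io/api/otel"
    headers:
      x-api-key: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
      obics-table-name: "LambdaLogs" # hypothetical table

service:
  pipelines:
    logs/app:
      receivers: [filelog]           # hypothetical file-based source
      processors: [batch]
      exporters: [otlphttp/app]
    logs/lambda:
      receivers: [awscloudwatch]
      processors: [batch, transform/logs]
      exporters: [otlphttp/lambda]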
