Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,19 @@ Start all your containers again, and they should be using the new version of the

The following [log-opts](https://docs.docker.com/config/containers/logging/configure/#configure-the-default-logging-driver) are available for configuration:

| log-opt | default | description |
|----------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| extract-json-message | true | Enables unmarshalling JSON messages and sending the jsonPayload as the unmarshalled map. Kind of the whole point of this plugin, but you can disable it so it behaves just like the `gcplogs` plugin if you wish |
| local-logging | false | Enables logging to a local file, so logs can be viewed with the `docker logs` command. If false, the command will show no output |
| extract-severity | true | Extracts the `severity` from JSON logs to set them for the log that will be sent to GCP. It will be removed from the jsonPayload section, since it is set at the root level. Currently the supported severity field names to extract are the following: `severity`, `level` |
| extract-msg | true | Extracts the `msg` field from JSON logs to set the `message` field GCP expects. It will be removed from the jsonPayload section, since it is set at the root level. Fields named msg are produced for example by the golang log/slog package. |
| extract-gcp | false | Extract trace, labels and source location fields if present and formatted for Google cloud logging. This is produced for example by the golang log/slog package with the slogdriver handler |
| extract-caddy | false | Extract trace and HTTP Request from caddy if present and format for Google cloud logging. |
| exclude-timestamp | false | Excludes timestamp fields from the final jsonPayload, since docker sends its own nanosecond precision timestamp for each log. Currently it can remove fields with the following names: `timestamp`, `time`, `ts` |
| sleep-interval | 500 | Milliseconds to sleep when there are no logs to send before checking again. The higher the value, the lower the CPU usage will be |
| credentials-file | | Absolute path to the GCP credentials JSON file to use when authenticating (only necessary when running the plugin outside of GCP) |
| credentials-json | | JSON string with the GCP credentials to use when authenticating (only necessary when running the plugin outside of GCP) |
| log-opt | default | description |
|-------------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| extract-json-message | true | Enables unmarshalling JSON messages and sending the jsonPayload as the unmarshalled map. Kind of the whole point of this plugin, but you can disable it so it behaves just like the `gcplogs` plugin if you wish |
| local-logging | false | Enables logging to a local file, so logs can be viewed with the `docker logs` command. If false, the command will show no output |
| extract-severity | true | Extracts the `severity` from JSON logs to set them for the log that will be sent to GCP. It will be removed from the jsonPayload section, since it is set at the root level. Currently the supported severity field names to extract are the following: `severity`, `level` |
| extract-msg | true | Extracts the `msg` field from JSON logs to set the `message` field GCP expects. It will be removed from the jsonPayload section, since it is set at the root level. Fields named msg are produced for example by the golang log/slog package. |
| extract-gcp | false | Extract trace, labels and source location fields if present and formatted for Google cloud logging. This is produced for example by the golang log/slog package with the slogdriver handler |
| extract-caddy | false | Extract trace and HTTP Request from caddy if present and format for Google cloud logging. |
| exclude-timestamp | false | Excludes timestamp fields from the final jsonPayload, since docker sends its own nanosecond precision timestamp for each log. Currently it can remove fields with the following names: `timestamp`, `time`, `ts` |
| sleep-interval | 500 | Milliseconds to sleep when there are no logs to send before checking again. The higher the value, the lower the CPU usage will be |
| credentials-file | | Absolute path to the GCP credentials JSON file to use when authenticating (only necessary when running the plugin outside of GCP) |
| credentials-json | | JSON string with the GCP credentials to use when authenticating (only necessary when running the plugin outside of GCP) |
| internal-error-severity | warning | The severity with which internal driver errors should be reported |

### Building locally

Expand Down
42 changes: 34 additions & 8 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,7 @@ func (d *driver) consumeLog(lp *logPair) {
}

if len(bytes.Fields(buf.Line)) > 0 {
if err := lp.gLogger.Log(createMessageFromBuffer(&buf)); err != nil {
d.sLog.With("id", lp.info.ContainerID, "error", err, "message", buf).Error("error writing log to GCP logger message")
}
if lp.info.Config[localLoggingConfig] == "true" {
if err := lp.jsonl.Log(createMessageFromBuffer(&buf)); err != nil {
d.sLog.With("id", lp.info.ContainerID, "error", err, "message", buf).Error("error writing log message to JSON logger")
}
}
d.log(lp, createMessageFromBuffer(&buf))
} else {
time.Sleep(d.sleepInterval)
}
Expand All @@ -135,6 +128,39 @@ func (d *driver) consumeLog(lp *logPair) {
}
}

func (d *driver) log(lp *logPair, msg *logger.Message) {
if msg.Err != nil {
d.sLog.With("id", lp.info.ContainerID, "error", msg.Err, "message", string(msg.Line)).Error("writing driver err log to GCP logger")
}

if err := lp.gLogger.Log(msg); err != nil {
d.sLog.With("id", lp.info.ContainerID, "error", err, "message", string(msg.Line)).Error("error writing log to GCP logger message")
d.sLog.Error(fmt.Sprintf("error %+v", err))

// handleLoggingError will call with a non-nil err, so we avoid errors from being logged infinitely
if msg.Err == nil {
d.handleLoggingError(lp, msg, err)
}
}
if lp.info.Config[localLoggingConfig] == "true" {
if err := lp.jsonl.Log(msg); err != nil {
d.sLog.With("id", lp.info.ContainerID, "error", err, "message", string(msg.Line)).Error("error writing log message to JSON logger")
}
}
}

func (d *driver) handleLoggingError(lp *logPair, originalMsg *logger.Message, err error) {
var driverErr *nGCPError
if errors.As(err, &driverErr) && driverErr != nil {
d.log(lp, &logger.Message{
Line: originalMsg.Line,
Source: originalMsg.Source,
Timestamp: driverErr.ts,
Err: err,
})
}
}

func createMessageFromBuffer(buf *logdriver.LogEntry) *logger.Message {
var msg logger.Message
msg.Line = buf.Line
Expand Down
31 changes: 31 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"fmt"
"time"
)

type nGCPError struct {
Msg string `json:"msg"`
File string `json:"file"`
Line int `json:"line"`
ts time.Time
}

func (e *nGCPError) Error() string {
return fmt.Sprintf("%s/%d - %s - %s", e.File, e.Line, e.ts, e.Msg)
}

type driverError struct {
err error
}

func (e *driverError) Set(err error) {
if e.err == nil {
e.err = err
}
}

func (e *driverError) Get() error {
return e.err
}
132 changes: 78 additions & 54 deletions ngcplogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"net/url"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -88,6 +88,8 @@ type nGCPLogger struct {
extractMsg bool
extractGcp bool
extractCaddy bool

internalErrorSeverity logging.Severity
}

type dockerLogEntry struct {
Expand Down Expand Up @@ -241,6 +243,12 @@ func New(info logger.Info) (logger.Logger, error) {
l.extractCaddy = true
}

if internalErrorSeverityStr, isPresent := info.Config["internal-error-severity"]; isPresent {
l.internalErrorSeverity = logging.ParseSeverity(internalErrorSeverityStr)
} else {
l.internalErrorSeverity = logging.Warning
}

if instanceResource != nil {
l.instance = instanceResource
}
Expand Down Expand Up @@ -276,9 +284,10 @@ func ValidateLogOpts(cfg map[string]string) error {
}

func (l *nGCPLogger) Log(lMsg *logger.Message) error {
logLine := lMsg.Line
logLine := strings.TrimSpace(string(lMsg.Line))
ts := lMsg.Timestamp
logger.PutMessage(lMsg)

driverErr := &driverError{}

if len(logLine) > 0 {
var payload any
Expand All @@ -290,32 +299,42 @@ func (l *nGCPLogger) Log(lMsg *logger.Message) error {

if l.extractJsonMessage && logLine[0] == '{' && logLine[len(logLine)-1] == '}' {
var m map[string]any
err := json.Unmarshal(logLine, &m)
err := json.Unmarshal([]byte(logLine), &m)
if err != nil {
payload = fmt.Sprintf("Error parsing JSON: %s", string(logLine))
entry.Severity = logging.Critical
payload = fmt.Sprintf("Error parsing JSON: %s", logLine)
entry.Severity = l.internalErrorSeverity
} else {
entry.Severity = l.extractSeverityFromPayload(m)
l.excludeTimestampFromPayload(m)
l.extractMsgFromPayload(m)
m["instance"] = l.instance
m["container"] = l.container
l.extractGcpFromPayload(m, &entry)
l.extractCaddyFromPayload(m, &entry)
l.extractGcpFromPayload(m, &entry, driverErr)
l.extractCaddyFromPayload(m, &entry, driverErr)

var receivedDriverErr *nGCPError
if errors.As(lMsg.Err, &receivedDriverErr) && receivedDriverErr != nil {
// Replace original message and error with driver error
delete(m, "message")
delete(m, "error")
m["ngcplogs-error"] = receivedDriverErr
entry.Severity = l.internalErrorSeverity
entry.Timestamp = receivedDriverErr.ts
}
payload = m
}
} else {
payload = dockerLogEntry{
Instance: l.instance,
Container: l.container,
Message: string(logLine),
Message: logLine,
}
}

entry.Payload = payload
l.logger.Log(entry)
}
return nil
return driverErr.Get()
}

func (l *nGCPLogger) extractSeverityFromPayload(m map[string]any) logging.Severity {
Expand Down Expand Up @@ -356,7 +375,6 @@ func (l *nGCPLogger) excludeTimestampFromPayload(m map[string]any) {
}

func (l *nGCPLogger) extractMsgFromPayload(m map[string]any) {

if l.extractMsg {
if msg, exists := m["msg"]; exists {
m["message"] = msg
Expand All @@ -365,63 +383,69 @@ func (l *nGCPLogger) extractMsgFromPayload(m map[string]any) {
}
}

func assertOrLog[T any](val any) T {
func castOrSetDriverErr[T any](val any, driverErr *driverError) T {
var v T
var errMsg string
if val == nil {
_, file, line, ok := runtime.Caller(1)
if !ok {
file = "unknown"
}
slog.Error("unexpected nil value", "file", file, "line", line)
errMsg = fmt.Sprintf("unexpected nil value")
} else {
var ok bool
v, ok = val.(T)
if !ok {
_, file, line, ok := runtime.Caller(1)
if !ok {
file = "unknown"
}
slog.Error("unexpected type", "want", reflect.TypeOf(v).String(), "got", reflect.TypeOf(val).String(), "file", file, "line", line)
errMsg = fmt.Sprintf("unexpected type, wanted %q and got %q", reflect.TypeOf(v).String(), reflect.TypeOf(val).String())
}
}
if errMsg != "" {
_, file, line, ok := runtime.Caller(1)
if !ok {
file = "unknown"
}
driverErr.Set(&nGCPError{
File: file,
Line: line,
ts: time.Now(),
Msg: errMsg,
})
}
return v
}

func (l *nGCPLogger) extractGcpFromPayload(m map[string]any, entry *logging.Entry) {

func (l *nGCPLogger) extractGcpFromPayload(m map[string]any, entry *logging.Entry, driverErr *driverError) {
if l.extractGcp {
if val, exists := m["logging.googleapis.com/sourceLocation"]; exists {
v := assertOrLog[map[string]any](val)
entry.SourceLocation = &loggingpb.LogEntrySourceLocation{
File: assertOrLog[string](v["file"]),
Line: int64(assertOrLog[float64](v["line"])),
Function: assertOrLog[string](v["function"]),
if sourceLocation, exists := m["logging.googleapis.com/sourceLocation"]; exists {
sourceLocationMap := castOrSetDriverErr[map[string]any](sourceLocation, driverErr)
if sourceLocationMap != nil {
entry.SourceLocation = &loggingpb.LogEntrySourceLocation{
File: castOrSetDriverErr[string](sourceLocationMap["file"], driverErr),
Line: int64(castOrSetDriverErr[float64](sourceLocationMap["line"], driverErr)),
Function: castOrSetDriverErr[string](sourceLocationMap["function"], driverErr),
}
}
delete(m, "logging.googleapis.com/sourceLocation")
}
if val, exists := m["logging.googleapis.com/trace"]; exists {
entry.Trace = assertOrLog[string](val)
entry.Trace = castOrSetDriverErr[string](val, driverErr)
delete(m, "logging.googleapis.com/trace")
}
if val, exists := m["logging.googleapis.com/spanId"]; exists {
entry.SpanID = assertOrLog[string](val)
entry.SpanID = castOrSetDriverErr[string](val, driverErr)
delete(m, "logging.googleapis.com/spanId")
}
if val, exists := m["logging.googleapis.com/trace_sampled"]; exists {
entry.TraceSampled = assertOrLog[bool](val)
entry.TraceSampled = castOrSetDriverErr[bool](val, driverErr)
delete(m, "logging.googleapis.com/trace_sampled")
}
if val, exists := m["logging.googleapis.com/labels"]; exists {
v := assertOrLog[map[string]any](val)
for k, v := range v {
entry.Labels[k] = assertOrLog[string](v)
if labels, exists := m["logging.googleapis.com/labels"]; exists {
labelsMap := castOrSetDriverErr[map[string]any](labels, driverErr)
for k, v := range labelsMap {
entry.Labels[k] = castOrSetDriverErr[string](v, driverErr)
}
delete(m, "logging.googleapis.com/labels")
}
}
}

func (l *nGCPLogger) extractCaddyFromPayload(m map[string]any, entry *logging.Entry) {
func (l *nGCPLogger) extractCaddyFromPayload(m map[string]any, entry *logging.Entry, driverErr *driverError) {

if l.extractCaddy {
if val, exists := m["request"]; exists {
Expand All @@ -430,39 +454,39 @@ func (l *nGCPLogger) extractCaddyFromPayload(m map[string]any, entry *logging.En
Header: make(http.Header),
},
}
v := assertOrLog[map[string]any](val)
hr.Request.Method = assertOrLog[string](v["method"])
_, isTLS := v["tls"]
requestMap := castOrSetDriverErr[map[string]any](val, driverErr)
hr.Request.Method = castOrSetDriverErr[string](requestMap["method"], driverErr)
_, isTLS := requestMap["tls"]
var h = "http"
if isTLS {
h = "https"
}
hr.Request.URL = &url.URL{
Scheme: h,
Host: assertOrLog[string](v["host"]),
RawPath: assertOrLog[string](v["uri"]),
Path: assertOrLog[string](v["uri"]),
Host: castOrSetDriverErr[string](requestMap["host"], driverErr),
RawPath: castOrSetDriverErr[string](requestMap["uri"], driverErr),
Path: castOrSetDriverErr[string](requestMap["uri"], driverErr),
}
if t, ok := m["bytes_read"]; ok {
hr.RequestSize = int64(assertOrLog[float64](t))
hr.RequestSize = int64(castOrSetDriverErr[float64](t, driverErr))
}
if t, ok := m["status"]; ok {
hr.Status = int(assertOrLog[float64](t))
hr.Status = int(castOrSetDriverErr[float64](t, driverErr))
}
if t, ok := m["size"]; ok {
hr.ResponseSize = int64(assertOrLog[float64](t))
hr.ResponseSize = int64(castOrSetDriverErr[float64](t, driverErr))
}
if t, ok := m["duration"]; ok {
hr.Latency = time.Duration(assertOrLog[float64](t) * float64(time.Second))
hr.Latency = time.Duration(castOrSetDriverErr[float64](t, driverErr) * float64(time.Second))
}
hr.Request.Proto = assertOrLog[string](v["proto"])
hr.RemoteIP = assertOrLog[string](v["remote_ip"]) + ":" + assertOrLog[string](v["remote_port"])
hr.Request.Proto = castOrSetDriverErr[string](requestMap["proto"], driverErr)
hr.RemoteIP = castOrSetDriverErr[string](requestMap["remote_ip"], driverErr) + ":" + castOrSetDriverErr[string](requestMap["remote_port"], driverErr)

if t, ok := v["headers"]; ok {
headers := assertOrLog[map[string]any](t)
if t, ok := requestMap["headers"]; ok {
headers := castOrSetDriverErr[map[string]any](t, driverErr)
for h, v := range headers {
for _, s := range assertOrLog[[]any](v) {
hr.Request.Header.Add(h, assertOrLog[string](s))
for _, s := range castOrSetDriverErr[[]any](v, driverErr) {
hr.Request.Header.Add(h, castOrSetDriverErr[string](s, driverErr))
}
}
}
Expand Down