diff --git a/README.md b/README.md index bb7d9b9..2a12d9b 100644 --- a/README.md +++ b/README.md @@ -129,18 +129,19 @@ Start all your containers again, and they should be using the new version of the The following [log-opts](https://docs.docker.com/config/containers/logging/configure/#configure-the-default-logging-driver) are available for configuration: -| log-opt | default | description | -|----------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| extract-json-message | true | Enables unmarshalling JSON messages and sending the jsonPayload as the unmarshalled map. Kind of the whole point of this plugin, but you can disable it so it behaves just like the `gcplogs` plugin if you wish | -| local-logging | false | Enables logging to a local file, so logs can be viewed with the `docker logs` command. If false, the command will show no output | -| extract-severity | true | Extracts the `severity` from JSON logs to set them for the log that will be sent to GCP. It will be removed from the jsonPayload section, since it is set at the root level. Currently the supported severity field names to extract are the following: `severity`, `level` | -| extract-msg | true | Extracts the `msg` field from JSON logs to set the `message` field GCP expects. It will be removed from the jsonPayload section, since it is set at the root level. Fields named msg are produced for example by the golang log/slog package. | -| extract-gcp | false | Extract trace, labels and source location fields if present and formatted for Google cloud logging. This is produced for example by the golang log/slog package with the slogdriver handler | -| extract-caddy | false | Extract trace and HTTP Request from caddy if present and format for Google cloud logging. | -| exclude-timestamp | false | Excludes timestamp fields from the final jsonPayload, since docker sends its own nanosecond precision timestamp for each log. Currently it can remove fields with the following names: `timestamp`, `time`, `ts` | -| sleep-interval | 500 | Milliseconds to sleep when there are no logs to send before checking again. The higher the value, the lower the CPU usage will be | -| credentials-file | | Absolute path to the GCP credentials JSON file to use when authenticating (only necessary when running the plugin outside of GCP) | -| credentials-json | | JSON string with the GCP credentials to use when authenticating (only necessary when running the plugin outside of GCP) | +| log-opt | default | description | +|-------------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| extract-json-message | true | Enables unmarshalling JSON messages and sending the jsonPayload as the unmarshalled map. Kind of the whole point of this plugin, but you can disable it so it behaves just like the `gcplogs` plugin if you wish | +| local-logging | false | Enables logging to a local file, so logs can be viewed with the `docker logs` command. If false, the command will show no output | +| extract-severity | true | Extracts the `severity` from JSON logs to set them for the log that will be sent to GCP. It will be removed from the jsonPayload section, since it is set at the root level. Currently the supported severity field names to extract are the following: `severity`, `level` | +| extract-msg | true | Extracts the `msg` field from JSON logs to set the `message` field GCP expects. It will be removed from the jsonPayload section, since it is set at the root level. Fields named msg are produced for example by the golang log/slog package. | +| extract-gcp | false | Extract trace, labels and source location fields if present and formatted for Google cloud logging. This is produced for example by the golang log/slog package with the slogdriver handler | +| extract-caddy | false | Extract trace and HTTP Request from caddy if present and format for Google cloud logging. | +| exclude-timestamp | false | Excludes timestamp fields from the final jsonPayload, since docker sends its own nanosecond precision timestamp for each log. Currently it can remove fields with the following names: `timestamp`, `time`, `ts` | +| sleep-interval | 500 | Milliseconds to sleep when there are no logs to send before checking again. The higher the value, the lower the CPU usage will be | +| credentials-file | | Absolute path to the GCP credentials JSON file to use when authenticating (only necessary when running the plugin outside of GCP) | +| credentials-json | | JSON string with the GCP credentials to use when authenticating (only necessary when running the plugin outside of GCP) | +| internal-error-severity | warning | The severity with which internal driver errors should be reported | ### Building locally diff --git a/driver.go b/driver.go index 7798f1e..9249940 100644 --- a/driver.go +++ b/driver.go @@ -119,14 +119,7 @@ func (d *driver) consumeLog(lp *logPair) { } if len(bytes.Fields(buf.Line)) > 0 { - if err := lp.gLogger.Log(createMessageFromBuffer(&buf)); err != nil { - d.sLog.With("id", lp.info.ContainerID, "error", err, "message", buf).Error("error writing log to GCP logger message") - } - if lp.info.Config[localLoggingConfig] == "true" { - if err := lp.jsonl.Log(createMessageFromBuffer(&buf)); err != nil { - d.sLog.With("id", lp.info.ContainerID, "error", err, "message", buf).Error("error writing log message to JSON logger") - } - } + d.log(lp, createMessageFromBuffer(&buf)) } else { time.Sleep(d.sleepInterval) } @@ -135,6 +128,39 @@ func (d *driver) consumeLog(lp *logPair) { } } +func (d *driver) log(lp *logPair, msg *logger.Message) { + if msg.Err != nil { + d.sLog.With("id", lp.info.ContainerID, "error", msg.Err, "message", string(msg.Line)).Error("writing driver err log to GCP logger") + } + + if err := lp.gLogger.Log(msg); err != nil { + d.sLog.With("id", lp.info.ContainerID, "error", err, "message", string(msg.Line)).Error("error writing log to GCP logger message") + d.sLog.Error(fmt.Sprintf("error %+v", err)) + + // handleLoggingError will call with a non-nil err, so we avoid errors from being logged infinitely + if msg.Err == nil { + d.handleLoggingError(lp, msg, err) + } + } + if lp.info.Config[localLoggingConfig] == "true" { + if err := lp.jsonl.Log(msg); err != nil { + d.sLog.With("id", lp.info.ContainerID, "error", err, "message", string(msg.Line)).Error("error writing log message to JSON logger") + } + } +} + +func (d *driver) handleLoggingError(lp *logPair, originalMsg *logger.Message, err error) { + var driverErr *nGCPError + if errors.As(err, &driverErr) && driverErr != nil { + d.log(lp, &logger.Message{ + Line: originalMsg.Line, + Source: originalMsg.Source, + Timestamp: driverErr.ts, + Err: err, + }) + } +} + func createMessageFromBuffer(buf *logdriver.LogEntry) *logger.Message { var msg logger.Message msg.Line = buf.Line diff --git a/error.go b/error.go new file mode 100644 index 0000000..b10e79b --- /dev/null +++ b/error.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "time" +) + +type nGCPError struct { + Msg string `json:"msg"` + File string `json:"file"` + Line int `json:"line"` + ts time.Time +} + +func (e *nGCPError) Error() string { + return fmt.Sprintf("%s/%d - %s - %s", e.File, e.Line, e.ts, e.Msg) +} + +type driverError struct { + err error +} + +func (e *driverError) Set(err error) { + if e.err == nil { + e.err = err + } +} + +func (e *driverError) Get() error { + return e.err +} diff --git a/ngcplogger.go b/ngcplogger.go index ed205d8..26b73f6 100644 --- a/ngcplogger.go +++ b/ngcplogger.go @@ -5,11 +5,11 @@ import ( "encoding/json" "errors" "fmt" - "log/slog" "net/http" "net/url" "reflect" "runtime" + "strings" "sync" "sync/atomic" "time" @@ -88,6 +88,8 @@ type nGCPLogger struct { extractMsg bool extractGcp bool extractCaddy bool + + internalErrorSeverity logging.Severity } type dockerLogEntry struct { @@ -241,6 +243,12 @@ func New(info logger.Info) (logger.Logger, error) { l.extractCaddy = true } + if internalErrorSeverityStr, isPresent := info.Config["internal-error-severity"]; isPresent { + l.internalErrorSeverity = logging.ParseSeverity(internalErrorSeverityStr) + } else { + l.internalErrorSeverity = logging.Warning + } + if instanceResource != nil { l.instance = instanceResource } @@ -276,9 +284,10 @@ func ValidateLogOpts(cfg map[string]string) error { } func (l *nGCPLogger) Log(lMsg *logger.Message) error { - logLine := lMsg.Line + logLine := strings.TrimSpace(string(lMsg.Line)) ts := lMsg.Timestamp - logger.PutMessage(lMsg) + + driverErr := &driverError{} if len(logLine) > 0 { var payload any @@ -290,32 +299,42 @@ func (l *nGCPLogger) Log(lMsg *logger.Message) error { if l.extractJsonMessage && logLine[0] == '{' && logLine[len(logLine)-1] == '}' { var m map[string]any - err := json.Unmarshal(logLine, &m) + err := json.Unmarshal([]byte(logLine), &m) if err != nil { - payload = fmt.Sprintf("Error parsing JSON: %s", string(logLine)) - entry.Severity = logging.Critical + payload = fmt.Sprintf("Error parsing JSON: %s", logLine) + entry.Severity = l.internalErrorSeverity } else { entry.Severity = l.extractSeverityFromPayload(m) l.excludeTimestampFromPayload(m) l.extractMsgFromPayload(m) m["instance"] = l.instance m["container"] = l.container - l.extractGcpFromPayload(m, &entry) - l.extractCaddyFromPayload(m, &entry) + l.extractGcpFromPayload(m, &entry, driverErr) + l.extractCaddyFromPayload(m, &entry, driverErr) + + var receivedDriverErr *nGCPError + if errors.As(lMsg.Err, &receivedDriverErr) && receivedDriverErr != nil { + // Replace original message and error with driver error + delete(m, "message") + delete(m, "error") + m["ngcplogs-error"] = receivedDriverErr + entry.Severity = l.internalErrorSeverity + entry.Timestamp = receivedDriverErr.ts + } payload = m } } else { payload = dockerLogEntry{ Instance: l.instance, Container: l.container, - Message: string(logLine), + Message: logLine, } } entry.Payload = payload l.logger.Log(entry) } - return nil + return driverErr.Get() } func (l *nGCPLogger) extractSeverityFromPayload(m map[string]any) logging.Severity { @@ -356,7 +375,6 @@ func (l *nGCPLogger) excludeTimestampFromPayload(m map[string]any) { } func (l *nGCPLogger) extractMsgFromPayload(m map[string]any) { - if l.extractMsg { if msg, exists := m["msg"]; exists { m["message"] = msg @@ -365,63 +383,69 @@ func (l *nGCPLogger) extractMsgFromPayload(m map[string]any) { } } -func assertOrLog[T any](val any) T { +func castOrSetDriverErr[T any](val any, driverErr *driverError) T { var v T + var errMsg string if val == nil { - _, file, line, ok := runtime.Caller(1) - if !ok { - file = "unknown" - } - slog.Error("unexpected nil value", "file", file, "line", line) + errMsg = fmt.Sprintf("unexpected nil value") } else { var ok bool v, ok = val.(T) if !ok { - _, file, line, ok := runtime.Caller(1) - if !ok { - file = "unknown" - } - slog.Error("unexpected type", "want", reflect.TypeOf(v).String(), "got", reflect.TypeOf(val).String(), "file", file, "line", line) + errMsg = fmt.Sprintf("unexpected type, wanted %q and got %q", reflect.TypeOf(v).String(), reflect.TypeOf(val).String()) + } + } + if errMsg != "" { + _, file, line, ok := runtime.Caller(1) + if !ok { + file = "unknown" } + driverErr.Set(&nGCPError{ + File: file, + Line: line, + ts: time.Now(), + Msg: errMsg, + }) } return v } -func (l *nGCPLogger) extractGcpFromPayload(m map[string]any, entry *logging.Entry) { - +func (l *nGCPLogger) extractGcpFromPayload(m map[string]any, entry *logging.Entry, driverErr *driverError) { if l.extractGcp { - if val, exists := m["logging.googleapis.com/sourceLocation"]; exists { - v := assertOrLog[map[string]any](val) - entry.SourceLocation = &loggingpb.LogEntrySourceLocation{ - File: assertOrLog[string](v["file"]), - Line: int64(assertOrLog[float64](v["line"])), - Function: assertOrLog[string](v["function"]), + if sourceLocation, exists := m["logging.googleapis.com/sourceLocation"]; exists { + sourceLocationMap := castOrSetDriverErr[map[string]any](sourceLocation, driverErr) + if sourceLocationMap != nil { + entry.SourceLocation = &loggingpb.LogEntrySourceLocation{ + File: castOrSetDriverErr[string](sourceLocationMap["file"], driverErr), + Line: int64(castOrSetDriverErr[float64](sourceLocationMap["line"], driverErr)), + Function: castOrSetDriverErr[string](sourceLocationMap["function"], driverErr), + } } delete(m, "logging.googleapis.com/sourceLocation") } if val, exists := m["logging.googleapis.com/trace"]; exists { - entry.Trace = assertOrLog[string](val) + entry.Trace = castOrSetDriverErr[string](val, driverErr) delete(m, "logging.googleapis.com/trace") } if val, exists := m["logging.googleapis.com/spanId"]; exists { - entry.SpanID = assertOrLog[string](val) + entry.SpanID = castOrSetDriverErr[string](val, driverErr) delete(m, "logging.googleapis.com/spanId") } if val, exists := m["logging.googleapis.com/trace_sampled"]; exists { - entry.TraceSampled = assertOrLog[bool](val) + entry.TraceSampled = castOrSetDriverErr[bool](val, driverErr) delete(m, "logging.googleapis.com/trace_sampled") } - if val, exists := m["logging.googleapis.com/labels"]; exists { - v := assertOrLog[map[string]any](val) - for k, v := range v { - entry.Labels[k] = assertOrLog[string](v) + if labels, exists := m["logging.googleapis.com/labels"]; exists { + labelsMap := castOrSetDriverErr[map[string]any](labels, driverErr) + for k, v := range labelsMap { + entry.Labels[k] = castOrSetDriverErr[string](v, driverErr) } delete(m, "logging.googleapis.com/labels") } } } -func (l *nGCPLogger) extractCaddyFromPayload(m map[string]any, entry *logging.Entry) { +func (l *nGCPLogger) extractCaddyFromPayload(m map[string]any, entry *logging.Entry, driverErr *driverError) { if l.extractCaddy { if val, exists := m["request"]; exists { @@ -430,39 +454,39 @@ func (l *nGCPLogger) extractCaddyFromPayload(m map[string]any, entry *logging.En Header: make(http.Header), }, } - v := assertOrLog[map[string]any](val) - hr.Request.Method = assertOrLog[string](v["method"]) - _, isTLS := v["tls"] + requestMap := castOrSetDriverErr[map[string]any](val, driverErr) + hr.Request.Method = castOrSetDriverErr[string](requestMap["method"], driverErr) + _, isTLS := requestMap["tls"] var h = "http" if isTLS { h = "https" } hr.Request.URL = &url.URL{ Scheme: h, - Host: assertOrLog[string](v["host"]), - RawPath: assertOrLog[string](v["uri"]), - Path: assertOrLog[string](v["uri"]), + Host: castOrSetDriverErr[string](requestMap["host"], driverErr), + RawPath: castOrSetDriverErr[string](requestMap["uri"], driverErr), + Path: castOrSetDriverErr[string](requestMap["uri"], driverErr), } if t, ok := m["bytes_read"]; ok { - hr.RequestSize = int64(assertOrLog[float64](t)) + hr.RequestSize = int64(castOrSetDriverErr[float64](t, driverErr)) } if t, ok := m["status"]; ok { - hr.Status = int(assertOrLog[float64](t)) + hr.Status = int(castOrSetDriverErr[float64](t, driverErr)) } if t, ok := m["size"]; ok { - hr.ResponseSize = int64(assertOrLog[float64](t)) + hr.ResponseSize = int64(castOrSetDriverErr[float64](t, driverErr)) } if t, ok := m["duration"]; ok { - hr.Latency = time.Duration(assertOrLog[float64](t) * float64(time.Second)) + hr.Latency = time.Duration(castOrSetDriverErr[float64](t, driverErr) * float64(time.Second)) } - hr.Request.Proto = assertOrLog[string](v["proto"]) - hr.RemoteIP = assertOrLog[string](v["remote_ip"]) + ":" + assertOrLog[string](v["remote_port"]) + hr.Request.Proto = castOrSetDriverErr[string](requestMap["proto"], driverErr) + hr.RemoteIP = castOrSetDriverErr[string](requestMap["remote_ip"], driverErr) + ":" + castOrSetDriverErr[string](requestMap["remote_port"], driverErr) - if t, ok := v["headers"]; ok { - headers := assertOrLog[map[string]any](t) + if t, ok := requestMap["headers"]; ok { + headers := castOrSetDriverErr[map[string]any](t, driverErr) for h, v := range headers { - for _, s := range assertOrLog[[]any](v) { - hr.Request.Header.Add(h, assertOrLog[string](s)) + for _, s := range castOrSetDriverErr[[]any](v, driverErr) { + hr.Request.Header.Add(h, castOrSetDriverErr[string](s, driverErr)) } } }