From 8092d8a135eb7c0e7c7c520be935a0bdd2fc9455 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Thu, 3 Apr 2025 16:54:17 +0100 Subject: [PATCH 1/2] PoC proxy connection for MQTT bridge --- Cargo.lock | 13 ++++++ .../src/tedge_toml/models/http_or_s.rs | 40 +++++++++++++++++++ .../tedge_config/src/tedge_toml/models/mod.rs | 1 + .../src/tedge_toml/tedge_config.rs | 16 ++++++++ .../tedge_toml/tedge_config/append_remove.rs | 8 +++- crates/core/tedge_mapper/src/c8y/mapper.rs | 29 ++++++++++++++ .../extensions/tedge_mqtt_bridge/Cargo.toml | 2 +- 7 files changed, 106 insertions(+), 3 deletions(-) create mode 100644 crates/common/tedge_config/src/tedge_toml/models/http_or_s.rs diff --git a/Cargo.lock b/Cargo.lock index 65adf996213..a121a96c045 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -268,6 +268,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-http-proxy" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29faa5d4d308266048bd7505ba55484315a890102f9345b9ff4b87de64201592" +dependencies = [ + "base64 0.13.1", + "httparse", + "thiserror 1.0.69", + "tokio", +] + [[package]] name = "async-tempfile" version = "0.7.0" @@ -3677,6 +3689,7 @@ name = "rumqttc" version = "0.24.0" source = "git+https://github.com/jarhodes314/rumqtt?rev=8c489faf6af910956c97b55587ff3ecb2ac4e96f#8c489faf6af910956c97b55587ff3ecb2ac4e96f" dependencies = [ + "async-http-proxy", "bytes", "fixedbitset", "flume", diff --git a/crates/common/tedge_config/src/tedge_toml/models/http_or_s.rs b/crates/common/tedge_config/src/tedge_toml/models/http_or_s.rs new file mode 100644 index 00000000000..b4e32e6ba70 --- /dev/null +++ b/crates/common/tedge_config/src/tedge_toml/models/http_or_s.rs @@ -0,0 +1,40 @@ +use std::fmt::Display; +use std::fmt::Formatter; +use std::str::FromStr; + +/// A flag that can be HTTP or HTTPS +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, doku::Document)] +pub enum HttpOrS { + Http, + Https, +} + +#[derive(thiserror::Error, Debug)] +#[error("Failed to parse flag: {input}. Supported values are: HTTP, HTTPS")] +pub struct InvalidScheme { + input: String, +} + +impl FromStr for HttpOrS { + type Err = InvalidScheme; + + fn from_str(input: &str) -> Result { + match input.to_lowercase().as_str() { + "http" => Ok(Self::Http), + "https" => Ok(Self::Https), + _ => Err(Self::Err { + input: input.to_string(), + }), + } + } +} + +impl Display for HttpOrS { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let output = match self { + Self::Http => "HTTP", + Self::Https => "HTTPS", + }; + output.fmt(f) + } +} diff --git a/crates/common/tedge_config/src/tedge_toml/models/mod.rs b/crates/common/tedge_config/src/tedge_toml/models/mod.rs index db03e33c4b2..8faf31a4aff 100644 --- a/crates/common/tedge_config/src/tedge_toml/models/mod.rs +++ b/crates/common/tedge_config/src/tedge_toml/models/mod.rs @@ -6,6 +6,7 @@ pub mod connect_url; pub mod cryptoki; pub mod flag; pub mod host_port; +pub mod http_or_s; pub mod ipaddress; pub mod path; pub mod port; diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs index 632c0fef1fd..ad504821c70 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs @@ -5,6 +5,7 @@ mod append_remove; pub use append_remove::AppendRemoveItem; use super::models::auth_method::AuthMethod; +use super::models::http_or_s::HttpOrS; use super::models::timestamp::TimeFormat; use super::models::AptConfig; use super::models::AutoFlag; @@ -354,6 +355,21 @@ define_tedge_config! { /// The amount of time after which the bridge should send a ping if no other traffic has occurred #[tedge_config(example = "60s", default(from_str = "60s"))] keepalive_interval: SecondsOrHumanTime, + + proxy: { + /// The address (host:port) of an HTTP CONNECT proxy to use when connecting to Cumulocity + address: HostPort<8000>, + + /// The username for the proxy connection to Cumulocity's MQTT broker + username: String, + + /// The password for the proxy connection to Cumulocity's MQTT broker + password: String, + + #[tedge_config(rename = "type", default(variable = "HttpOrS::Https"), example = "HTTPS")] + /// The type of the proxy connection to use, either `HTTP` or `HTTPS` + ty: HttpOrS, + } }, entity_store: { diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs index 34af0c40af5..27846ff20bd 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs @@ -1,3 +1,5 @@ +use crate::models::http_or_s::HttpOrS; + use super::*; #[diagnostic::on_unimplemented( @@ -13,7 +15,7 @@ pub trait AppendRemoveItem { } macro_rules! impl_append_remove_for_single_value { - ($($type:ty),*) => { + ($($type:ty),* $(,)?) => { $( impl AppendRemoveItem for $type { type Item = $type; @@ -56,7 +58,9 @@ impl_append_remove_for_single_value!( AptConfig, MqttPayloadLimit, AuthMethod, - Cryptoki + Cryptoki, + HostPort<8000>, + HttpOrS, ); impl AppendRemoveItem for TemplatesSet { diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 238f293dc31..42d4b0ca4ec 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -12,14 +12,21 @@ use c8y_mapper_ext::config::C8yMapperConfig; use c8y_mapper_ext::converter::CumulocityConverter; use mqtt_channel::Config; use std::borrow::Cow; +use std::sync::Arc; use tedge_api::entity::EntityExternalId; use tedge_api::mqtt_topics::EntityTopicId; +use tedge_config::all_or_nothing; +use tedge_config::models::http_or_s::HttpOrS; use tedge_config::tedge_toml::ProfileName; use tedge_config::TEdgeConfig; use tedge_downloader_ext::DownloaderActor; use tedge_file_system_ext::FsWatchActorBuilder; use tedge_http_ext::HttpActor; use tedge_mqtt_bridge::rumqttc::LastWill; +use tedge_mqtt_bridge::rumqttc::Proxy; +use tedge_mqtt_bridge::rumqttc::ProxyAuth; +use tedge_mqtt_bridge::rumqttc::ProxyType; +use tedge_mqtt_bridge::rumqttc::TlsConfiguration; use tedge_mqtt_bridge::rumqttc::Transport; use tedge_mqtt_bridge::use_credentials; use tedge_mqtt_bridge::BridgeConfig; @@ -224,6 +231,28 @@ impl TEdgeComponent for CumulocityMapper { }); cloud_config.set_keep_alive(c8y_config.bridge.keepalive_interval.duration()); + let rustls_config = tedge_config.cloud_client_tls_config(); + let proxy_config = &c8y_config.bridge.proxy; + if let Some(address) = proxy_config.address.or_none() { + let credentials = + all_or_nothing((proxy_config.username.clone(), proxy_config.password.clone())) + .map_err(|e| anyhow::anyhow!(e))?; + cloud_config.set_proxy(Proxy { + addr: address.host().to_string(), + port: address.port().0, + auth: match credentials { + Some((username, password)) => ProxyAuth::Basic { username, password }, + None => ProxyAuth::None, + }, + ty: match c8y_config.bridge.proxy.ty { + HttpOrS::Http => ProxyType::Http, + HttpOrS::Https => { + ProxyType::Https(TlsConfiguration::Rustls(Arc::new(rustls_config))) + } + }, + }); + } + runtime .spawn( MqttBridgeActorBuilder::new( diff --git a/crates/extensions/tedge_mqtt_bridge/Cargo.toml b/crates/extensions/tedge_mqtt_bridge/Cargo.toml index 615c08175fa..c82278cf398 100644 --- a/crates/extensions/tedge_mqtt_bridge/Cargo.toml +++ b/crates/extensions/tedge_mqtt_bridge/Cargo.toml @@ -22,7 +22,7 @@ certificate = { workspace = true } futures = { workspace = true } mqtt_channel = { workspace = true } mutants = { workspace = true } -rumqttc = { workspace = true } +rumqttc = { workspace = true, features = ["proxy"] } tedge_actors = { workspace = true } tedge_config = { workspace = true } thiserror = { workspace = true } From 21a12692b1ce02d302cb3ce75e365e92f90d36b4 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Fri, 4 Apr 2025 14:12:18 +0100 Subject: [PATCH 2/2] Add MQTT over Websocket support Signed-off-by: James Rhodes --- Cargo.lock | 4 ++ .../src/tedge_toml/models/auto.rs | 4 +- .../src/tedge_toml/models/http_or_s.rs | 4 +- .../tedge_config/src/tedge_toml/models/mod.rs | 1 + .../src/tedge_toml/models/mqtt_protocol.rs | 42 +++++++++++++++++++ .../src/tedge_toml/tedge_config.rs | 5 +++ .../tedge_toml/tedge_config/append_remove.rs | 1 + crates/core/tedge_mapper/src/c8y/mapper.rs | 14 ++++++- .../extensions/tedge_mqtt_bridge/Cargo.toml | 2 +- .../tedge_mqtt_bridge/src/config.rs | 9 +++- 10 files changed, 81 insertions(+), 5 deletions(-) create mode 100644 crates/common/tedge_config/src/tedge_toml/models/mqtt_protocol.rs diff --git a/Cargo.lock b/Cargo.lock index a121a96c045..2ff8f91babd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3690,10 +3690,12 @@ version = "0.24.0" source = "git+https://github.com/jarhodes314/rumqtt?rev=8c489faf6af910956c97b55587ff3ecb2ac4e96f#8c489faf6af910956c97b55587ff3ecb2ac4e96f" dependencies = [ "async-http-proxy", + "async-tungstenite 0.28.0", "bytes", "fixedbitset", "flume", "futures-util", + "http 1.2.0", "log", "rustls-native-certs", "rustls-pemfile 2.2.0", @@ -3703,6 +3705,7 @@ dependencies = [ "tokio-rustls 0.26.1", "tokio-stream", "tokio-util", + "ws_stream_tungstenite 0.14.0", ] [[package]] @@ -6037,6 +6040,7 @@ dependencies = [ "futures-util", "pharos", "rustc_version", + "tokio", "tracing", "tungstenite 0.24.0", ] diff --git a/crates/common/tedge_config/src/tedge_toml/models/auto.rs b/crates/common/tedge_config/src/tedge_toml/models/auto.rs index c19b2691a2d..7c51b7d06ee 100644 --- a/crates/common/tedge_config/src/tedge_toml/models/auto.rs +++ b/crates/common/tedge_config/src/tedge_toml/models/auto.rs @@ -4,7 +4,9 @@ use std::str::FromStr; /// A flag that can be set to auto, /// meaning the system will have to detect the appropriate true/false setting -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, doku::Document)] +#[derive( + Debug, Clone, Copy, serde::Serialize, serde::Deserialize, Eq, PartialEq, doku::Document, +)] pub enum AutoFlag { True, False, diff --git a/crates/common/tedge_config/src/tedge_toml/models/http_or_s.rs b/crates/common/tedge_config/src/tedge_toml/models/http_or_s.rs index b4e32e6ba70..b9351933e36 100644 --- a/crates/common/tedge_config/src/tedge_toml/models/http_or_s.rs +++ b/crates/common/tedge_config/src/tedge_toml/models/http_or_s.rs @@ -3,7 +3,9 @@ use std::fmt::Formatter; use std::str::FromStr; /// A flag that can be HTTP or HTTPS -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, doku::Document)] +#[derive( + Debug, Clone, Copy, serde::Serialize, serde::Deserialize, Eq, PartialEq, doku::Document, +)] pub enum HttpOrS { Http, Https, diff --git a/crates/common/tedge_config/src/tedge_toml/models/mod.rs b/crates/common/tedge_config/src/tedge_toml/models/mod.rs index 8faf31a4aff..5babd39155e 100644 --- a/crates/common/tedge_config/src/tedge_toml/models/mod.rs +++ b/crates/common/tedge_config/src/tedge_toml/models/mod.rs @@ -8,6 +8,7 @@ pub mod flag; pub mod host_port; pub mod http_or_s; pub mod ipaddress; +pub mod mqtt_protocol; pub mod path; pub mod port; pub mod seconds; diff --git a/crates/common/tedge_config/src/tedge_toml/models/mqtt_protocol.rs b/crates/common/tedge_config/src/tedge_toml/models/mqtt_protocol.rs new file mode 100644 index 00000000000..3fc3228db4e --- /dev/null +++ b/crates/common/tedge_config/src/tedge_toml/models/mqtt_protocol.rs @@ -0,0 +1,42 @@ +use std::fmt::Display; +use std::fmt::Formatter; +use std::str::FromStr; + +/// The protocol used to connect to a cloud's MQTT service +#[derive( + Debug, Clone, Copy, serde::Serialize, serde::Deserialize, Eq, PartialEq, doku::Document, +)] +pub enum MqttProtocol { + Tcp, + Websocket, +} + +#[derive(thiserror::Error, Debug)] +#[error("Failed to parse flag: {input}. Supported values are: tcp, ws")] +pub struct InvalidScheme { + input: String, +} + +impl FromStr for MqttProtocol { + type Err = InvalidScheme; + + fn from_str(input: &str) -> Result { + match input.to_lowercase().as_str() { + "tcp" => Ok(Self::Tcp), + "ws" => Ok(Self::Websocket), + _ => Err(Self::Err { + input: input.to_string(), + }), + } + } +} + +impl Display for MqttProtocol { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let output = match self { + Self::Tcp => "tcp", + Self::Websocket => "ws", + }; + output.fmt(f) + } +} diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs index ad504821c70..5a50ce3c1c2 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs @@ -6,6 +6,7 @@ pub use append_remove::AppendRemoveItem; use super::models::auth_method::AuthMethod; use super::models::http_or_s::HttpOrS; +use super::models::mqtt_protocol::MqttProtocol; use super::models::timestamp::TimeFormat; use super::models::AptConfig; use super::models::AutoFlag; @@ -356,6 +357,10 @@ define_tedge_config! { #[tedge_config(example = "60s", default(from_str = "60s"))] keepalive_interval: SecondsOrHumanTime, + /// The protocol to connect the MQTT bridge with (only affects built-in bridge) + #[tedge_config(example = "tcp", default(from_str = "tcp"))] + protocol: MqttProtocol, + proxy: { /// The address (host:port) of an HTTP CONNECT proxy to use when connecting to Cumulocity address: HostPort<8000>, diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs index 27846ff20bd..ee3dc9ce9d1 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs @@ -61,6 +61,7 @@ impl_append_remove_for_single_value!( Cryptoki, HostPort<8000>, HttpOrS, + MqttProtocol, ); impl AppendRemoveItem for TemplatesSet { diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 42d4b0ca4ec..3ec048b1e53 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -17,6 +17,7 @@ use tedge_api::entity::EntityExternalId; use tedge_api::mqtt_topics::EntityTopicId; use tedge_config::all_or_nothing; use tedge_config::models::http_or_s::HttpOrS; +use tedge_config::models::mqtt_protocol::MqttProtocol; use tedge_config::tedge_toml::ProfileName; use tedge_config::TEdgeConfig; use tedge_downloader_ext::DownloaderActor; @@ -150,9 +151,15 @@ impl TEdgeComponent for CumulocityMapper { } let c8y = c8y_config.mqtt.or_config_not_set()?; + let mqtt_host = match c8y_config.bridge.protocol { + MqttProtocol::Websocket => format!("wss://{}/mqtt", c8y.host()), + MqttProtocol::Tcp => c8y.host().to_string(), + }; let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new( c8y_config.device.id()?, - c8y.host().to_string(), + mqtt_host, + // The port is ignored in websocket mode, so this only applies + // to MQTT/TCP connections c8y.port().into(), ); // Cumulocity tells us not to not set clean session to false, so don't @@ -160,6 +167,10 @@ impl TEdgeComponent for CumulocityMapper { cloud_config.set_clean_session(true); if use_certificate { + ensure!( + c8y_config.bridge.mqtt_protocol == MqttProtocol::Tcp, + "To use MQTT over websockets, please enable basic authentication" + ); let cloud_broker_auth_config = tedge_config .mqtt_auth_config_cloud_broker(c8y_profile) .expect("error getting cloud broker auth config"); @@ -172,6 +183,7 @@ impl TEdgeComponent for CumulocityMapper { let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?; use_credentials( &mut cloud_config, + c8y_config.bridge.protocol, &c8y_config.root_cert_path, username, password, diff --git a/crates/extensions/tedge_mqtt_bridge/Cargo.toml b/crates/extensions/tedge_mqtt_bridge/Cargo.toml index c82278cf398..5535e5b32cd 100644 --- a/crates/extensions/tedge_mqtt_bridge/Cargo.toml +++ b/crates/extensions/tedge_mqtt_bridge/Cargo.toml @@ -22,7 +22,7 @@ certificate = { workspace = true } futures = { workspace = true } mqtt_channel = { workspace = true } mutants = { workspace = true } -rumqttc = { workspace = true, features = ["proxy"] } +rumqttc = { workspace = true, features = ["proxy", "websocket"] } tedge_actors = { workspace = true } tedge_config = { workspace = true } thiserror = { workspace = true } diff --git a/crates/extensions/tedge_mqtt_bridge/src/config.rs b/crates/extensions/tedge_mqtt_bridge/src/config.rs index b77aa736c36..6a89465e590 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/config.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/config.rs @@ -8,6 +8,7 @@ use rumqttc::MqttOptions; use rumqttc::Transport; use std::borrow::Cow; use std::path::Path; +use tedge_config::models::mqtt_protocol::MqttProtocol; use tedge_config::tedge_toml::CloudConfig; pub fn use_key_and_cert( @@ -25,12 +26,18 @@ pub fn use_key_and_cert( pub fn use_credentials( config: &mut MqttOptions, + protocol: MqttProtocol, root_cert_path: impl AsRef, username: String, password: String, ) -> anyhow::Result<()> { let tls_config = create_tls_config_without_client_cert(root_cert_path)?; - config.set_transport(Transport::tls_with_config(tls_config.into())); + match protocol { + MqttProtocol::Tcp => config.set_transport(Transport::tls_with_config(tls_config.into())), + MqttProtocol::Websocket => { + config.set_transport(Transport::wss_with_config(tls_config.into())) + } + }; config.set_credentials(username, password); Ok(()) }