Mqtt
Declarative MQTT client topic interfaces with compile-time proxy generation. Publish and subscribe boundaries map to R3 Observable<T> or IObservable<T>.
Packages
| Package | Return types |
|---|---|
Observables.Mqtt.R3 | R3 Observable<T>; publish → Observable<Unit> |
Observables.Mqtt.Reactive | IObservable<T>, IObservable<Unit> |
Both include the Observables.Mqtt runtime (MqttService, MqttObservable bridges) and the matching Roslyn analyzer.
Both packages ship on nuget.org from 0.1.0-preview4 (same model as Events/RestAPI/SignalR).
Also reference MQTTnet 4.3.7.1207 (4.x line) and R3 or System.Reactive in your app. Use the same major version as the meta-package — do not mix MQTTnet 5.x with Observables.Mqtt until a future release documents support.
Why MQTTnet 4.x (not 5.x)
Observables.Mqtt is built and tested against MQTTnet 4.3.7.1207. This is the library major version (NuGet package MQTTnet), not the same thing as the MQTT wire protocol version (3.1.1 vs 5.0).
| Reason | Explanation |
|---|---|
| Target frameworks | Observables.Mqtt.R3 / .Reactive include a netstandard2.0 build. MQTTnet 5 targets .NET 8+ only and drops netstandard. |
| Public API alignment | The runtime uses 4.x types such as IMqttClient, MqttFactory, and ApplicationMessageReceivedAsync. MQTTnet 5 removes several interfaces, splits MqttClientFactory / MqttServerFactory, and changes event patterns — a breaking migration for MqttService.For<T>. |
| Feature scope | The first release maps publish / subscribe only (MQTT 3.1.1-style pub/sub). MQTT-5-only features (request/response, user properties, reason codes) and a MQTTnet 5 upgrade are follow-up work in the Observables repo. |
| Broker compatibility | MQTTnet 4 typically negotiates MQTT 3.1.1 by default. Most brokers still support that; you can configure protocol version in your own ConnectAsync options when the broker requires it. Observables does not depend on MQTT-5-only wire features today. |
When Observables adds MQTTnet 5 support, it will be called out in release notes and this page. Until then, pin 4.3.7.1207 (or another 4.x version compatible with your app, matching the transitive dependency from the meta-package). See the MQTTnet upgrading guide for differences between 4.x and 5.x.
Define a topic proxy
using Observables.Mqtt;
using MQTTnet;
using MQTTnet.Client;
using R3;
[Mqtt]
public interface ISensorTopics
{
[MqttSubscribe("sensors/{deviceId}/temperature")]
Observable<double> Temperature { get; }
[MqttPublish("commands/{deviceId}/restart")]
Observable<Unit> Restart(string deviceId);
}
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
await client.ConnectAsync(
new MqttClientOptionsBuilder()
.WithTcpServer("broker.example.com", 1883)
.Build());
var topics = MqttService.For<ISensorTopics>(client);
using var sub = topics.Temperature.Subscribe(t => Console.WriteLine(t));
await topics.Restart("device-42").FirstAsync();Boundary attributes
| Attribute | Member | MQTT client API |
|---|---|---|
[MqttPublish] | Method | PublishAsync → Observable<Unit> (cold, single completion) |
[MqttSubscribe] | Property | SubscribeAsync + ApplicationMessageReceived (hot stream) |
Topic templates use {parameter} placeholders bound to method parameters (MqttTopic.Format). MQTT + and # wildcards stay in the template literal. Subscribe members must be parameterless properties; publish members are methods.
Payload deserialization (net8+) uses System.Text.Json for non-string / non-byte[] types unless you handle raw bytes explicitly.
System.Reactive
Use IObservable<T> return types and Observables.Mqtt.Reactive; entry point remains MqttService.For<T>(client).
Diagnostics
See Diagnostics.
Design notes
Implementation details are documented in the Observables repo: docs/design/mqtt.md.