OpenMLS clients need a delivery service (DS) to exchange messages, distribute key packages, and coordinate group operations. This example demonstrates how to build and integrate with a delivery service.

Documentation Index
Fetch the complete documentation index at: https://mintlify.com/openmls/openmls/llms.txt
Use this file to discover all available pages before exploring further.
Overview
A delivery service handles:
- Client registration and authentication
- Key package storage and distribution
- Message routing to group members
- Welcome message delivery
┌─────────┐         ┌─────────────────┐         ┌─────────┐
│ Client A│────────▶│ Delivery Service│────────▶│ Client B│
└─────────┘         └─────────────────┘         └─────────┘
     │                       │                       │
     │ 1. Register           │                       │
     │ 2. Publish KeyPkg     │                       │
     │                       │ 3. Register           │
     │                       │ 4. Publish KeyPkg     │
     │ 5. Get KeyPkg(B)      │                       │
     │ 6. Send Welcome       │                       │
     │                       │ 7. Recv Welcome       │
     │ 8. Send Message       │                       │
     │                       │ 9. Recv Message       │
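[dependencies]
actix-web = "4.0"
tokio = { version = "1.0", features = ["full"] }
openmls = "1.0"
ds-lib = { path = "../ds-lib" }
tls_codec = "0.4"
base64 = "0.21"
futures-util = "0.3"
# The server code below logs through `log::…!` and calls `env_logger::init()`.
log = "0.4"
env_logger = "0.10"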
use std::collections::HashMap;
use std::sync::Mutex;

use actix_web::{web, App, HttpServer};
// The `Engine` trait must be in scope for `URL_SAFE.decode(..)` / `.encode(..)`.
use base64::Engine as _;
use ds_lib::{ClientInfo, GroupMessage, messages::AuthToken};
use openmls::prelude::*;
/// Shared in-memory state of the delivery service.
///
/// Each map is guarded by its own `Mutex`; the request handlers receive this
/// struct wrapped in `web::Data<DsData>`.
#[derive(Default, Debug)]
pub struct DsData {
/// Map of client ID to client information.
clients: Mutex<HashMap<Vec<u8>, ClientInfo>>,
/// Map of group ID to the epoch of the most recent handshake message that
/// `send_message` accepted for that group.
groups: Mutex<HashMap<Vec<u8>, u64>>,
}
use actix_web::{post, web::Payload, Responder};
use futures_util::StreamExt;
use tls_codec::{Deserialize, Serialize};
use ds_lib::messages::{
RegisterClientRequest,
RegisterClientSuccessResponse,
};
/// Registers a new client and returns its authentication token.
///
/// The request body is a TLS-serialized `RegisterClientRequest`. On success the
/// response body is a TLS-serialized `RegisterClientSuccessResponse`.
///
/// Responses:
/// - `200 OK` – client stored, body carries the auth token.
/// - `400 Bad Request` – body could not be deserialized or contains no key packages.
/// - `409 Conflict` – a client with the same ID is already registered.
/// - `500 Internal Server Error` – shared state is unavailable or the response
///   could not be serialized.
/// - `206 Partial Content` – the request body stream failed while being read.
///   NOTE(review): 206 is an unusual status for a read error; kept unchanged so
///   existing clients see the same status.
#[post("/clients/register")]
async fn register_client(
    mut body: Payload,
    data: web::Data<DsData>,
) -> impl Responder {
    // Read request body
    let mut bytes = web::BytesMut::new();
    while let Some(item) = body.next().await {
        match item {
            Ok(chunk) => bytes.extend_from_slice(&chunk),
            Err(_) => return actix_web::HttpResponse::PartialContent().finish(),
        }
    }

    // Deserialize registration request
    let req = match RegisterClientRequest::tls_deserialize(&mut &bytes[..]) {
        Ok(r) => r,
        Err(_) => {
            log::error!("Invalid registration request");
            return actix_web::HttpResponse::BadRequest().finish();
        }
    };
    if req.key_packages.0.is_empty() {
        return actix_web::HttpResponse::BadRequest().finish();
    }

    // Create client info
    let key_packages = req.key_packages.0
        .into_vec()
        .into_iter()
        .map(|(b, kp)| (b.into_vec(), kp))
        .collect();
    let new_client = ClientInfo::new(key_packages);
    log::debug!("Registering client: {:?}", new_client.id());

    // Serialize the response *before* touching shared state, so a serialization
    // failure can never leave a stored client whose token was not handed out.
    let response = RegisterClientSuccessResponse {
        auth_token: new_client.auth_token.clone(),
    };
    let response_bytes = match response.tls_serialize_detached() {
        Ok(b) => b,
        Err(_) => return actix_web::HttpResponse::InternalServerError().finish(),
    };

    // Store client. A poisoned mutex is reported as 500 instead of panicking
    // inside the request handler.
    let mut clients = match data.clients.lock() {
        Ok(guard) => guard,
        Err(_) => return actix_web::HttpResponse::InternalServerError().finish(),
    };
    // Single lookup: reject duplicates, otherwise insert.
    match clients.entry(new_client.id().to_vec()) {
        std::collections::hash_map::Entry::Occupied(_) => {
            return actix_web::HttpResponse::Conflict().finish();
        }
        std::collections::hash_map::Entry::Vacant(slot) => {
            slot.insert(new_client);
        }
    }

    actix_web::HttpResponse::Ok().body(response_bytes)
}
use actix_web::get;
/// Hands out (and thereby consumes) one key package of the given client.
///
/// `{id}` is the client ID encoded as URL-safe base64. On success the response
/// body is the TLS-serialized key package.
///
/// Responses:
/// - `200 OK` – body carries the key package.
/// - `204 No Content` – the client exists but `consume_kp` returned an error.
/// - `400 Bad Request` – `{id}` is not valid URL-safe base64.
/// - `404 Not Found` – no client with that ID is registered.
/// - `500 Internal Server Error` – shared state is unavailable or the key
///   package could not be serialized.
#[get("/clients/key_package/{id}")]
async fn consume_key_package(
    path: web::Path<String>,
    data: web::Data<DsData>,
) -> impl Responder {
    // Decode client ID from base64 before taking the lock: the path is
    // untrusted input and must not extend the time the mutex is held.
    let id = match base64::engine::general_purpose::URL_SAFE
        .decode(path.into_inner())
    {
        Ok(v) => v,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };
    log::debug!("Consuming key package for {:?}", id);

    // A poisoned mutex is reported as 500 instead of panicking in the handler.
    let mut clients = match data.clients.lock() {
        Ok(guard) => guard,
        Err(_) => return actix_web::HttpResponse::InternalServerError().finish(),
    };

    // Get and consume a key package
    let key_package = match clients.get_mut(&id) {
        Some(client) => match client.consume_kp() {
            Ok(kp) => kp,
            Err(e) => {
                log::debug!("No key packages available: {}", e);
                return actix_web::HttpResponse::NoContent().finish();
            }
        },
        None => return actix_web::HttpResponse::NotFound().finish(),
    };

    match key_package.tls_serialize_detached() {
        Ok(payload) => actix_web::HttpResponse::Ok().body(payload),
        Err(_) => actix_web::HttpResponse::InternalServerError().finish(),
    }
}
/// Routes a group message to every recipient listed in the request.
///
/// The request body is a TLS-serialized `GroupMessage`. Handshake messages are
/// additionally checked against the last epoch recorded for their group.
///
/// The request is applied atomically: the epoch check and the recipient check
/// both run before any state is modified, so a rejected request neither
/// advances the stored epoch nor leaves the message in some recipients' queues.
///
/// Responses:
/// - `200 OK` – message queued for all recipients.
/// - `400 Bad Request` – body is not a valid `GroupMessage` or does not contain
///   a protocol message.
/// - `404 Not Found` – at least one recipient is not registered.
/// - `409 Conflict` – handshake message from an epoch older than the recorded one.
/// - `500 Internal Server Error` – shared state is unavailable.
/// - `206 Partial Content` – the request body stream failed while being read.
#[post("/send/message")]
async fn send_message(
    mut body: Payload,
    data: web::Data<DsData>,
) -> impl Responder {
    let mut bytes = web::BytesMut::new();
    while let Some(item) = body.next().await {
        match item {
            Ok(chunk) => bytes.extend_from_slice(&chunk),
            Err(_) => return actix_web::HttpResponse::PartialContent().finish(),
        }
    }

    let group_msg = match GroupMessage::tls_deserialize(&mut &bytes[..]) {
        Ok(msg) => msg,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };
    log::debug!("Routing message to {} recipients", group_msg.recipients.len());

    // Extract protocol message for epoch checking (done before locking so the
    // mutexes are held only while shared state is actually inspected).
    let protocol_msg: ProtocolMessage = match group_msg.msg.clone().try_into() {
        Ok(msg) => msg,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };

    // Lock order (clients, then groups) is kept fixed to avoid lock inversion.
    let mut clients = match data.clients.lock() {
        Ok(guard) => guard,
        Err(_) => return actix_web::HttpResponse::InternalServerError().finish(),
    };
    let mut groups = match data.groups.lock() {
        Ok(guard) => guard,
        Err(_) => return actix_web::HttpResponse::InternalServerError().finish(),
    };

    // Prevent epoch rollback attacks: a handshake message must not be older
    // than the epoch already recorded for its group. The new epoch is only
    // remembered here and written once the whole request has been validated.
    let epoch_update = if protocol_msg.is_handshake_message() {
        let epoch = protocol_msg.epoch().as_u64();
        let group_id = protocol_msg.group_id().as_slice();
        if let Some(&current_epoch) = groups.get(group_id) {
            if current_epoch > epoch {
                log::warn!("Rejecting old epoch {} (current: {})", epoch, current_epoch);
                return actix_web::HttpResponse::Conflict().finish();
            }
        }
        Some((group_id.to_vec(), epoch))
    } else {
        None
    };

    // Validate all recipients up front so delivery is all-or-nothing.
    let has_unknown_recipient = group_msg
        .recipients
        .iter()
        .any(|recipient| !clients.contains_key(recipient.as_slice()));
    if has_unknown_recipient {
        log::warn!("Recipient not found");
        return actix_web::HttpResponse::NotFound().finish();
    }

    // Everything checked out: commit the epoch and distribute the message.
    if let Some((group_id, epoch)) = epoch_update {
        groups.insert(group_id, epoch);
    }
    for recipient in group_msg.recipients.iter() {
        if let Some(client) = clients.get_mut(recipient.as_slice()) {
            client.msgs.push(group_msg.msg.clone());
        }
    }

    actix_web::HttpResponse::Ok().finish()
}
/// Delivers a Welcome message to the new members it addresses.
///
/// The request body is a TLS-serialized `MlsMessageIn` that must contain a
/// Welcome. Each secret in the Welcome names a key package hash; the client
/// that has this hash in `reserved_key_pkg_hash` gets the Welcome appended to
/// its `welcome_queue`, and the reservation is removed.
///
/// A Welcome can carry secrets for several new members, so every secret is
/// processed rather than stopping at the first match.
///
/// Responses:
/// - `200 OK` – the Welcome was queued for at least one client.
/// - `204 No Content` – no client had a matching reserved key package hash.
/// - `400 Bad Request` – body is not a valid message or not a Welcome.
/// - `500 Internal Server Error` – shared state is unavailable.
/// - `206 Partial Content` – the request body stream failed while being read.
#[post("/send/welcome")]
async fn send_welcome(
    mut body: Payload,
    data: web::Data<DsData>,
) -> impl Responder {
    let mut bytes = web::BytesMut::new();
    while let Some(item) = body.next().await {
        match item {
            Ok(chunk) => bytes.extend_from_slice(&chunk),
            Err(_) => return actix_web::HttpResponse::PartialContent().finish(),
        }
    }

    let welcome_msg = match MlsMessageIn::tls_deserialize(&mut &bytes[..]) {
        Ok(msg) => msg,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };
    let welcome = match welcome_msg.clone().into_welcome() {
        Ok(w) => w,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };
    log::debug!("Delivering welcome message");

    // A poisoned mutex is reported as 500 instead of panicking in the handler.
    let mut clients = match data.clients.lock() {
        Ok(guard) => guard,
        Err(_) => return actix_web::HttpResponse::InternalServerError().finish(),
    };

    // Find the recipient of each secret by matching its key package hash.
    let mut delivered = false;
    for secret in welcome.secrets().iter() {
        let key_package_hash = secret.new_member();
        for client in clients.values_mut() {
            // `take` both tests for and releases the reservation.
            if client.reserved_key_pkg_hash.take(key_package_hash.as_slice()).is_some() {
                client.welcome_queue.push(welcome_msg.clone());
                delivered = true;
                // The first holder of this hash gets the Welcome; move on to
                // the next secret.
                break;
            }
        }
    }

    if delivered {
        actix_web::HttpResponse::Ok().finish()
    } else {
        actix_web::HttpResponse::NoContent().finish()
    }
}
use ds_lib::messages::RecvMessageRequest;
use tls_codec::TlsSliceU16;
/// Returns and clears all pending messages of a client.
///
/// `{id}` is the client ID encoded as URL-safe base64. The request body is a
/// TLS-serialized `RecvMessageRequest` whose `auth_token` must equal the token
/// stored for the client. The response body is a TLS-serialized `u16`-length
/// vector holding the queued Welcome messages followed by the queued group
/// messages; both queues are emptied.
///
/// Responses:
/// - `200 OK` – body carries the pending messages (possibly none).
/// - `400 Bad Request` – `{id}` is not valid base64 or the body is malformed.
/// - `401 Unauthorized` – the auth token does not match.
/// - `404 Not Found` – no client with that ID is registered.
/// - `500 Internal Server Error` – shared state is unavailable or the messages
///   could not be serialized.
/// - `206 Partial Content` – the request body stream failed while being read.
#[get("/recv/{id}")]
async fn receive_messages(
    path: web::Path<String>,
    mut body: Payload,
    data: web::Data<DsData>,
) -> impl Responder {
    let mut bytes = web::BytesMut::new();
    while let Some(item) = body.next().await {
        match item {
            Ok(chunk) => bytes.extend_from_slice(&chunk),
            Err(_) => return actix_web::HttpResponse::PartialContent().finish(),
        }
    }

    // Decode the untrusted path segment before taking the lock.
    let id = match base64::engine::general_purpose::URL_SAFE
        .decode(path.into_inner())
    {
        Ok(v) => v,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };

    // A poisoned mutex is reported as 500 instead of panicking in the handler.
    let mut clients = match data.clients.lock() {
        Ok(guard) => guard,
        Err(_) => return actix_web::HttpResponse::InternalServerError().finish(),
    };
    let client = match clients.get_mut(&id) {
        Some(c) => c,
        None => return actix_web::HttpResponse::NotFound().finish(),
    };

    // Authenticate request
    let req = match RecvMessageRequest::tls_deserialize(&mut &bytes[..]) {
        Ok(r) => r,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };
    if req.auth_token != client.auth_token {
        return actix_web::HttpResponse::Unauthorized().finish();
    }
    log::debug!("Fetching messages for client");

    // Collect all pending messages: Welcomes first, then group messages.
    // `extend(drain(..))` moves the elements directly, without building a
    // temporary Vec per queue.
    let mut all_messages: Vec<MlsMessageIn> =
        Vec::with_capacity(client.welcome_queue.len() + client.msgs.len());
    all_messages.extend(client.welcome_queue.drain(..));
    all_messages.extend(client.msgs.drain(..));

    match TlsSliceU16(&all_messages).tls_serialize_detached() {
        Ok(payload) => actix_web::HttpResponse::Ok().body(payload),
        Err(_) => actix_web::HttpResponse::InternalServerError().finish(),
    }
}
/// Entry point: starts the delivery service HTTP server on `127.0.0.1:8080`.
///
/// Initializes `env_logger`, creates one empty `DsData` that is shared by all
/// workers, registers the five handlers and runs the server until it stops.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    env_logger::init();

    let listen_port: u16 = 8080;
    let listen_addr = format!("127.0.0.1:{}", listen_port);

    // One state instance; each worker's `App` gets a clone of the handle.
    let shared_state = web::Data::new(DsData::default());

    log::info!("Starting delivery service on {}", listen_addr);

    let app_factory = move || {
        App::new()
            .app_data(shared_state.clone())
            .service(register_client)
            .service(consume_key_package)
            .service(send_message)
            .service(send_welcome)
            .service(receive_messages)
    };

    let server = HttpServer::new(app_factory).bind(listen_addr)?;
    server.run().await
}
use url::Url;
use reqwest::blocking::Client;
use openmls::prelude::*;
/// Blocking HTTP client for talking to the delivery service.
pub struct Backend {
/// Base URL of the delivery service; each request sets its path on a clone.
ds_url: Url,
/// `reqwest` blocking client used to send every request.
client: Client,
}
impl Backend {
pub fn new(ds_url: &str) -> Self {
Self {
ds_url: Url::parse(ds_url).unwrap(),
client: Client::new(),
}
}
pub fn register_client(
&self,
key_packages: Vec<(Vec<u8>, KeyPackage)>,
) -> Result<AuthToken, String> {
let mut url = self.ds_url.clone();
url.set_path("/clients/register");
let kp_data = ClientKeyPackages(
key_packages
.into_iter()
.map(|(b, kp)| (b.into(), KeyPackageIn::from(kp)))
.collect::<Vec<_>>()
.into(),
);
let request = RegisterClientRequest { key_packages: kp_data };
let response = self.client
.post(url)
.body(request.tls_serialize_detached().unwrap())
.send()
.map_err(|e| e.to_string())?;
let bytes = response.bytes()
.map_err(|e| e.to_string())?;
let response = RegisterClientSuccessResponse::tls_deserialize(
&mut bytes.as_ref()
)
.map_err(|e| e.to_string())?;
Ok(response.auth_token)
}
pub fn consume_key_package(
&self,
client_id: &[u8],
) -> Result<KeyPackageIn, String> {
let mut url = self.ds_url.clone();
let path = format!(
"/clients/key_package/{}",
base64::engine::general_purpose::URL_SAFE.encode(client_id)
);
url.set_path(&path);
let response = self.client.get(url)
.send()
.map_err(|e| e.to_string())?;
let bytes = response.bytes()
.map_err(|e| e.to_string())?;
KeyPackageIn::tls_deserialize(&mut bytes.as_ref())
.map_err(|e| e.to_string())
}
pub fn send_message(
&self,
group_msg: &GroupMessage,
) -> Result<(), String> {
let mut url = self.ds_url.clone();
url.set_path("/send/message");
self.client
.post(url)
.body(group_msg.tls_serialize_detached().unwrap())
.send()
.map_err(|e| e.to_string())?;
Ok(())
}
pub fn receive_messages(
&self,
client_id: &[u8],
auth_token: &AuthToken,
) -> Result<Vec<MlsMessageIn>, String> {
let mut url = self.ds_url.clone();
let path = format!(
"/recv/{}",
base64::engine::general_purpose::URL_SAFE.encode(client_id)
);
url.set_path(&path);
let request = RecvMessageRequest {
auth_token: auth_token.clone(),
};
let response = self.client
.get(url)
.body(request.tls_serialize_detached().unwrap())
.send()
.map_err(|e| e.to_string())?;
let bytes = response.bytes()
.map_err(|e| e.to_string())?;
let messages = TlsVecU16::<MlsMessageIn>::tls_deserialize(
&mut bytes.as_ref()
)
.map_err(|e| e.to_string())?;
Ok(messages.into())
}
}
Complete Client Example
Put it all together with a complete client:

Security Considerations
- Authentication: Implement proper authentication (OAuth, JWT, etc.)
- Rate Limiting: Prevent abuse with rate limits
- Encryption in Transit: Use TLS for all connections
- Epoch Validation: Prevent rollback attacks by tracking epochs
- Message Ordering: Handle out-of-order delivery gracefully
Next Steps
- Add custom storage for server persistence
- Implement key rotation with server coordination
- Scale the delivery service with load balancing and clustering