Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/openmls/openmls/llms.txt

Use this file to discover all available pages before exploring further.

OpenMLS clients need a delivery service (DS) to exchange messages, distribute key packages, and coordinate group operations. This example demonstrates how to build and integrate with a delivery service.

Overview

A delivery service handles:
  • Client registration and authentication
  • Key package storage and distribution
  • Message routing to group members
  • Welcome message delivery
1
Understand the delivery service architecture
2
The DS acts as a message broker between clients:
3
┌─────────┐         ┌─────────────────┐         ┌─────────┐
│ Client A│────────▶│ Delivery Service│────────▶│ Client B│
└─────────┘         └─────────────────┘         └─────────┘
     │                      │                         │
     │  1. Register         │                         │
     │  2. Publish KeyPkg   │                         │
     │                      │  3. Register            │
     │                      │  4. Publish KeyPkg      │
     │  5. Get KeyPkg(B)    │                         │
     │  6. Send Welcome     │                         │
     │                      │  7. Recv Welcome        │
     │  8. Send Message     │                         │
     │                      │  9. Recv Message        │
4
Set up the server dependencies
5
Create a server using Actix Web:
6
[dependencies]
actix-web = "4.0"
tokio = { version = "1.0", features = ["full"] }
openmls = "1.0"
ds-lib = { path = "../ds-lib" }
tls_codec = "0.4"
base64 = "0.21"
futures-util = "0.3"
7
Create the server state
8
Define the delivery service state:
9
use actix_web::{web, App, HttpServer};
use std::collections::HashMap;
use std::sync::Mutex;
use openmls::prelude::*;
use ds_lib::{ClientInfo, GroupMessage, messages::AuthToken};

#[derive(Default, Debug)]
pub struct DsData {
    // Map of client ID to client information
    clients: Mutex<HashMap<Vec<u8>, ClientInfo>>,
    // Map of group ID to current epoch
    groups: Mutex<HashMap<Vec<u8>, u64>>,
}
10
Implement client registration
11
Clients register and publish their initial key packages:
12
use actix_web::{post, web::Payload, Responder};
use futures_util::StreamExt;
use tls_codec::{Deserialize, Serialize};
use ds_lib::messages::{
    RegisterClientRequest,
    RegisterClientSuccessResponse,
};

#[post("/clients/register")]
async fn register_client(
    mut body: Payload,
    data: web::Data<DsData>,
) -> impl Responder {
    // Read request body
    let mut bytes = web::BytesMut::new();
    while let Some(item) = body.next().await {
        match item {
            Ok(chunk) => bytes.extend_from_slice(&chunk),
            Err(_) => return actix_web::HttpResponse::PartialContent().finish(),
        }
    }

    // Deserialize registration request
    let req = match RegisterClientRequest::tls_deserialize(&mut &bytes[..]) {
        Ok(r) => r,
        Err(_) => {
            log::error!("Invalid registration request");
            return actix_web::HttpResponse::BadRequest().finish();
        }
    };

    if req.key_packages.0.is_empty() {
        return actix_web::HttpResponse::BadRequest().finish();
    }

    // Create client info
    let key_packages = req.key_packages.0
        .into_vec()
        .into_iter()
        .map(|(b, kp)| (b.into_vec(), kp))
        .collect();
    let new_client = ClientInfo::new(key_packages);

    log::debug!("Registering client: {:?}", new_client.id());

    // Generate auth token
    let response = RegisterClientSuccessResponse {
        auth_token: new_client.auth_token.clone(),
    };

    // Store client
    let mut clients = data.clients.lock().unwrap();
    if clients.contains_key(new_client.id()) {
        return actix_web::HttpResponse::Conflict().finish();
    }
    clients.insert(new_client.id().to_vec(), new_client);

    actix_web::HttpResponse::Ok()
        .body(response.tls_serialize_detached().unwrap())
}
13
Implement key package distribution
14
Clients fetch key packages to invite new members:
15
use actix_web::get;

#[get("/clients/key_package/{id}")]
async fn consume_key_package(
    path: web::Path<String>,
    data: web::Data<DsData>,
) -> impl Responder {
    let mut clients = data.clients.lock().unwrap();

    // Decode client ID from base64
    let id = match base64::engine::general_purpose::URL_SAFE
        .decode(path.into_inner()) 
    {
        Ok(v) => v,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };

    log::debug!("Consuming key package for {:?}", id);

    // Get and consume a key package
    let key_package = match clients.get_mut(&id) {
        Some(client) => match client.consume_kp() {
            Ok(kp) => kp,
            Err(e) => {
                log::debug!("No key packages available: {}", e);
                return actix_web::HttpResponse::NoContent().finish();
            }
        },
        None => return actix_web::HttpResponse::NotFound().finish(),
    };

    actix_web::HttpResponse::Ok()
        .body(key_package.tls_serialize_detached().unwrap())
}
16
Implement message sending
17
Route messages to group members:
18
#[post("/send/message")]
async fn send_message(
    mut body: Payload,
    data: web::Data<DsData>,
) -> impl Responder {
    let mut bytes = web::BytesMut::new();
    while let Some(item) = body.next().await {
        match item {
            Ok(chunk) => bytes.extend_from_slice(&chunk),
            Err(_) => return actix_web::HttpResponse::PartialContent().finish(),
        }
    }

    let group_msg = match GroupMessage::tls_deserialize(&mut &bytes[..]) {
        Ok(msg) => msg,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };

    log::debug!("Routing message to {} recipients", group_msg.recipients.len());

    let mut clients = data.clients.lock().unwrap();
    let mut groups = data.groups.lock().unwrap();

    // Extract protocol message for epoch checking
    let protocol_msg: ProtocolMessage = match group_msg.msg.clone().try_into() {
        Ok(msg) => msg,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };

    // Prevent epoch rollback attacks
    if protocol_msg.is_handshake_message() {
        let epoch = protocol_msg.epoch().as_u64();
        let group_id = protocol_msg.group_id().as_slice();

        if let Some(&current_epoch) = groups.get(group_id) {
            if current_epoch > epoch {
                log::warn!("Rejecting old epoch {} (current: {})", epoch, current_epoch);
                return actix_web::HttpResponse::Conflict().finish();
            }
            groups.insert(group_id.to_vec(), epoch);
        } else {
            groups.insert(group_id.to_vec(), epoch);
        }
    }

    // Distribute message to all recipients
    for recipient in group_msg.recipients.iter() {
        let client = match clients.get_mut(recipient.as_slice()) {
            Some(c) => c,
            None => {
                log::warn!("Recipient not found");
                return actix_web::HttpResponse::NotFound().finish();
            }
        };
        client.msgs.push(group_msg.msg.clone());
    }

    actix_web::HttpResponse::Ok().finish()
}
19
Implement Welcome message delivery
20
Deliver Welcome messages to new members:
21
#[post("/send/welcome")]
async fn send_welcome(
    mut body: Payload,
    data: web::Data<DsData>,
) -> impl Responder {
    let mut bytes = web::BytesMut::new();
    while let Some(item) = body.next().await {
        match item {
            Ok(chunk) => bytes.extend_from_slice(&chunk),
            Err(_) => return actix_web::HttpResponse::PartialContent().finish(),
        }
    }

    let welcome_msg = match MlsMessageIn::tls_deserialize(&mut &bytes[..]) {
        Ok(msg) => msg,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };

    let welcome = match welcome_msg.clone().into_welcome() {
        Ok(w) => w,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };

    log::debug!("Delivering welcome message");

    let mut clients = data.clients.lock().unwrap();

    // Find the recipient by matching key package hash
    for secret in welcome.secrets().iter() {
        let key_package_hash = secret.new_member();
        
        for (_client_id, client) in clients.iter_mut() {
            if client.reserved_key_pkg_hash.take(key_package_hash.as_slice()).is_some() {
                client.welcome_queue.push(welcome_msg);
                return actix_web::HttpResponse::Ok().finish();
            }
        }
    }

    actix_web::HttpResponse::NoContent().finish()
}
22
Implement message retrieval
23
Clients poll for new messages:
24
use ds_lib::messages::RecvMessageRequest;
use tls_codec::TlsSliceU16;

#[get("/recv/{id}")]
async fn receive_messages(
    path: web::Path<String>,
    mut body: Payload,
    data: web::Data<DsData>,
) -> impl Responder {
    let mut bytes = web::BytesMut::new();
    while let Some(item) = body.next().await {
        match item {
            Ok(chunk) => bytes.extend_from_slice(&chunk),
            Err(_) => return actix_web::HttpResponse::PartialContent().finish(),
        }
    }

    let mut clients = data.clients.lock().unwrap();

    let id = match base64::engine::general_purpose::URL_SAFE
        .decode(path.into_inner()) 
    {
        Ok(v) => v,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };

    let client = match clients.get_mut(&id) {
        Some(c) => c,
        None => return actix_web::HttpResponse::NotFound().finish(),
    };

    // Authenticate request
    let req = match RecvMessageRequest::tls_deserialize(&mut &bytes[..]) {
        Ok(r) => r,
        Err(_) => return actix_web::HttpResponse::BadRequest().finish(),
    };

    if req.auth_token != client.auth_token {
        return actix_web::HttpResponse::Unauthorized().finish();
    }

    log::debug!("Fetching messages for client");

    // Collect all pending messages
    let mut all_messages: Vec<MlsMessageIn> = Vec::new();
    all_messages.append(&mut client.welcome_queue.drain(..).collect());
    all_messages.append(&mut client.msgs.drain(..).collect());

    match TlsSliceU16(&all_messages).tls_serialize_detached() {
        Ok(bytes) => actix_web::HttpResponse::Ok().body(bytes),
        Err(_) => actix_web::HttpResponse::InternalServerError().finish(),
    }
}
25
Create the server main function
26
Start the delivery service:
27
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    env_logger::init();

    let data = web::Data::new(DsData::default());
    let port = 8080u16;
    let addr = format!("127.0.0.1:{}", port);

    log::info!("Starting delivery service on {}", addr);

    HttpServer::new(move || {
        App::new()
            .app_data(data.clone())
            .service(register_client)
            .service(consume_key_package)
            .service(send_message)
            .service(send_welcome)
            .service(receive_messages)
    })
    .bind(addr)?
    .run()
    .await
}
28
Implement the client backend
29
Create a client library to interact with the DS:
30
use url::Url;
use reqwest::blocking::Client;
use openmls::prelude::*;

pub struct Backend {
    ds_url: Url,
    client: Client,
}

impl Backend {
    pub fn new(ds_url: &str) -> Self {
        Self {
            ds_url: Url::parse(ds_url).unwrap(),
            client: Client::new(),
        }
    }

    pub fn register_client(
        &self,
        key_packages: Vec<(Vec<u8>, KeyPackage)>,
    ) -> Result<AuthToken, String> {
        let mut url = self.ds_url.clone();
        url.set_path("/clients/register");

        let kp_data = ClientKeyPackages(
            key_packages
                .into_iter()
                .map(|(b, kp)| (b.into(), KeyPackageIn::from(kp)))
                .collect::<Vec<_>>()
                .into(),
        );

        let request = RegisterClientRequest { key_packages: kp_data };
        let response = self.client
            .post(url)
            .body(request.tls_serialize_detached().unwrap())
            .send()
            .map_err(|e| e.to_string())?;

        let bytes = response.bytes()
            .map_err(|e| e.to_string())?;

        let response = RegisterClientSuccessResponse::tls_deserialize(
            &mut bytes.as_ref()
        )
        .map_err(|e| e.to_string())?;

        Ok(response.auth_token)
    }

    pub fn consume_key_package(
        &self,
        client_id: &[u8],
    ) -> Result<KeyPackageIn, String> {
        let mut url = self.ds_url.clone();
        let path = format!(
            "/clients/key_package/{}",
            base64::engine::general_purpose::URL_SAFE.encode(client_id)
        );
        url.set_path(&path);

        let response = self.client.get(url)
            .send()
            .map_err(|e| e.to_string())?;

        let bytes = response.bytes()
            .map_err(|e| e.to_string())?;

        KeyPackageIn::tls_deserialize(&mut bytes.as_ref())
            .map_err(|e| e.to_string())
    }

    pub fn send_message(
        &self,
        group_msg: &GroupMessage,
    ) -> Result<(), String> {
        let mut url = self.ds_url.clone();
        url.set_path("/send/message");

        self.client
            .post(url)
            .body(group_msg.tls_serialize_detached().unwrap())
            .send()
            .map_err(|e| e.to_string())?;

        Ok(())
    }

    pub fn receive_messages(
        &self,
        client_id: &[u8],
        auth_token: &AuthToken,
    ) -> Result<Vec<MlsMessageIn>, String> {
        let mut url = self.ds_url.clone();
        let path = format!(
            "/recv/{}",
            base64::engine::general_purpose::URL_SAFE.encode(client_id)
        );
        url.set_path(&path);

        let request = RecvMessageRequest {
            auth_token: auth_token.clone(),
        };

        let response = self.client
            .get(url)
            .body(request.tls_serialize_detached().unwrap())
            .send()
            .map_err(|e| e.to_string())?;

        let bytes = response.bytes()
            .map_err(|e| e.to_string())?;

        let messages = TlsVecU16::<MlsMessageIn>::tls_deserialize(
            &mut bytes.as_ref()
        )
        .map_err(|e| e.to_string())?;

        Ok(messages.into())
    }
}

Complete Client Example

Put it all together with a complete client:
use openmls::prelude::*;
use openmls_rust_crypto::OpenMlsRustCrypto;

fn main() {
    let backend = Backend::new("http://localhost:8080");
    let provider = OpenMlsRustCrypto::default();

    // Create and register Alice
    let mut alice = create_user("Alice", &provider);
    let alice_id = b"Alice".to_vec();
    let alice_kps = vec![alice.create_key_package(&provider)];
    let alice_token = backend.register_client(alice_kps).unwrap();

    // Create and register Bob
    let mut bob = create_user("Bob", &provider);
    let bob_id = b"Bob".to_vec();
    let bob_kps = vec![bob.create_key_package(&provider)];
    let bob_token = backend.register_client(bob_kps).unwrap();

    // Alice creates a group and invites Bob
    alice.create_group(b"chat-group");
    let bob_kp = backend.consume_key_package(&bob_id).unwrap();
    let (commit, welcome) = alice.invite(&bob_kp);
    
    // Send messages through DS
    backend.send_message(&commit).unwrap();
    backend.send_welcome(&welcome).unwrap();

    // Bob receives and processes messages
    let messages = backend.receive_messages(&bob_id, &bob_token).unwrap();
    for msg in messages {
        bob.process_message(msg);
    }

    println!("Successfully established group through delivery service!");
}

Security Considerations

  1. Authentication: Implement proper authentication (OAuth, JWT, etc.)
  2. Rate Limiting: Prevent abuse with rate limits
  3. Encryption in Transit: Use TLS for all connections
  4. Epoch Validation: Prevent rollback attacks by tracking epochs
  5. Message Ordering: Handle out-of-order delivery gracefully

Next Steps

  • Add custom storage for server persistence
  • Implement key rotation with server coordination
  • Scale the delivery service with load balancing and clustering