This is a Swift 6 client for the RabbitMQ HTTP API
heavily inspired by its Rust counterpart that powers
modern rabbitmqadmin.
This library is reasonably mature.
Before 1.0.0, breaking API changes can and will be introduced.
This library targets RabbitMQ 4.x and 3.13.x.
All older series have reached End of Life.
This library requires Swift 6 and uses strict concurrency checking (swift-6 language mode).
Add this to Package.swift:
.package(url: "https://github.com/rabbitmq/rabbitmq-http-api-client-swift.git", from: "0.1.0")And to your target dependencies:
.product(name: "RabbitMQHTTPAPIClient", package: "rabbitmq-http-api-client-swift")The async client uses Swift's structured concurrency with async/await. All client methods are async throws and designed for concurrent use.
import RabbitMQHTTPAPIClient
let client = Client(
endpoint: "http://localhost:15672/api",
username: "guest",
password: "guest"
)
let nodes = try await client.listNodes()The client uses Foundation.URLSession under the hood and is Sendable, safe to use from multiple concurrent tasks.
The default endpoint is http://localhost:15672/api, username is guest, and password is guest.
let client = Client()
let overview = try await client.overview()Pass RetrySettings to configure retry behavior:
import RabbitMQHTTPAPIClient
let retrySettings = RetrySettings(maxAttempts: 3, delayMs: 500)
let client = Client(
endpoint: "http://localhost:15672/api",
username: "guest",
password: "guest",
retrySettings: retrySettings
)Default retry behavior is maxAttempts: 3 with delayMs: 0.
Pass a custom URLSession for advanced configuration:
var config = URLSessionConfiguration.default
// Timeout for individual request/response (not including connection time)
config.timeoutIntervalForRequest = 30
// Timeout for total resource (connection + request + response)
config.timeoutIntervalForResource = 60
config.waitsForConnectivity = true
let session = URLSession(configuration: config)
let client = Client(
endpoint: "http://localhost:15672/api",
username: "guest",
password: "guest",
session: session
)let overview = try await client.overview()Returns cluster overview including node name, version, statistics, and listeners.
let version = try await client.serverVersion()let nodes = try await client.listNodes()let node = try await client.getNodeInfo("rabbit@hostname")let footprint = try await client.getNodeMemoryFootprint("rabbit@hostname")Returns a per-category memory footprint breakdown, in bytes and as percentages.
let identity = try await client.getClusterName()try await client.setClusterName("my-cluster")Cluster tags are arbitrary key-value pairs for the cluster:
let tags = try await client.getClusterTags()
let newTags = ["environment": "production", "region": "us-east-1"]
try await client.setClusterTags(newTags)
try await client.clearClusterTags()Virtual hosts group and isolate resources.
let vhosts = try await client.listVirtualHosts()let vhost = try await client.getVirtualHost("/")let params = VirtualHostParams(
name: "my-vhost",
description: "Production vhost",
tags: ["production", "critical"],
defaultQueueType: "quorum"
)
try await client.createVirtualHost(params)try await client.deleteVirtualHost("my-vhost", idempotently: false)Set idempotently: true to ignore 404 errors if the vhost doesn't exist.
try await client.enableVirtualHostDeletionProtection("my-vhost")
try await client.disableVirtualHostDeletionProtection("my-vhost")let users = try await client.listUsers()let user = try await client.getUser("my-user")let currentUser = try await client.whoami()let params = UserParams(
name: "new-user",
password: "s3cRe7"
)
try await client.createUser(params)Users can be created with a plaintext password or a password hash. To use a hash:
let params = UserParams(
name: "new-user",
passwordHash: "base64EncodedHash",
hashingAlgorithm: "SHA-256"
)
try await client.createUser(params)try await client.deleteUser("new-user", idempotently: false)try await client.deleteUsers(["user1", "user2", "user3"])let unusedUsers = try await client.listUsersWithoutPermissions()let connections = try await client.listConnections()let vhostConnections = try await client.listConnections(in: "/")let connection = try await client.getConnectionInfo("connection-name")try await client.closeConnection(
"connection-name",
reason: "closing for maintenance",
idempotently: false
)let streamConnections = try await client.listStreamConnections()
let vhostStreamConnections = try await client.listStreamConnections(in: "/")let userConnections = try await client.listUserConnections("username")
try await client.closeUserConnections("username", reason: "session expired")let allQueues = try await client.listQueues()
let vhostQueues = try await client.listQueues(in: "/")let classicQueues = try await client.listClassicQueues()
let classicVhostQueues = try await client.listClassicQueues(in: "/")
let quorumQueues = try await client.listQuorumQueues()
let quorumVhostQueues = try await client.listQuorumQueues(in: "/")
let streams = try await client.listStreams()
let vhostStreams = try await client.listStreams(in: "/")let queueInfo = try await client.getQueueInfo("my-queue", in: "/")let params = QueueParams.classicQueue("my-queue", in: "/")
try await client.declareQueue(params)Pass custom arguments:
var args: [String: JSONValue] = [:]
args["x-message-ttl"] = .int(3600000)
args["x-max-length"] = .int(10000)
let params = QueueParams.classicQueue("my-queue", in: "/", arguments: args)
try await client.declareQueue(params)Quorum queues are replicated, data safety-oriented queues based on the Raft consensus algorithm:
let params = QueueParams.quorumQueue("my-qq", in: "/")
try await client.declareQueue(params)With custom arguments:
var args: [String: JSONValue] = [:]
args["x-max-length"] = .int(10000)
args["x-single-active-consumer"] = .bool(true)
let params = QueueParams.quorumQueue("my-qq", in: "/", arguments: args)
try await client.declareQueue(params)Streams are persistent, replicated append-only logs with non-destructive consumer semantics:
let params = QueueParams.stream("my-stream", in: "/")
try await client.declareQueue(params)With retention settings:
var args: [String: JSONValue] = [:]
args["x-max-age"] = .string("7D")
args["x-max-length-bytes"] = .int(10_000_000_000)
let params = QueueParams.stream("my-stream", in: "/", arguments: args)
try await client.declareQueue(params)try await client.purgeQueue("my-queue", in: "/")try await client.deleteQueue("my-queue", in: "/", idempotently: false)try await client.deleteQueue("queue-1", in: "/")
try await client.deleteQueue("queue-2", in: "/")
try await client.deleteQueue("queue-3", in: "/")let exchanges = try await client.listExchanges()
let vhostExchanges = try await client.listExchanges(in: "/")let exchangeInfo = try await client.getExchangeInfo("my-exchange", in: "/")Use the type-safe factory methods:
let params = ExchangeParams.direct("my-exchange", in: "/")
try await client.declareExchange(params)
let params = ExchangeParams.topic("my-topic", in: "/")
try await client.declareExchange(params)
let params = ExchangeParams.fanout("my-fanout", in: "/")
try await client.declareExchange(params)
let params = ExchangeParams.headers("my-headers", in: "/")
try await client.declareExchange(params)With arguments:
var args: [String: JSONValue] = [:]
args["x-message-ttl"] = .int(3600000)
let params = ExchangeParams.topic("my-topic", in: "/", arguments: args)
try await client.declareExchange(params)try await client.deleteExchange("my-exchange", in: "/", idempotently: false)Bindings connect exchanges to queues or other exchanges.
let bindings = try await client.listBindings()
let vhostBindings = try await client.listBindings(in: "/")let queueBindings = try await client.listQueueBindings("my-queue", in: "/")let asSource = try await client.listExchangeBindingsAsSource("my-exchange", in: "/")
let asDestination = try await client.listExchangeBindingsAsDestination("my-exchange", in: "/")try await client.bindQueue(
"my-queue",
to: "my-exchange",
in: "/",
routingKey: "routing.key"
)With arguments:
try await client.bindQueue(
"my-queue",
to: "my-exchange",
in: "/",
routingKey: "routing.key",
arguments: ["x-match": .string("all")]
)try await client.bindExchange(
"destination-exchange",
to: "source-exchange",
in: "/",
routingKey: "routing.key"
)try await client.deleteQueueBinding(
"my-queue",
from: "my-exchange",
in: "/",
propertiesKey: "routing.key",
idempotently: false
)let channels = try await client.listChannels()
let vhostChannels = try await client.listChannels(in: "/")let channelInfo = try await client.getChannelInfo("channel-name")let connectionChannels = try await client.listChannels(on: "connection-name")let consumers = try await client.listConsumers()
let vhostConsumers = try await client.listConsumers(in: "/")let allPermissions = try await client.listPermissions()
let vhostPermissions = try await client.listPermissions(in: "/")
let userPermissions = try await client.listPermissions(of: "username")let permissions = try await client.getPermissions(of: "username", in: "/")let params = PermissionParams(
vhost: "/",
username: "my-user",
configure: ".*",
read: ".*",
write: ".*"
)
try await client.grantPermissions(params)try await client.clearPermissions(of: "username", in: "/", idempotently: false)Topic permissions control access to topics in exchanges.
let allTopicPerms = try await client.listTopicPermissions()
let vhostTopicPerms = try await client.listTopicPermissions(in: "/")
let userTopicPerms = try await client.listTopicPermissions(of: "username")let topicPerms = try await client.getTopicPermissions(of: "username", in: "/", exchange: "my-exchange")let params = TopicPermissionParams(
vhost: "/",
username: "my-user",
exchange: "my-exchange",
write: ".*",
read: ".*"
)
try await client.grantTopicPermissions(params)Clears all topic permissions for a user in a virtual host:
try await client.clearTopicPermissions(
of: "username",
in: "/",
idempotently: false
)Policies dynamically configure queue and exchange properties using pattern matching.
let policies = try await client.listPolicies()
let vhostPolicies = try await client.listPolicies(in: "/")let policy = try await client.getPolicy("my-policy", in: "/")var definition: [String: JSONValue] = [:]
definition["max-length"] = .int(10000)
definition["overflow"] = .string("reject-publish")
let params = PolicyParams(
vhost: "/",
name: "my-policy",
pattern: "^my-.*",
definition: definition,
priority: 0,
applyTo: .queues
)
try await client.declarePolicy(params)try await client.deletePolicy("my-policy", in: "/", idempotently: false)Operator policies are system-wide policies for operators to apply without input from users.
let opPolicies = try await client.listOperatorPolicies()
let vhostOpPolicies = try await client.listOperatorPolicies(in: "/")var definition: [String: JSONValue] = [:]
definition["delivery-limit"] = .int(5)
let params = PolicyParams(
vhost: "/",
name: "op-policy",
pattern: "^.*",
definition: definition,
priority: 100,
applyTo: .queues
)
try await client.declareOperatorPolicy(params)try await client.deleteOperatorPolicy("op-policy", in: "/", idempotently: false)Some list operations support pagination:
let page = PaginationParams(page: 1, pageSize: 100)
let queuesPage = try await client.listQueues(page: page)
let nextPage = PaginationParams(page: 2, pageSize: 100)
let moreQueues = try await client.listQueues(page: nextPage)Iterate all pages:
var allQueues: [QueueInfo] = []
var page = 1
while true {
let params = PaginationParams(page: page, pageSize: 100)
let result = try await client.listQueues(page: params)
allQueues.append(contentsOf: result.items)
if !result.hasMore {
break
}
page += 1
}Paginated operations include: listQueues, listConnections, and listExchanges.
try await client.healthCheckClusterAlarms()try await client.healthCheckLocalAlarms()try await client.healthCheckNodeIsQuorumCritical()try await client.healthCheckPortListener(5672)try await client.healthCheckProtocolListener(.amqp)
try await client.healthCheckProtocolListener(.stream)
try await client.healthCheckProtocolListener(.mqtt)try await client.healthCheckVirtualHosts()let flags = try await client.listFeatureFlags()try await client.enableFeatureFlag("feature-name")try await client.enableAllStableFeatureFlags()let json = try await client.exportDefinitions()Export vhost-specific definitions:
let vhostJson = try await client.exportDefinitions(of: "/")try await client.importDefinitions(jsonString)Import into a specific vhost:
try await client.importDefinitions(jsonString, into: "/")Message operations are for testing only and not recommended for production use.
try await client.publishMessage(
"test message",
to: "my-exchange",
routingKey: "test.key",
in: "/"
)let messages = try await client.getMessages(
from: "my-queue",
in: "/",
count: 10,
requeue: true
)let limits = try await client.listAllUserLimits()let userLimits = try await client.listUserLimits("username")try await client.setUserLimit("username", .maxConnections, value: 100)try await client.clearUserLimit("username", .maxConnections)let limits = try await client.listAllVirtualHostLimits()let vhostLimits = try await client.listVirtualHostLimits("/")try await client.setVirtualHostLimit("/", .maxQueues, value: 10000)try await client.clearVirtualHostLimit("/", .maxQueues)let allDeprecated = try await client.listDeprecatedFeatures()let inUse = try await client.listDeprecatedFeaturesInUse()try await client.rebalanceQueueLeaders()let plugins = try await client.listNodePlugins("rabbit@hostname")let clusterPlugins = try await client.listAllClusterPlugins()let allParams = try await client.listRuntimeParameters()List by component:
let federationParams = try await client.listRuntimeParameters(component: "federation")List for component in vhost:
let vhostFedParams = try await client.listRuntimeParameters(component: "federation", in: "/")let param = try await client.getRuntimeParameter(
"upstream",
of: "federation",
in: "/"
)var value: [String: JSONValue] = [:]
value["uri"] = .string("amqp://upstream-host")
value["ack-mode"] = .string("on-confirm")
let params = RuntimeParameterParams(
component: "federation",
name: "my-upstream",
vhost: "/",
value: value
)
try await client.upsertRuntimeParameter(params)try await client.deleteRuntimeParameter(
"my-upstream",
of: "federation",
in: "/",
idempotently: false
)let globals = try await client.listGlobalParameters()let param = try await client.getGlobalParameter("my-param")var value: [String: JSONValue] = [:]
value["key"] = .string("value")
let params = GlobalParameterParams(
name: "my-param",
value: value
)
try await client.upsertGlobalParameter(params)try await client.deleteGlobalParameter("my-param", idempotently: false)Federation links RabbitMQ brokers together to distribute messages.
let upstreams = try await client.listFederationUpstreams()
let vhostUpstreams = try await client.listFederationUpstreams(in: "/")let upstream = try await client.getFederationUpstream("my-upstream", in: "/")let params = FederationUpstreamParams(
name: "my-upstream",
vhost: "/",
definition: ["uri": .string("amqp://upstream-host")]
)
try await client.declareFederationUpstream(params)try await client.deleteFederationUpstream("my-upstream", in: "/", idempotently: false)let links = try await client.listFederationLinks()
let vhostLinks = try await client.listFederationLinks(in: "/")Shovels move messages from a source to a destination.
let shovels = try await client.listShovels()
let vhostShovels = try await client.listShovels(in: "/")let shovel = try await client.getShovel("my-shovel", in: "/")Type-safe factory methods for common shovel configurations:
let params = ShovelParams.amqp091QueueShovel(
name: "my-shovel",
vhost: "/",
sourceUri: "amqp://source-host",
destinationUri: "amqp://dest-host",
sourceQueue: "src-queue",
destinationQueue: "dst-queue"
)
try await client.declareShovel(params)Or for shoveling from exchange to queue:
let params = ShovelParams.amqp091ExchangeShovel(
name: "my-shovel",
vhost: "/",
sourceUri: "amqp://source-host",
sourceExchange: "src-exchange",
destinationUri: "amqp://dest-host",
destinationQueue: "dst-queue"
)
try await client.declareShovel(params)try await client.deleteShovel("my-shovel", in: "/", idempotently: false)All cluster publishers:
let allPublishers = try await client.listStreamPublishers()By virtual host:
let vhostPublishers = try await client.listStreamPublishers(in: "/")By stream and virtual host:
let streamPublishers = try await client.listStreamPublishers(of: "my-stream", in: "/")By connection and virtual host:
let connectionPublishers = try await client.listStreamPublishers(on: "connection-name", in: "/")All cluster consumers:
let allConsumers = try await client.listStreamConsumers()By virtual host:
let vhostConsumers = try await client.listStreamConsumers(in: "/")By connection and virtual host:
let connectionConsumers = try await client.listStreamConsumers(on: "connection-name", in: "/")let connInfo = try await client.getStreamConnectionInfo("connection-name")let oauthConfig = try await client.oauthConfiguration()let stats = try await client.authAttemptStatistics(user: "username", in: "/")These operations are specific to Tanzu RabbitMQ deployments.
try await client.enableSchemaDefinitionSync()
try await client.enableSchemaDefinitionSync(on: "rabbit@hostname")try await client.disableSchemaDefinitionSync()
try await client.disableSchemaDefinitionSync(on: "rabbit@hostname")let status = try await client.schemaDefinitionSyncStatus()
let nodeStatus = try await client.schemaDefinitionSyncStatus(on: "rabbit@hostname")let replStatus = try await client.warmStandbyReplicationStatus()The client throws errors from the ClientError enum which includes cases like badRequest, unauthorized, notFound, conflict, and serverError:
do {
try await client.getQueueInfo("my-queue", in: "/")
} catch let error as ClientError {
switch error {
case .notFound:
print("Queue not found")
case .unauthorized:
print("Authentication failed")
case .conflict:
print("Resource already exists")
case .badRequest(let reason):
print("Bad request: \(reason)")
case .serverError(let statusCode, let body):
print("Server error \(statusCode): \(body)")
default:
print("Client error: \(error)")
}
} catch {
print("Other error: \(error)")
}Deletion operations accept an idempotently parameter. When true, the client returns success even if the resource doesn't exist (404):
try await client.deleteQueue("my-queue", in: "/", idempotently: true)
try await client.deleteUser("my-user", idempotently: true)The JSONValue enum provides type-safe construction of JSON values for arguments and definitions:
var args: [String: JSONValue] = [:]
args["x-max-length"] = .int(10000)
args["x-overflow"] = .string("reject-publish")
args["x-single-active-consumer"] = .bool(true)
args["x-dead-letter-strategy"] = .object(["type": .string("at-most-once")])
let params = QueueParams.quorumQueue("my-queue", in: "/", arguments: args)
try await client.declareQueue(params)The client delegates TLS and certificate handling to URLSession, following Apple's design patterns. Peer certificate chain verification using system root CA certificates is enabled by default for HTTPS endpoints.
// Automatically verifies the server's certificate chain and rejects invalid/self-signed certs
let client = Client(
endpoint: "https://rabbitmq.example.com:15671/api",
username: "guest",
password: "guest"
)Restrict connections to specific certificates for additional security:
import Foundation
class PinningDelegate: NSObject, URLSessionDelegate {
let pinnedCertificates: [SecCertificate]
init(certificates: [Data]) {
self.pinnedCertificates = certificates.compactMap { data in
SecCertificateCreateWithData(nil, data as CFData)
}
}
func urlSession(
_ session: URLSession,
didReceive challenge: URLAuthenticationChallenge,
completionHandler: @escaping (URLSession.AuthChallengeDisposition, URLCredential?) -> Void
) {
guard challenge.protectionSpace.authenticationMethod == NSURLAuthenticationMethodServerTrust,
let serverTrust = challenge.protectionSpace.serverTrust else {
completionHandler(.cancelAuthenticationChallenge, nil)
return
}
// Validate the server's trust chain against the system root CAs first
var secResult = SecTrustResultType.invalid
let status = SecTrustEvaluate(serverTrust, &secResult)
guard status == errSecSuccess else {
completionHandler(.cancelAuthenticationChallenge, nil)
return
}
// Then verify that at least one certificate in the chain matches a pinned certificate
for i in 0..<SecTrustGetCertificateCount(serverTrust) {
if let cert = SecTrustGetCertificateAtIndex(serverTrust, i) {
for pinnedCert in pinnedCertificates {
if cert == pinnedCert {
completionHandler(.useCredential, URLCredential(trust: serverTrust))
return
}
}
}
}
// If no pinned certificate matched, reject the connection
completionHandler(.cancelAuthenticationChallenge, nil)
}
}
let certData = try Data(contentsOf: URL(fileURLWithPath: "/path/to/server.cer"))
let config = URLSessionConfiguration.default
let session = URLSession(configuration: config, delegate: PinningDelegate(certificates: [certData]), delegateQueue: nil)
let client = Client(
endpoint: "https://rabbitmq.example.com:15671/api",
username: "guest",
password: "guest",
session: session
)Use client certificates for mTLS (mutual peer certificate chain verification):
class ClientCertificateDelegate: NSObject, URLSessionDelegate {
let identity: SecIdentity
let certificate: SecCertificate
init(pkcs12Data: Data, password: String) throws {
// Extract client identity and certificate from a PKCS#12 file
var importResult: CFArray?
let status = SecPKCS12Import(
pkcs12Data as CFData,
[kSecImportExportPassphrase as String: password] as CFDictionary,
&importResult
)
guard status == errSecSuccess,
let result = importResult as? [[String: Any]],
let firstItem = result.first,
let identity = firstItem[kSecImportItemIdentity as String] as? SecIdentity,
let cert = firstItem[kSecImportItemCertificate as String] as? SecCertificate
else {
throw NSError(domain: "ClientCert", code: Int(status))
}
self.identity = identity
self.certificate = cert
}
func urlSession(
_ session: URLSession,
didReceive challenge: URLAuthenticationChallenge,
completionHandler: @escaping (URLSession.AuthChallengeDisposition, URLCredential?) -> Void
) {
// Respond only to client certificate requests; delegate all other challenges
if challenge.protectionSpace.authenticationMethod == NSURLAuthenticationMethodClientCertificate {
let credential = URLCredential(
identity: identity,
certificates: [certificate],
persistence: .forSession
)
completionHandler(.useCredential, credential)
} else {
completionHandler(.performDefaultHandling, nil)
}
}
}
let pkcs12Data = try Data(contentsOf: URL(fileURLWithPath: "/path/to/client.p12"))
let delegate = try ClientCertificateDelegate(pkcs12Data: pkcs12Data, password: "password")
let config = URLSessionConfiguration.default
let session = URLSession(configuration: config, delegate: delegate, delegateQueue: nil)
let client = Client(
endpoint: "https://rabbitmq.example.com:15671/api",
username: "guest",
password: "guest",
session: session
)tls-gen to generate a self-signed CA and certificate chains
for local development.
class SelfSignedDelegate: NSObject, URLSessionDelegate {
func urlSession(
_ session: URLSession,
didReceive challenge: URLAuthenticationChallenge,
completionHandler: @escaping (URLSession.AuthChallengeDisposition, URLCredential?) -> Void
) {
// IMPORTANT: This effectively disables peer certificate chain verification by accepting any and every server certificate
// chain. Use this configuration in development and testing environments, NOT in production.
if challenge.protectionSpace.authenticationMethod == NSURLAuthenticationMethodServerTrust,
let serverTrust = challenge.protectionSpace.serverTrust {
completionHandler(.useCredential, URLCredential(trust: serverTrust))
} else {
completionHandler(.performDefaultHandling, nil)
}
}
}
let session = URLSession(
configuration: .default,
delegate: SelfSignedDelegate(),
delegateQueue: nil
)
let client = Client(
endpoint: "https://localhost:15671/api",
username: "guest",
password: "guest",
session: session
)Copyright (C) 2025-2026 Michael S. Klishin and Contributors
Licensed under the Apache License, Version 2.0. See LICENSE for details.