BunnySwift is a RabbitMQ client for Swift that primarily follows the API design of Bunny but also borrows from the .NET, Java, Python (Pika) and Rust (amqprs) clients. It is built on Swift NIO and modern Swift concurrency features such as async/await.
If you are looking for a RabbitMQ HTTP API for Swift 6, see michaelklishin/rabbitmq-http-api-client-swift.
This library targets macOS 14+, iOS 17+, tvOS 17+, watchOS 10+, and visionOS 1+. Swift 6.0 or later is required.
Bunny.Swift targets currently supported RabbitMQ release series.
This is a very new project by a long time member of the RabbitMQ Core Team. Breaking public API changes are not out of the question at this stage.
Add BunnySwift to your Package.swift:
dependencies: [
.package(url: "https://github.com/michaelklishin/bunny-swift.git", from: "0.9.0")
]import BunnySwift
// Connect to RabbitMQ
let connection = try await Connection.open()
let channel = try await connection.openChannel()
// Declare a queue and publish a message
let queue = try await channel.queue("hello")
try await queue.publish("Hello, World!")
// Close the channel and connection. Note: this is just an example,
// real world applications should use long-lived connections as much as possible.
try await channel.close()
try await connection.close()// Classic queue (default)
let classic = try await channel.queue("events.classic")
// Quorum queue (replicated, durable)
let quorum = try await channel.queue(
"events.quorum",
type: .quorum,
durable: true
)
// Stream (append-only log)
let stream = try await channel.queue(
"events.stream",
type: .stream,
durable: true
)
// Custom or plugin-provided queue type (forward compatibility)
let custom = try await channel.queue(
"events.custom",
type: .custom("x-my-queue-type"),
durable: true
)Tanzu RabbitMQ provides additional queue types.
// Delayed queue with retry on failed deliveries
let delayed = try await channel.delayedQueue(
"delayed.with.retries",
retryType: .failed,
retryMin: .seconds(1),
retryMax: .seconds(60)
)
// JMS queue with selector support
let jms = try await channel.jmsQueue(
"orders",
selectorFields: ["priority", "region"],
selectorFieldMaxBytes: 256
)
// Consume with a JMS selector expression
let stream = try await jms.consume(
jmsSelector: "priority > 5 AND region = 'EU'"
)// Enable publisher confirms with automatic tracking (similar to the .NET client 7.x)
// Publish methods will wait for pending confirmations to arrive before returning.
try await channel.confirmSelect(tracking: true, outstandingLimit: 128)
// Each publish now waits for confirmation (blocks until confirmed)
try await queue.publish("A message")
// Without automatic tracking (manual mode), use waitForConfirms
// to wait until all pending confirms are received.
try await channel.confirmSelect()
try await queue.publish("Message 1")
try await queue.publish("Message 2")
// Wait for all outstanding confirms
try await channel.waitForConfirms()// Limit unacknowledged messages per channel to 400
try await channel.basicQos(prefetchCount: 400)
// Or apply globally to all consumers on the connection
try await channel.basicQos(prefetchCount: 400, global: true)let exchange = try await channel.topic("events", durable: true)
let queue = try await channel.queue("events.important", durable: true)
// Bind with a routing key pattern
try await queue.bind(to: exchange, routingKey: "events.#")
// Or bind by exchange name
try await queue.bind(to: "events", routingKey: "events.critical.*")
// Headers exchange: route by message headers instead of routing key
let hx = try await channel.headers("dispatch", durable: true)
let urgentQueue = try await channel.queue("dispatch.urgent", durable: true)
try await urgentQueue.bind(to: hx, arguments: [
XArguments.headersMatch: HeadersMatch.all.asFieldValue,
"priority": .string("high"),
"region": .string("EU"),
])// Start consuming with manual acknowledgement mode (default)
let stream = try await queue.consume(acknowledgementMode: .manual)
for try await message in stream {
// Access delivery metadata
let delivery = message.deliveryInfo
print("Consumer tag: \(delivery.consumerTag)")
print("Delivery tag: \(delivery.deliveryTag)")
print("Exchange: \(delivery.exchange)")
print("Routing key: \(delivery.routingKey)")
print("Redelivered: \(delivery.redelivered)")
// Access message properties
let props = message.properties
if let contentType = props.contentType {
print("Content-Type: \(contentType)")
}
if let messageId = props.messageId {
print("Message ID: \(messageId)")
}
if let correlationId = props.correlationId {
print("Correlation ID: \(correlationId)")
}
if let timestamp = props.timestamp {
print("Timestamp: \(timestamp)")
}
if let headers = props.headers {
print("Headers: \(headers)")
}
// Access message body
print("Body: \(message.bodyString ?? "")")
// Process the message, then acknowledge
try await message.ack()
// Or reject/requeue on failure
// try await message.nack(requeue: true)
// Or reject without requeuing
// try await message.reject(requeue: false)
}Automatic connection recovery, inspired by Ruby Bunny and the Java client, is enabled by default, including topology recovery.
When a connection is lost due to a network failure, heartbeat timeout, or server-initiated close, the library will automatically reconnect and recover the topology (exchanges, queues, streams, bindings, consumers).
The recovery procedure is standard for multiple RabbitMQ client libraries.
Recovery behavior can be customised:
let config = ConnectionConfiguration(
// Initial delay before first recovery attempt (default: 5 s)
networkRecoveryInterval: 5.0,
// nil for unlimited attempts (default)
maxRecoveryAttempts: nil,
// Exponential backoff multiplier (default: 2.0)
recoveryBackoffMultiplier: 2.0,
// Maximum delay between attempts (default: 60 s)
maxRecoveryInterval: 60.0
)
let connection = try await Connection.open(config)To be notified after a successful recovery:
connection.onRecovery {
print("Connection recovered")
}By default, all exchanges, queues, bindings, and consumers declared through the connection are redeclared after reconnecting.
To selectively skip certain entities, use a TopologyRecoveryFilter:
connection.setTopologyRecoveryFilter(TopologyRecoveryFilter(
queueFilter: { queue in !queue.autoDelete },
exchangeFilter: { exchange in exchange.durable }
))To disable automatic recovery:
let config = ConnectionConfiguration(automaticRecovery: false)// Unbind using the exchange object
try await queue.unbind(from: exchange, routingKey: "events.#")
// Or by exchange name
try await queue.unbind(from: "events", routingKey: "events.critical.*")// Declare a source and destination exchange
let source = try await channel.topic("events.all", durable: true)
let destination = try await channel.fanout("events.important", durable: true)
// Bind destination exchange to the source (messages flow from source to destination)
try await destination.bind(to: source, routingKey: "events.critical.#")
// Unbind when no longer needed
try await destination.unbind(from: source, routingKey: "events.critical.#")// Delete a queue, returns the number of messages that were in it
let deletedMessageCount = try await queue.delete()
// Only delete if no consumers are active
let count = try await queue.delete(ifUnused: true)
// Only delete if the queue is empty
let count = try await queue.delete(ifEmpty: true)
// Or use the channel directly
let count = try await channel.queueDelete("my.queue", ifUnused: true, ifEmpty: true)// Delete an exchange
try await exchange.delete()
// Only delete if no queues are bound to it
try await exchange.delete(ifUnused: true)
// Or use the channel directly
try await channel.exchangeDelete("my.exchange", ifUnused: true)BunnySwift supports TLS connections. The following example uses tls-gen-generated certificates in the PEM format:
// Mutual TLS with client certificate authentication
let tls = try TLSConfiguration.fromPEMFiles(
certificatePath: "/path/to/tls-gen/basic/result/client_certificate.pem",
keyPath: "/path/to/tls-gen/basic/result/client_key.pem",
caCertificatePath: "/path/to/tls-gen/basic/result/ca_certificate.pem"
)
let config = ConnectionConfiguration(
host: "rabbit.example.com",
// Default TLS port
port: 5671,
tls: tls
)
let connection = try await Connection.open(config)In the case of one-way peer verification (client verifies RabbitMQ certificate chain but does not have its own certificate/key pair):
// TLS with server certificate verification only
let tls = try TLSConfiguration.withCACertificate(
path: "/path/to/tls-gen/basic/result/ca_certificate.pem"
)
let config = ConnectionConfiguration(
host: "rabbit.example.com",
port: 5671,
tls: tls
)
let connection = try await Connection.open(config)Using an AMQPS URI:
// URI-based configuration automatically enables TLS
var config = try ConnectionConfiguration.from(uri: "amqps://rabbitmq.eng.example.com")
// Add custom TLS settings for certificate verification
config.tls = try TLSConfiguration.withCACertificate(
path: "/path/to/ca_certificate.pem"
)
let connection = try await Connection.open(config)The following example enables mutual peer verification and combines it with a URI:
var config = try ConnectionConfiguration.from(uri: "amqps://rabbit.example.com")
config.tls = try TLSConfiguration.fromPEMFiles(
certificatePath: "/path/to/client_certificate.pem",
keyPath: "/path/to/client_key.pem",
caCertificatePath: "/path/to/ca_certificate.pem"
)
let connection = try await Connection.open(config)Advanced TLS configuration:
import NIOSSL
// Fine-grained TLS configuration
var tls = TLSConfiguration(
certificateChain: [],
privateKey: nil,
// Use system trust store
trustRoots: .default,
certificateVerification: .fullVerification,
// Minimum TLS 1.2
minimumTLSVersion: .tlsv12,
// Maximum TLS 1.3
maximumTLSVersion: .tlsv13
)
// Load certificates programmatically
let caCerts = try NIOSSLCertificate.fromPEMFile("/path/to/ca.pem")
tls.trustRoots = .certificates(caCerts)
let config = ConnectionConfiguration(
host: "rabbit.example.com",
port: 5671,
tls: tls
)
let connection = try await Connection.open(config)For development and testing only (not recommended for production):
// Skip peer verification: THIS IS A POOR SECURITY PRACTICE, use only for development and testing
let config = ConnectionConfiguration(
host: "localhost",
port: 5671,
tls: TLSConfiguration.insecure()
)
let connection = try await Connection.open(config)- Connections
- Channels
- Queues
- Quorum Queues
- Streams
- Publishers
- Consumers
- Publisher and Consumer Confirmations
- TLS guide
Please use GitHub Discussions unless you have an executable, repeatable way to reproduce the reported behavior.
This library is dual-licensed under the Apache Software License 2.0 and the MIT license.
SPDX-License-Identifier: Apache-2.0 OR MIT
Copyright (c) 2025-2026 Michael S. Klishin