Java
The KubeMQ SDK for Java enables Java developers to seamlessly communicate with the KubeMQ server, implementing various communication patterns such as Events, EventStore, Commands, Queries, and Queues.
Java Development Kit (JDK) 8 or higher
Maven
KubeMQ server running locally or accessible over the network
The recommended way to use the SDK for Java in your project is to add it as a dependency in Maven:
To build with Gradle, add the dependency to your `build.gradle` file:
The examples are standalone projects that showcase the usage of the SDK. To run the examples, ensure you have a running instance of KubeMQ. Import the project into any IDE of your choice (e.g., IntelliJ, Eclipse, NetBeans). The example project contains three packages demonstrating different implementations:
`io.kubemq.example.cq`: Examples related to Commands and Queries
`io.kubemq.example.pubsub`: Examples related to Events and EventStore
`io.kubemq.example.queues`: Examples related to Queues
Once you check out the code from GitHub, you can build it using Maven:
This command will run the tests and install the JAR file to your local Maven repository. To skip the tests, use the following command:
The SDK implements all communication patterns available through the KubeMQ server:
PubSub
Events
EventStore
Commands & Queries (CQ)
Commands
Queries
Queues
All KubeMQ clients (PubSubClient, QueuesClient, and CQClient) share the same configuration parameters. To create a client instance, use the respective builder with at least two mandatory parameters: `address` (the KubeMQ server address) and `clientId`.
The table below describes all available configuration parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| address | String | The address of the KubeMQ server. | None | Yes |
| clientId | String | The client ID used for authentication. | None | Yes |
| authToken | String | The authorization token for secure communication. | None | No |
| tls | boolean | Indicates if TLS (Transport Layer Security) is enabled. | false | No |
| tlsCertFile | String | The path to the TLS certificate file. | None | No |
| tlsKeyFile | String | The path to the TLS key file. | None | No |
| caCertFile | String | The path to the CA certificate file. | None | No |
| maxReceiveSize | int | The maximum size of the messages to receive (in bytes). | 104857600 (100MB) | No |
| reconnectIntervalSeconds | int | The interval in seconds between reconnection attempts. | 1 | No |
| keepAlive | boolean | Indicates if the connection should be kept alive. | false | No |
| pingIntervalInSeconds | int | The interval in seconds between ping messages. | 60 | No |
| pingTimeoutInSeconds | int | The timeout in seconds for ping messages. | 30 | No |
| logLevel | Level | The logging level to use. | Level.INFO | No |

Here's an example of how to create a client instance (using PubSubClient as an example):
Replace `PubSubClient` with `QueuesClient` or `CQClient` to create instances of other client types. The configuration parameters remain the same for all client types.
For secure connections, set `tls` to `true` and provide the paths to your TLS certificate and key files.
Adjust `maxReceiveSize` based on your expected message sizes to optimize performance.
Fine-tune `reconnectIntervalSeconds`, `keepAlive`, `pingIntervalInSeconds`, and `pingTimeoutInSeconds` based on your network conditions and requirements.
Choose an appropriate `logLevel` for your development or production environment.
Remember to handle any exceptions that might be thrown during client creation, such as connection errors or invalid configuration parameters.
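The mandatory-versus-default split described above can be illustrated with a small builder. This is a minimal sketch, not the SDK's own code: `ClientSettings` is a hypothetical stand-in class, and the validation rules are assumptions. It only shows the idea that `address` and `clientId` must be supplied while the other parameters fall back to the listed defaults.

```java
/** Hypothetical stand-in for client configuration; not a class from the SDK. */
final class ClientSettings {
    final String address;
    final String clientId;
    final int reconnectIntervalSeconds;
    final boolean keepAlive;

    private ClientSettings(Builder b) {
        this.address = b.address;
        this.clientId = b.clientId;
        this.reconnectIntervalSeconds = b.reconnectIntervalSeconds;
        this.keepAlive = b.keepAlive;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String address;
        private String clientId;
        private int reconnectIntervalSeconds = 1; // default listed in the parameter table
        private boolean keepAlive = false;        // default listed in the parameter table

        Builder address(String value) { this.address = value; return this; }
        Builder clientId(String value) { this.clientId = value; return this; }
        Builder reconnectIntervalSeconds(int value) { this.reconnectIntervalSeconds = value; return this; }
        Builder keepAlive(boolean value) { this.keepAlive = value; return this; }

        /** Fails fast when a mandatory parameter is missing (assumed behavior). */
        ClientSettings build() {
            requireText(address, "address");
            requireText(clientId, "clientId");
            return new ClientSettings(this);
        }

        private static void requireText(String value, String name) {
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException(name + " is mandatory");
            }
        }
    }
}
```

With this stand-in, `ClientSettings.builder().address("localhost:50000").clientId("demo-client").build()` succeeds, while omitting either mandatory value throws an `IllegalArgumentException`. The address and client ID shown are placeholder values.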
All KubeMQ clients (PubSubClient, QueuesClient, and CQClient) provide an optional `ping()` method to verify connectivity with the KubeMQ server. This method is not required for normal operations and should be used sparingly.
The `ping()` method returns a `ServerInfo` object with the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| host | String | The host address of the KubeMQ server |
| version | String | The version of the KubeMQ server |
| serverStartTime | long | The start time of the server (in seconds since epoch) |
| serverUpTimeSeconds | long | The uptime of the server in seconds |

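Because `serverStartTime` and `serverUpTimeSeconds` are plain `long` values in seconds, they usually need converting before display. The sketch below uses only `java.time`. `ServerInfoTimes` is a hypothetical helper name, not part of the SDK.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Locale;

/** Hypothetical helper for presenting the two time fields of a ping result. */
final class ServerInfoTimes {
    private ServerInfoTimes() {}

    /** Converts a start time given in seconds since epoch into an Instant. */
    static Instant startInstant(long serverStartTimeSeconds) {
        return Instant.ofEpochSecond(serverStartTimeSeconds);
    }

    /** Formats an uptime in seconds as days, hours, minutes, and seconds. */
    static String formatUptime(long serverUpTimeSeconds) {
        Duration d = Duration.ofSeconds(serverUpTimeSeconds);
        long days = d.toDays();
        long hours = d.toHours() % 24;
        long minutes = d.toMinutes() % 60;
        long seconds = d.getSeconds() % 60;
        return String.format(Locale.ROOT, "%dd %02dh %02dm %02ds", days, hours, minutes, seconds);
    }
}
```

For example, `ServerInfoTimes.formatUptime(93784)` returns `"1d 02h 03m 04s"`.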
The ping operation is optional and should be used judiciously. Here are some appropriate scenarios for using ping:
Initial Connection Verification: You may use ping once after creating the client to verify the initial connection.
Troubleshooting: If you suspect connectivity issues, ping can help diagnose if the problem is with the connection to the KubeMQ server.
Long Periods of Inactivity: In applications with long periods of inactivity, you might use ping to check if the connection is still alive before performing an operation.
Not Required for Regular Operations: The ping operation is not needed for regular message sending or receiving operations. The client handles connection management internally.
Performance Consideration: Excessive use of ping can introduce unnecessary network traffic and potential performance overhead.
Not a Guarantee: A successful ping doesn't guarantee that all server functionalities are working correctly. It only verifies basic connectivity.
Error Handling: Always handle potential IOException when using ping, as network issues can occur.
Remember, the KubeMQ client is designed to handle connection management efficiently. In most cases, you can rely on the client to maintain the connection without explicit ping operations.
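One way to follow the "long periods of inactivity" guidance without pinging on every operation is to track the time of the last activity and ping only after an idle threshold has passed. This is a minimal sketch under that assumption. `IdlePingPolicy` and its threshold are hypothetical and not part of the SDK; the caller would invoke the client's `ping()` only when `shouldPing` returns `true`.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical policy object deciding when a connectivity check is worthwhile. */
final class IdlePingPolicy {
    private final Duration idleThreshold;
    private Instant lastActivity;

    IdlePingPolicy(Duration idleThreshold, Instant now) {
        this.idleThreshold = idleThreshold;
        this.lastActivity = now;
    }

    /** Call after every successful send or receive. */
    void recordActivity(Instant now) {
        this.lastActivity = now;
    }

    /** True only when the connection has been idle for at least the threshold. */
    boolean shouldPing(Instant now) {
        return Duration.between(lastActivity, now).compareTo(idleThreshold) >= 0;
    }
}
```

Passing the current time in explicitly keeps the policy easy to test and avoids hidden calls to the system clock.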
Create a new Events channel.
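Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | Name of the channel you want to create | None | Yes |

Response:

| Name | Type | Description |
| --- | --- | --- |
| isChannelCreated | boolean | Indicates if the channel was created |
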
Delete an existing Events channel.
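Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | Name of the channel you want to delete | None | Yes |

Response:

| Name | Type | Description |
| --- | --- | --- |
| isChannelDeleted | boolean | Indicates if the channel was deleted |
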
Retrieve a list of Events channels.
Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| searchQuery | String | Search query to filter channels (optional) | None | No |

Returns a `List<PubSubChannel>` where each `PubSubChannel` has the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| name | String | The name of the Pub/Sub channel. |
| type | String | The type of the Pub/Sub channel. |
| lastActivity | long | The timestamp of the last activity on the channel, in milliseconds since epoch. |
| isActive | boolean | Indicates whether the channel is active. |
| incoming | PubSubStats | The statistics related to incoming messages for this channel. |
| outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |

Send a message to an Events channel.
`EventMessage` class attributes:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| id | String | Unique identifier for the event message. | None | No |
| channel | String | The channel to which the event message is sent. | None | Yes |
| metadata | String | Metadata associated with the event message. | None | No |
| body | byte[] | Body of the event message in bytes. | Empty byte array | No |
| tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |

This method doesn't return a value. Successful execution implies the message was sent.
Subscribes to receive messages from an Events channel.
`EventsSubscription` class attributes:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channel | String | The channel to subscribe to. | None | Yes |
| group | String | The group to subscribe with. | None | No |
| onReceiveEventCallback | Consumer | Callback function to be called when an event message is received. | None | Yes |
| onErrorCallback | Consumer | Callback function to be called when an error occurs. | None | No |

This method doesn't return a value. It sets up a subscription that will invoke the provided callbacks.
`EventMessageReceived` class details:

| Name | Type | Description |
| --- | --- | --- |
| id | String | The unique identifier of the message. |
| fromClientId | String | The ID of the client that sent the message. |
| timestamp | long | The timestamp when the message was received, in seconds. |
| channel | String | The channel to which the message belongs. |
| metadata | String | The metadata associated with the message. |
| body | byte[] | The body of the message. |
| sequence | long | The sequence number of the message. |
| tags | Map<String, String> | The tags associated with the message. |

Note: Remember to handle the subscription lifecycle appropriately in your application. You may want to store the subscription object to cancel it when it's no longer needed.
Create a new EventsStore channel.
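Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | Name of the channel you want to create | None | Yes |

Response:

| Name | Type | Description |
| --- | --- | --- |
| isChannelCreated | boolean | Indicates if the channel was created |
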
Delete an existing EventsStore channel.
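Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | Name of the channel you want to delete | None | Yes |

Response:

| Name | Type | Description |
| --- | --- | --- |
| isChannelDeleted | boolean | Indicates if the channel was deleted |
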
Retrieve a list of EventsStore channels.
Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| searchQuery | String | Search query to filter channels (optional) | None | No |

Returns a `List<PubSubChannel>` where each `PubSubChannel` has the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| name | String | The name of the Pub/Sub channel. |
| type | String | The type of the Pub/Sub channel. |
| lastActivity | long | The timestamp of the last activity on the channel, in milliseconds since epoch. |
| isActive | boolean | Indicates whether the channel is active. |
| incoming | PubSubStats | The statistics related to incoming messages for this channel. |
| outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |

Send a message to an EventsStore channel.
`EventStoreMessage` class attributes:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| id | String | Unique identifier for the event message. | None | No |
| channel | String | The channel to which the event message is sent. | None | Yes |
| metadata | String | Metadata associated with the event message. | None | No |
| body | byte[] | Body of the event message in bytes. | Empty byte array | No |
| tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |

Note: At least one of `metadata`, `body`, or `tags` is required.
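The "at least one of" rule can be checked before sending. The sketch below is an assumption about how such a check might look, not the SDK's own validation. `EventStorePayloadCheck` is a hypothetical helper, and it treats empty strings, empty arrays, and empty maps as missing.

```java
import java.util.Map;

/** Hypothetical pre-send check for the metadata/body/tags requirement. */
final class EventStorePayloadCheck {
    private EventStorePayloadCheck() {}

    /** Returns true when at least one of the three fields carries content. */
    static boolean hasContent(String metadata, byte[] body, Map<String, String> tags) {
        boolean hasMetadata = metadata != null && !metadata.isEmpty();
        boolean hasBody = body != null && body.length > 0;
        boolean hasTags = tags != null && !tags.isEmpty();
        return hasMetadata || hasBody || hasTags;
    }
}
```

A message built with only the default empty body and empty tag map would fail this check, which is why setting one of the three fields explicitly matters.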
Returns an `EventSendResult` object.
Subscribes to receive messages from an EventsStore channel.
`EventsStoreSubscription` class attributes:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channel | String | The channel to subscribe to. | None | Yes |
| group | String | The group to subscribe with. | None | No |
| onReceiveEventCallback | Consumer | Callback function to be called when an event message is received. | None | Yes |
| onErrorCallback | Consumer | Callback function to be called when an error occurs. | None | No |
| eventsStoreType | EventsStoreType | Type of EventsStore subscription (e.g., StartAtTime, StartAtSequence) | None | Yes |
| eventsStoreStartTime | Instant | Start time for EventsStore subscription (if applicable) | None | Conditional |

`EventsStoreType` options:

| Type | Value | Description |
| --- | --- | --- |
| Undefined | 0 | Default value, should be explicitly set to a valid type before use |
| StartNewOnly | 1 | Start storing events from the point when the subscription is made |
| StartFromFirst | 2 | Start storing events from the first event available |
| StartFromLast | 3 | Start storing events from the last event available |
| StartAtSequence | 4 | Start storing events from a specific sequence number |
| StartAtTime | 5 | Start storing events from a specific point in time |
| StartAtTimeDelta | 6 | Start storing events from a specific time delta in seconds |

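The numeric values and the "Conditional" start parameter can be captured in a small enum. This is a sketch that mirrors the values listed above. `StoreStartPolicy` is a hypothetical name used to avoid suggesting it is the SDK's `EventsStoreType`, and the `needsStartValue` rule is an assumption drawn from the option descriptions.

```java
/** Hypothetical mirror of the listed subscription start options. */
enum StoreStartPolicy {
    UNDEFINED(0),
    START_NEW_ONLY(1),
    START_FROM_FIRST(2),
    START_FROM_LAST(3),
    START_AT_SEQUENCE(4),
    START_AT_TIME(5),
    START_AT_TIME_DELTA(6);

    private final int value;

    StoreStartPolicy(int value) {
        this.value = value;
    }

    int value() {
        return value;
    }

    /** UNDEFINED is only a placeholder and must be replaced before subscribing. */
    boolean isUsable() {
        return this != UNDEFINED;
    }

    /** Options that refer to a sequence, a time, or a delta need an extra start value. */
    boolean needsStartValue() {
        return this == START_AT_SEQUENCE || this == START_AT_TIME || this == START_AT_TIME_DELTA;
    }

    /** Looks up an option by its numeric value. */
    static StoreStartPolicy fromValue(int value) {
        for (StoreStartPolicy p : values()) {
            if (p.value == value) {
                return p;
            }
        }
        throw new IllegalArgumentException("Unknown start option value: " + value);
    }
}
```

A subscription builder could use `needsStartValue()` to reject, for example, a `START_AT_TIME` request that has no start time set.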
This method doesn't return a value. It sets up a subscription that will invoke the provided callbacks.
`EventStoreMessageReceived` class details:

| Name | Type | Description |
| --- | --- | --- |
| id | String | The unique identifier of the message. |
| fromClientId | String | The ID of the client that sent the message. |
| timestamp | long | The timestamp when the message was received, in seconds. |
| channel | String | The channel to which the message belongs. |
| metadata | String | The metadata associated with the message. |
| body | byte[] | The body of the message. |
| sequence | long | The sequence number of the message. |
| tags | Map<String, String> | The tags associated with the message. |

Note: Remember to handle the subscription lifecycle appropriately in your application. You may want to store the subscription object to cancel it when it's no longer needed.
Create a new Command channel.
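Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | Name of the channel you want to create | None | Yes |

Response:

| Name | Type | Description |
| --- | --- | --- |
| isChannelCreated | boolean | Indicates if the channel was created |
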
Delete an existing Command channel.
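Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | Name of the channel you want to delete | None | Yes |

Response:

| Name | Type | Description |
| --- | --- | --- |
| isChannelDeleted | boolean | Indicates if the channel was deleted |
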
Retrieve a list of Command channels.
Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| searchString | String | Search query to filter channels (optional) | None | No |

Returns a `List<CQChannel>` where each `CQChannel` has the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| name | String | The name of the channel. |
| type | String | The type of the channel. |
| lastActivity | long | The timestamp of the last activity on the channel. |
| isActive | boolean | Indicates whether the channel is currently active. |
| incoming | CQStats | Statistics about incoming messages to the channel. |
| outgoing | CQStats | Statistics about outgoing messages from the channel. |

Send a command request to a Command channel.
`CommandMessage` class attributes:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| id | String | The ID of the command message. | None | Yes |
| channel | String | The channel through which the command message will be sent. | None | Yes |
| metadata | String | Additional metadata associated with the command message. | None | No |
| body | byte[] | The body of the command message as bytes. | Empty byte array | No |
| tags | Map<String, String> | Key-value pairs representing tags associated with the command message. | Empty Map | No |
| timeoutInSeconds | int | The maximum time in seconds to wait for a response. | None | Yes |

`CommandResponseMessage` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| commandReceived | CommandMessageReceived | The command message received in the response. |
| clientId | String | The client ID associated with the command response. |
| requestId | String | The unique request ID of the command response. |
| isExecuted | boolean | Indicates if the command has been executed. |
| timestamp | LocalDateTime | The timestamp when the command response was created. |
| error | String | The error message if there was an error. |

Subscribes to receive command messages from a Command channel.
`CommandsSubscription` class attributes:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channel | String | The channel for the subscription. | None | Yes |
| group | String | The group associated with the subscription. | None | No |
| onReceiveCommandCallback | Consumer | Callback function for receiving commands. | None | Yes |
| onErrorCallback | Consumer | Callback function for error handling. | None | No |

This method doesn't return a value. It sets up a subscription that will invoke the provided callbacks.
`CommandMessageReceived` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| id | String | The unique identifier of the command message. |
| fromClientId | String | The ID of the client who sent the command message. |
| timestamp | Instant | The timestamp when the command message was received. |
| channel | String | The channel through which the command message was sent. |
| metadata | String | Additional metadata associated with the command message. |
| body | byte[] | The body of the command message as bytes. |
| replyChannel | String | The channel to which the reply should be sent. |
| tags | Map<String, String> | Key-value pairs representing tags associated with the command message. |

`CommandResponseMessage` class attributes:

When responding to a received command, you should construct a `CommandResponseMessage` with the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| commandReceived | CommandMessageReceived | The command message received in the response. |
| clientId | String | The client ID associated with the command response. |
| requestId | String | The unique request ID of the command response. |
| isExecuted | boolean | Indicates if the command has been executed. |
| timestamp | LocalDateTime | The timestamp when the command response was created. |
| error | String | The error message if there was an error. |

Note: Remember to handle the subscription lifecycle appropriately in your application. You may want to store the subscription object to cancel it when it's no longer needed. Also, ensure that you properly construct and send a `CommandResponseMessage` for each received command to complete the request-response cycle.
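The request-response cycle described in the note can be sketched as a handler that always produces a reply, whether the business logic succeeds or fails. This is a sketch with stand-in types. `CommandReplySketch` and its nested `Reply` class are hypothetical and only model the `requestId`, `isExecuted`, and `error` attributes; sending the reply through the SDK is not shown.

```java
import java.util.function.Consumer;

/** Hypothetical model of turning a received command into a reply. */
final class CommandReplySketch {
    private CommandReplySketch() {}

    /** Stand-in for the reply fields; not the SDK's CommandResponseMessage. */
    static final class Reply {
        final String requestId;
        final boolean executed;
        final String error;

        Reply(String requestId, boolean executed, String error) {
            this.requestId = requestId;
            this.executed = executed;
            this.error = error;
        }
    }

    /**
     * Runs the business logic and always returns a reply, so the sender
     * is not left waiting until its timeout expires.
     */
    static Reply handle(String requestId, byte[] body, Consumer<byte[]> businessLogic) {
        try {
            businessLogic.accept(body);
            return new Reply(requestId, true, "");
        } catch (RuntimeException e) {
            return new Reply(requestId, false, String.valueOf(e.getMessage()));
        }
    }
}
```

Catching the exception inside the handler means a failed command still yields a reply with `executed` set to `false` and the failure reason in `error`.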
Create a new Query channel.
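Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | Name of the channel you want to create | None | Yes |

Response:

| Name | Type | Description |
| --- | --- | --- |
| isChannelCreated | boolean | Indicates if the channel was created |
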
Delete an existing Query channel.
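Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | Name of the channel you want to delete | None | Yes |

Response:

| Name | Type | Description |
| --- | --- | --- |
| isChannelDeleted | boolean | Indicates if the channel was deleted |
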
Retrieve a list of Query channels.
Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| searchString | String | Search query to filter channels (optional) | None | No |

Returns a `List<CQChannel>` where each `CQChannel` has the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| name | String | The name of the channel. |
| type | String | The type of the channel. |
| lastActivity | long | The timestamp of the last activity on the channel. |
| isActive | boolean | Indicates whether the channel is currently active. |
| incoming | CQStats | Statistics about incoming messages to the channel. |
| outgoing | CQStats | Statistics about outgoing messages from the channel. |

Send a query request to a Query channel.
`QueryMessage` class attributes:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| id | String | The ID of the query message. | None | Yes |
| channel | String | The channel through which the query message will be sent. | None | Yes |
| metadata | String | Additional metadata associated with the query message. | None | No |
| body | byte[] | The body of the query message as bytes. | Empty byte array | No |
| tags | Map<String, String> | Key-value pairs representing tags associated with the query message. | Empty Map | No |
| timeoutInSeconds | int | The maximum time in seconds to wait for a response. | None | Yes |

`QueryResponseMessage` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| queryReceived | QueryMessageReceived | The query message received in the response. |
| clientId | String | The client ID associated with the query response. |
| requestId | String | The unique request ID of the query response. |
| executed | boolean | Indicates if the query has been executed. |
| timestamp | LocalDateTime | The timestamp when the query response was created. |
| metadata | String | Additional metadata associated with the response. |
| body | byte[] | The body of the query response as bytes. |
| error | String | The error message if there was an error. |

Subscribes to receive query messages from a Query channel.
`QueriesSubscription` class attributes:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channel | String | The channel for the subscription. | None | Yes |
| group | String | The group associated with the subscription. | None | No |
| onReceiveQueryCallback | Consumer | Callback function for receiving queries. | None | Yes |
| onErrorCallback | Consumer | Callback function for error handling. | None | No |

This method doesn't return a value. It sets up a subscription that will invoke the provided callbacks.
`QueryMessageReceived` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| id | String | The unique identifier of the query message. |
| fromClientId | String | The ID of the client who sent the query message. |
| timestamp | Instant | The timestamp when the query message was received. |
| channel | String | The channel through which the query message was sent. |
| metadata | String | Additional metadata associated with the query message. |
| body | byte[] | The body of the query message as bytes. |
| replyChannel | String | The channel to which the reply should be sent. |
| tags | Map<String, String> | Key-value pairs representing tags associated with the query message. |

`QueryResponseMessage` class attributes:

When responding to a received query, you should construct a `QueryResponseMessage` with the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| queryReceived | QueryMessageReceived | The query message received in the response. |
| clientId | String | The client ID associated with the query response. |
| requestId | String | The unique request ID of the query response. |
| executed | boolean | Indicates if the query has been executed. |
| timestamp | LocalDateTime | The timestamp when the query response was created. |
| metadata | String | Additional metadata associated with the response. |
| body | byte[] | The body of the query response as bytes. |
| error | String | The error message if there was an error. |

Note: Remember to handle the subscription lifecycle appropriately in your application. You may want to store the subscription object to cancel it when it's no longer needed. Also, ensure that you properly construct and send a `QueryResponseMessage` for each received query to complete the request-response cycle.
Create a new Queue channel.
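Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | Name of the channel you want to create | None | Yes |

Response:

| Name | Type | Description |
| --- | --- | --- |
| isChannelCreated | boolean | Indicates if the channel was created |
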
Delete an existing Queue channel.
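Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | Name of the channel you want to delete | None | Yes |

Response:

| Name | Type | Description |
| --- | --- | --- |
| isChannelDeleted | boolean | Indicates if the channel was deleted |
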
Retrieve a list of Queue channels.
Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| searchString | String | Search query to filter channels (optional) | None | No |

Returns a `List<QueuesChannel>` where each `QueuesChannel` has the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| name | String | The name of the queue channel. |
| type | String | The type of the queue channel. |
| lastActivity | long | The timestamp of the last activity in the queue channel. |
| isActive | boolean | Indicates whether the queue channel is currently active. |
| incoming | QueuesStats | The statistics for incoming messages in the queue channel. |
| outgoing | QueuesStats | The statistics for outgoing messages in the queue channel. |

Send a message to a Queue channel.
`QueueMessage` class attributes:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| id | String | The unique identifier for the message. | None | No |
| channel | String | The channel of the message. | None | Yes |
| metadata | String | The metadata associated with the message. | None | No |
| body | byte[] | The body of the message. | new byte[0] | No |
| tags | Map<String, String> | The tags associated with the message. | new HashMap<>() | No |
| delayInSeconds | int | The delay in seconds before the message becomes available in the queue. | None | No |
| expirationInSeconds | int | The expiration time in seconds for the message. | None | No |
| attemptsBeforeDeadLetterQueue | int | The number of receive attempts allowed for the message before it is moved to the dead letter queue. | None | No |
| deadLetterQueue | String | The dead letter queue where the message will be moved after reaching the maximum receive attempts. | None | No |

`QueueSendResult` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| id | String | The unique identifier of the message. |
| sentAt | LocalDateTime | The timestamp when the message was sent. |
| expiredAt | LocalDateTime | The timestamp when the message will expire. |
| delayedTo | LocalDateTime | The timestamp when the message will be delivered. |
| isError | boolean | Indicates if there was an error while sending the message. |
| error | String | The error message if isError is true. |

This method allows you to send a message to a specified Queue channel. You can customize various aspects of the message, such as its content, metadata, tags, delay, expiration, and dead letter queue settings. The response provides information about the sent message, including its ID, timestamps, and any potential errors.
Receive messages from a Queue channel.
`QueuesPollRequest` class attributes:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channel | String | The channel to poll messages from. | None | Yes |
| pollMaxMessages | int | The maximum number of messages to poll. | 1 | No |
| pollWaitTimeoutInSeconds | int | The wait timeout in seconds for polling messages. | 60 | No |
| autoAckMessages | boolean | Indicates if messages should be auto-acknowledged. | false | No |
| visibilitySeconds | int | The visibility timeout for received messages, in seconds. | 0 | No |

`QueuesPollResponse` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| refRequestId | String | The reference ID of the request. |
| transactionId | String | The unique identifier for the transaction. |
| messages | List | The list of received queue messages. |
| error | String | The error message, if any error occurred. |
| isError | boolean | Indicates if there was an error. |
| isTransactionCompleted | boolean | Indicates if the transaction is completed. |
| activeOffsets | List | The list of active offsets. |
| receiverClientId | String | The client ID of the receiver. |
| visibilitySeconds | int | The visibility timeout for the message in seconds. |
| isAutoAcked | boolean | Indicates whether the message was auto-acknowledged. |

`QueueMessageReceived` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| id | String | The unique identifier for the message. |
| channel | String | The channel from which the message was received. |
| metadata | String | Metadata associated with the message. |
| body | byte[] | The body of the message in byte array format. |
| fromClientId | String | The ID of the client that sent the message. |
| tags | Map<String, String> | Key-value pairs representing tags for the message. |
| timestamp | Instant | The timestamp when the message was created. |
| sequence | long | The sequence number of the message. |
| receiveCount | int | The number of times the message has been received. |
| isReRouted | boolean | Indicates whether the message was rerouted. |
| reRouteFromQueue | String | The name of the queue from which the message was rerouted. |
| expiredAt | Instant | The expiration time of the message, if applicable. |
| delayedTo | Instant | The time the message is delayed until, if applicable. |

This method allows you to receive messages from a specified Queue channel. You can configure the polling behavior, including the maximum number of messages to receive and the wait timeout. The response provides detailed information about the received messages and the transaction.
Acknowledge (ack): Mark the message as processed and remove it from the queue.
Reject: Reject the message. It won't be requeued.
Requeue: Send the message back to the queue for later processing.
Choose the appropriate handling option based on your application's logic and requirements.
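One way to choose between the three handling options is to base the decision on whether processing succeeded, whether the failure is worth retrying, and how many times the message has already been received. This is a minimal sketch of such a policy. `MessageDisposition`, its `Action` enum, and the `maxAttempts` limit are hypothetical and not part of the SDK.

```java
/** Hypothetical decision helper for ack, reject, or requeue. */
final class MessageDisposition {
    private MessageDisposition() {}

    enum Action { ACK, REJECT, REQUEUE }

    /**
     * Acknowledges on success, requeues retryable failures while attempts
     * remain, and rejects everything else.
     */
    static Action decide(boolean processedOk, boolean retryable, int receiveCount, int maxAttempts) {
        if (processedOk) {
            return Action.ACK;
        }
        if (retryable && receiveCount < maxAttempts) {
            return Action.REQUEUE;
        }
        return Action.REJECT;
    }
}
```

The `receiveCount` argument corresponds to the attribute of the same name on a received message, so the policy stops requeueing a message that keeps failing.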
This example demonstrates how to use the bulk operations `ackAll`, `rejectAll`, and `requeueAll` on the `QueuesPollResponse` object.
This example showcases the following bulk operations:
ackAll(): Acknowledges all received messages, marking them as processed and removing them from the queue.
requeueAll(String channel): Requeues all received messages back to the specified channel for later processing.
rejectAll(): Rejects all received messages. They won't be requeued.
These bulk operations are particularly useful when you need to apply the same action to all received messages based on certain conditions or business logic. They can significantly simplify your code when dealing with multiple messages at once.
The "Waiting" operation allows you to retrieve information about messages waiting in a queue without removing them from the queue. This can be useful for monitoring queue status or implementing custom processing logic based on waiting messages.
Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | The name of the channel to check for waiting messages. | None | Yes |
| maxNumberOfMessages | int | The maximum number of waiting messages to retrieve. | None | Yes |
| waitTimeSeconds | int | The maximum time to wait for messages, in seconds. | None | Yes |

`QueueMessagesWaiting` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| messages | List | List of waiting messages in the queue. |
| isError | boolean | Indicates if there was an error. |
| error | String | The error message, if any. |

`QueueMessageWaitingPulled` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| id | String | The unique identifier of the message. |
| channel | String | The channel name of the message. |
| metadata | String | Additional metadata associated with the message. |
| body | byte[] | The body content of the message. |
| fromClientId | String | The ID of the client that sent the message. |
| tags | Map<String, String> | Key-value pairs of tags associated with the message. |
| timestamp | Instant | The timestamp when the message was sent. |
| sequence | long | The sequence number of the message in the queue. |
| receiveCount | int | The number of times this message has been received. |
| isReRouted | boolean | Indicates if the message has been re-routed. |
| reRouteFromQueue | String | The original queue name if the message was re-routed. |
| expiredAt | Instant | The timestamp when the message will expire. |
| delayedTo | Instant | The timestamp until which the message is delayed for processing. |
| receiverClientId | String | The ID of the client receiving the message. |

This method allows you to peek at messages waiting in a specified queue channel without removing them. It's particularly useful for:
Monitoring queue depth and content.
Implementing custom logic based on the number or content of waiting messages.
Previewing messages before deciding whether to process them.
The "Pull Messages" operation allows you to retrieve and remove messages from a queue. Unlike the "Waiting" operation, this actually dequeues the messages, making them unavailable for other consumers.
Request parameters:

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| channelName | String | The name of the channel to pull messages from. | None | Yes |
| maxNumberOfMessages | int | The maximum number of messages to pull. | None | Yes |
| waitTimeSeconds | int | The maximum time to wait for messages, in seconds. | None | Yes |

`QueueMessagesPulled` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| messages | List | List of pulled messages from the queue. |
| isError | boolean | Indicates if there was an error. |
| error | String | The error message, if any. |

`QueueMessageWaitingPulled` class attributes:

| Name | Type | Description |
| --- | --- | --- |
| id | String | The unique identifier of the message. |
| channel | String | The channel name of the message. |
| metadata | String | Additional metadata associated with the message. |
| body | byte[] | The body content of the message. |
| fromClientId | String | The ID of the client that sent the message. |
| tags | Map<String, String> | Key-value pairs of tags associated with the message. |
| timestamp | Instant | The timestamp when the message was sent. |
| sequence | long | The sequence number of the message in the queue. |
| receiveCount | int | The number of times this message has been received. |
| isReRouted | boolean | Indicates if the message has been re-routed. |
| reRouteFromQueue | String | The original queue name if the message was re-routed. |
| expiredAt | Instant | The timestamp when the message will expire. |
| delayedTo | Instant | The timestamp until which the message is delayed for processing. |
| receiverClientId | String | The ID of the client receiving the message. |

This example demonstrates how to pull messages from a specified queue channel, process them, and access all the available metadata for each message.
The `pull` operation removes messages from the queue. Once pulled, these messages are no longer available to other consumers.
The `QueueMessagesPulled` object includes an `isError` flag and an `error` string, which should be checked before processing the messages.
The structure of `QueueMessageWaitingPulled` is the same for both "waiting" and "pull" operations, providing consistent access to message metadata.