Go
The KubeMQ SDK for Go enables Go developers to seamlessly communicate with the KubeMQ server, implementing various communication patterns such as Events, EventStore, Commands, Queries, and Queues.
Sources
SDK
Examples
https://github.com/kubemq-io/kubemq-go/tree/master/examples
Prerequisites
Go SDK 1.17 or higher
KubeMQ server running locally or accessible over the network
Installation
go get github.com/kubemq-io/kubemq-go
Running Examples
The examples are standalone projects that showcase the usage of the SDK. To run the examples, ensure you have a running instance of KubeMQ.
SDK Overview
The SDK implements all communication patterns available through the KubeMQ server:
PubSub
Events
EventStore
Commands & Queries (CQ)
Commands
Queries
Queues
PubSub Events Operations
Create Channel
Create a new Events channel.
Request Parameters
Ctx
context
The context for the request.
None
Yes
ChannelName
String
Name of the channel you want to create
None
Yes
Response
Err
error
Error message if any
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createEventsChannel builds an Events client configured for localhost:50000
// and asks it to create the "events-channel" channel. Any failure terminates
// the program through log.Fatal.
func createEventsChannel() {
	const channelName = "events-channel"

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-creator"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	createErr := client.Create(ctx, channelName)
	if createErr != nil {
		log.Fatal(createErr)
	}
}
Delete Channel
Delete an existing Events channel.
Request Parameters
ChannelName
String
Name of the channel you want to delete
None
Yes
Response
Err
error
Error message if any
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteEventsChannel builds an Events client configured for localhost:50000
// and asks it to delete the "events-channel" channel. Any failure terminates
// the program through log.Fatal.
func deleteEventsChannel() {
	const channelName = "events-channel"

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-delete"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	deleteErr := client.Delete(ctx, channelName)
	if deleteErr != nil {
		log.Fatal(deleteErr)
	}
}
List Channels
Retrieve a list of Events channels.
Request Parameters
SearchQuery
String
Search query to filter channels (optional)
None
No
Response
Returns a list where each PubSubChannel
has the following attributes:
Name
String
The name of the Pub/Sub channel.
Type
String
The type of the Pub/Sub channel.
LastActivity
long
The timestamp of the last activity on the channel, represented in milliseconds since epoch.
IsActive
boolean
Indicates whether the channel is active or not.
Incoming
PubSubStats
The statistics related to incoming messages for this channel.
Outgoing
PubSubStats
The statistics related to outgoing messages for this channel.
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listEventsChannels builds an Events client configured for localhost:50000,
// lists channels using an empty search string, and logs each returned entry.
// Any failure terminates the program through log.Fatal.
func listEventsChannels() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-lister"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	found, listErr := client.List(ctx, "")
	if listErr != nil {
		log.Fatal(listErr)
	}
	for _, entry := range found {
		log.Println(entry)
	}
}
Send Event / Subscribe Message
Sends a message to an Events channel.
Send Request: Event
Event
Id
String
Unique identifier for the event message.
None
No
Channel
String
The channel to which the event message is sent.
None
Yes
Metadata
String
Metadata associated with the event message.
None
No
Body
byte[]
Body of the event message in bytes.
Empty byte array
No
Tags
Map<String, String>
Tags associated with the event message as key-value pairs.
Empty Map
No
Send Response
Err
error
Error message if any
Subscribe Request: EventsSubscription
EventsSubscription
Channel
String
The channel to subscribe to.
None
Yes
Group
String
The group to subscribe with.
None
No
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendSubscribeEvents subscribes to the "events-channel" Events channel and
// then publishes a single event to it, logging every event the subscription
// delivers. Setup failures terminate the program through log.Fatal.
func sendSubscribeEvents() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-send-subscribe"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subReq := &kubemq.EventsSubscription{
		Channel:  "events-channel",
		Group:    "",
		ClientId: "",
	}
	err = eventsClient.Subscribe(ctx, subReq, func(msg *kubemq.Event, err error) {
		// The callback receives either an event or an error; msg must not
		// be dereferenced when err is set.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to register before publishing. The
	// other send/subscribe examples use the same one-second pause.
	time.Sleep(1 * time.Second)
	err = eventsClient.Send(ctx, &kubemq.Event{
		Channel:  "events-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event"),
	})
	if err != nil {
		log.Fatal(err)
	}
	// Leave time for the subscriber callback to log the event before the
	// deferred cancel and Close run.
	time.Sleep(1 * time.Second)
}
PubSub EventsStore Operations
Create Channel
Create a new Events Store channel.
Request Parameters
Ctx
context
The context for the request.
None
Yes
ChannelName
String
Name of the channel you want to create
None
Yes
Response
Err
error
Error message if any
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createEventsStoreChannel builds an Events Store client configured for
// localhost:50000 and asks it to create the "events-store-channel" channel.
// Any failure terminates the program through log.Fatal.
func createEventsStoreChannel() {
	const channelName = "events-store-channel"

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-creator"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	createErr := client.Create(ctx, channelName)
	if createErr != nil {
		log.Fatal(createErr)
	}
}
Delete Channel
Delete an existing Events Store channel.
Request Parameters
ChannelName
String
Name of the channel you want to delete
None
Yes
Response
Err
error
Error message if any
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteEventsStoreChannel builds an Events Store client configured for
// localhost:50000 and asks it to delete the "events-store-channel" channel.
// Any failure terminates the program through log.Fatal.
func deleteEventsStoreChannel() {
	const channelName = "events-store-channel"

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-delete"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	deleteErr := client.Delete(ctx, channelName)
	if deleteErr != nil {
		log.Fatal(deleteErr)
	}
}
List Channels
Retrieve a list of Events Store channels.
Request Parameters
SearchQuery
String
Search query to filter channels (optional)
None
No
Response
Returns a list where each PubSubChannel
has the following attributes:
Name
String
The name of the Pub/Sub channel.
Type
String
The type of the Pub/Sub channel.
LastActivity
long
The timestamp of the last activity on the channel, represented in milliseconds since epoch.
IsActive
boolean
Indicates whether the channel is active or not.
Incoming
PubSubStats
The statistics related to incoming messages for this channel.
Outgoing
PubSubStats
The statistics related to outgoing messages for this channel.
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listEventsStoreChannel builds an Events Store client configured for
// localhost:50000, lists channels using an empty search string, and logs each
// returned entry. Any failure terminates the program through log.Fatal.
func listEventsStoreChannel() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-lister"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	found, listErr := client.List(ctx, "")
	if listErr != nil {
		log.Fatal(listErr)
	}
	for _, entry := range found {
		log.Println(entry)
	}
}
Send Event / Subscribe Message
Sends a message to an Events Store channel.
Send Request: Event
Event
Id
String
Unique identifier for the event message.
None
No
Channel
String
The channel to which the event message is sent.
None
Yes
Metadata
String
Metadata associated with the event message.
None
No
Body
byte[]
Body of the event message in bytes.
Empty byte array
No
Tags
Map<String, String>
Tags associated with the event message as key-value pairs.
Empty Map
No
Send Response
Err
error
Error message if any
Subscribe Request: EventsStoreSubscription
EventsStoreSubscription
Channel
String
The channel to subscribe to.
None
Yes
Group
String
The group to subscribe with.
None
No
SubscriptionType
EventsStoreSubscription
The subscription type; one of the EventsStoreType options listed below.
None
Yes
EventsStoreType Options
StartNewOnly
1
Start storing events from the point when the subscription is made
StartFromFirst
2
Start storing events from the first event available
StartFromLast
3
Start storing events from the last event available
StartAtSequence
4
Start storing events from a specific sequence number
StartAtTime
5
Start storing events from a specific point in time
StartAtTimeDelta
6
Start storing events from a specific time delta in seconds
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendSubscribeEventsStore subscribes to the "events-store-channel" Events
// Store channel with the StartFromFirstEvent subscription type, then publishes
// a single event to it and logs the send result. Every event the subscription
// delivers is logged. Setup failures terminate the program through log.Fatal.
func sendSubscribeEventsStore() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-send-subscribe"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subReq := &kubemq.EventsStoreSubscription{
		Channel:          "events-store-channel",
		Group:            "",
		ClientId:         "",
		SubscriptionType: kubemq.StartFromFirstEvent(),
	}
	err = eventsStoreClient.Subscribe(ctx, subReq, func(msg *kubemq.EventStoreReceive, err error) {
		// The callback receives either an event or an error; msg must not
		// be dereferenced when err is set.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to register before publishing.
	time.Sleep(1 * time.Second)
	result, err := eventsStoreClient.Send(ctx, &kubemq.EventStore{
		Channel:  "events-store-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event store"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	// Leave time for the subscriber callback to log the event before the
	// deferred cancel and Close run.
	time.Sleep(1 * time.Second)
}
Commands & Queries – Commands Operations
Create Channel
Create a new Command channel.
Request Parameters
Ctx
context
The context for the request.
None
Yes
ChannelName
String
Name of the channel you want to create
None
Yes
Response
Err
error
Error message if any
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createCommandsChannel builds a Commands client configured for
// localhost:50000 and asks it to create the "commands.A" channel. Any failure
// terminates the program through log.Fatal.
func createCommandsChannel() {
	const channelName = "commands.A"

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	createErr := client.Create(ctx, channelName)
	if createErr != nil {
		log.Fatal(createErr)
	}
}
Delete Channel
Delete an existing Command channel.
Request Parameters
ChannelName
String
Name of the channel you want to delete
None
Yes
Response
Err
error
Error message if any
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteCommandsChannel builds a Commands client configured for
// localhost:50000 and asks it to delete the "commands.A" channel. Any failure
// terminates the program through log.Fatal.
func deleteCommandsChannel() {
	const channelName = "commands.A"

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	deleteErr := client.Delete(ctx, channelName)
	if deleteErr != nil {
		log.Fatal(deleteErr)
	}
}
List Channels
Retrieve a list of Command channels.
Request Parameters
SearchQuery
String
Search query to filter channels (optional)
None
No
Response
Returns a list where each CQChannel
has the following attributes:
Name
String
The name of the CQ channel.
Type
String
The type of the CQ channel.
LastActivity
long
The timestamp of the last activity on the channel, represented in milliseconds since epoch.
IsActive
boolean
Indicates whether the channel is active or not.
Incoming
PubSubStats
The statistics related to incoming messages for this channel.
Outgoing
PubSubStats
The statistics related to outgoing messages for this channel.
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listCommandsChannels builds a Commands client configured for
// localhost:50000, lists channels using an empty search string, and logs each
// returned entry. Any failure terminates the program through log.Fatal.
func listCommandsChannels() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	found, listErr := client.List(ctx, "")
	if listErr != nil {
		log.Fatal(listErr)
	}
	for _, entry := range found {
		log.Println(entry)
	}
}
Send Command / Receive Request
Sends a command request to a Command channel.
Send Request: CommandMessage
CommandMessage
Id
String
The ID of the command message.
None
Yes
Channel
String
The channel through which the command message will be sent.
None
Yes
Metadata
String
Additional metadata associated with the command message.
None
No
Body
byte[]
The body of the command message as bytes.
Empty byte array
No
Tags
Map<String, String>
A dictionary of key-value pairs representing tags associated with the command message.
Empty Map
No
Timeout
Duration
The maximum duration to wait for a response.
None
Yes
Send Response: CommandResponseMessage
CommandResponseMessage
CommandId
String
Command Id
ResponseClientId
String
The client ID associated with the command response.
Executed
boolean
Indicates if the command has been executed.
ExecutedAt
time
The timestamp when the command response was created.
Error
String
The error message if there was an error.
Receive Request: CommandsSubscription
CommandsSubscription
Channel
String
The channel for the subscription.
None
Yes
Group
String
The group associated with the subscription.
None
No
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendReceiveCommands subscribes to the "commands" channel, answering every
// received command with a Response, and then sends one command with a
// 10-second timeout and logs the result. Setup failures, response failures and
// send failures terminate the program through log.Fatal.
func sendReceiveCommands() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveCommands"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subRequest := &kubemq.CommandsSubscription{
		Channel:  "commands",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to commands")
	err = commandsClient.Subscribe(ctx, subRequest, func(cmd *kubemq.CommandReceive, err error) {
		// The callback receives either a command or an error; cmd must not
		// be dereferenced when err is set.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(cmd.String())
		// Route the reply back to the sender using the identifiers carried
		// by the received command.
		resp := &kubemq.Response{
			RequestId:  cmd.Id,
			ResponseTo: cmd.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
		}
		if err := commandsClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to register before sending.
	time.Sleep(1 * time.Second)
	log.Println("sending command")
	result, err := commandsClient.Send(ctx, kubemq.NewCommand().
		SetChannel("commands").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending command")).
		SetTimeout(10*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}
Commands & Queries – Queries Operations
Create Channel
Create a new Query channel.
Request Parameters
Ctx
context
The context for the request.
None
Yes
ChannelName
String
Name of the channel you want to create
None
Yes
Response
Err
error
Error message if any
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createQueriesChannel builds a Queries client configured for localhost:50000
// and asks it to create the "queries.A" channel. Any failure terminates the
// program through log.Fatal.
func createQueriesChannel() {
	const channelName = "queries.A"

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	createErr := client.Create(ctx, channelName)
	if createErr != nil {
		log.Fatal(createErr)
	}
}
Delete Channel
Delete an existing Query channel.
Request Parameters
ChannelName
String
Name of the channel you want to delete
None
Yes
Response
Err
error
Error message if any
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteQueriesChannel builds a Queries client configured for localhost:50000
// and asks it to delete the "queries.A" channel. Any failure terminates the
// program through log.Fatal.
func deleteQueriesChannel() {
	const channelName = "queries.A"

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	deleteErr := client.Delete(ctx, channelName)
	if deleteErr != nil {
		log.Fatal(deleteErr)
	}
}
List Channels
Retrieve a list of Query channels.
Request Parameters
SearchQuery
String
Search query to filter channels (optional)
None
No
Response
Returns a list where each CQChannel
has the following attributes:
Name
String
The name of the CQ channel.
Type
String
The type of the CQ channel.
LastActivity
long
The timestamp of the last activity on the channel, represented in milliseconds since epoch.
IsActive
boolean
Indicates whether the channel is active or not.
Incoming
PubSubStats
The statistics related to incoming messages for this channel.
Outgoing
PubSubStats
The statistics related to outgoing messages for this channel.
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listQueriesChannels builds a Queries client configured for localhost:50000,
// lists channels using an empty search string, and logs each returned entry.
// Any failure terminates the program through log.Fatal.
func listQueriesChannels() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	found, listErr := client.List(ctx, "")
	if listErr != nil {
		log.Fatal(listErr)
	}
	for _, entry := range found {
		log.Println(entry)
	}
}
Send Query / Receive Request
Sends a query request to a Query channel.
Send Request: QueryMessage
QueryMessage
Id
String
The ID of the query message.
None
Yes
Channel
String
The channel through which the query message will be sent.
None
Yes
Metadata
String
Additional metadata associated with the query message.
None
No
Body
byte[]
The body of the query message as bytes.
Empty byte array
No
Tags
Map<String, String>
A dictionary of key-value pairs representing tags associated with the query message.
Empty Map
No
Timeout
Duration
The maximum duration to wait for a response.
None
Yes
Send Response: QueryResponse
QueryResponse
QueryId
String
Query Id
ResponseClientId
String
The client ID associated with the query response.
Executed
boolean
Indicates if the query has been executed.
Metadata
String
Additional metadata associated with the query response message.
Body
byte[]
The body of the query response message as bytes.
Tags
Map<String, String>
A dictionary of key-value pairs representing tags associated with the query response message.
ExecutedAt
time
The timestamp when the query response was created.
Error
String
The error message if there was an error.
Receive Request: QuerySubscription
QuerySubscription
Channel
String
The channel for the subscription.
None
Yes
Group
String
The group associated with the subscription.
None
No
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendReceiveQueries subscribes to the "queries" channel, answering every
// received query with a Response that carries a body, and then sends one query
// with a 10-second timeout and logs the result. Setup failures, response
// failures and send failures terminate the program through log.Fatal.
func sendReceiveQueries() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveQueries"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subRequest := &kubemq.QueriesSubscription{
		Channel:  "queries",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to queries")
	err = queriesClient.Subscribe(ctx, subRequest, func(query *kubemq.QueryReceive, err error) {
		// The callback receives either a query or an error; query must not
		// be dereferenced when err is set.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(query.String())
		// Route the reply back to the sender using the identifiers carried
		// by the received query.
		resp := &kubemq.Response{
			RequestId:  query.Id,
			ResponseTo: query.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
			Body:       []byte("hello kubemq - sending query response"),
		}
		if err := queriesClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to register before sending.
	time.Sleep(1 * time.Second)
	log.Println("sending query")
	result, err := queriesClient.Send(ctx, kubemq.NewQuery().
		SetChannel("queries").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending query")).
		SetTimeout(10*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}
Queues Operations
Create Channel
Create a new Queue channel.
Request Parameters
Ctx
context
The context for the request.
None
Yes
ChannelName
String
Name of the channel you want to create
None
Yes
Response
Err
error
Error message if any
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
// main builds a queues stream client configured for localhost:50000 and asks
// it to create the "queues.A" channel. Any failure terminates the program
// through log.Fatal.
func main() {
	const channelName = "queues.A"

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	createErr := client.Create(ctx, channelName)
	if createErr != nil {
		log.Fatal(createErr)
	}
}
Delete Channel
Delete an existing Queue channel.
Request Parameters
ChannelName
String
Name of the channel you want to delete
None
Yes
Response
Err
error
Error message if any
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
// main builds a queues stream client configured for localhost:50000 and asks
// it to delete the "queues.A" channel. Any failure terminates the program
// through log.Fatal.
func main() {
	const channelName = "queues.A"

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	deleteErr := client.Delete(ctx, channelName)
	if deleteErr != nil {
		log.Fatal(deleteErr)
	}
}
List Channels
Retrieve a list of Queue channels.
Request Parameters
SearchQuery
String
Search query to filter channels (optional)
None
No
Response
Returns a list where each QueuesChannel
has the following attributes:
Name
String
The name of the Queues channel.
Type
String
The type of the Queues channel.
LastActivity
long
The timestamp of the last activity on the channel, represented in milliseconds since epoch.
IsActive
boolean
Indicates whether the channel is active or not.
Incoming
PubSubStats
The statistics related to incoming messages for this channel.
Outgoing
PubSubStats
The statistics related to outgoing messages for this channel.
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
// main builds a queues stream client configured for localhost:50000, lists
// channels using an empty search string, and logs each returned entry. Any
// failure terminates the program through log.Fatal.
func main() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, connErr := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Release the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	found, listErr := client.List(ctx, "")
	if listErr != nil {
		log.Fatal(listErr)
	}
	for _, entry := range found {
		log.Println(entry)
	}
}
Send / Receive Queue Messages
Send and receive messages from a Queue channel.
Send Request: QueueMessage
QueueMessage
Id
String
The unique identifier for the message.
None
No
Channel
String
The channel of the message.
None
Yes
Metadata
String
The metadata associated with the message.
None
No
Body
byte[]
The body of the message.
Empty byte array
No
Tags
Map<String, String>
The tags associated with the message.
Empty Map
No
PolicyDelaySeconds
int
The delay in seconds before the message becomes available in the queue.
None
No
PolicyExpirationSeconds
int
The expiration time in seconds for the message.
None
No
PolicyMaxReceiveCount
int
The number of receive attempts allowed for the message before it is moved to the dead letter queue.
None
No
PolicyMaxReceiveQueue
String
The dead letter queue where the message will be moved after reaching the maximum receive attempts.
None
No
Send Response: SendResult
SendResult
Id
String
The unique identifier of the message.
SentAt
LocalDateTime
The timestamp when the message was sent.
ExpiredAt
LocalDateTime
The timestamp when the message will expire.
DelayedTo
LocalDateTime
The timestamp when the message will be delivered.
IsError
boolean
Indicates if there was an error while sending the message.
Error
String
The error message if IsError is true.
Receive Request: PollRequest
PollRequest
Channel
String
The channel to poll messages from.
None
Yes
MaxItems
int
The maximum number of messages to poll.
1
No
WaitTimeout
int
The wait timeout in seconds for polling messages.
60
No
AutoAck
boolean
Indicates if messages should be auto-acknowledged.
false
No
VisibilitySeconds
int
Add a visibility timeout feature for messages.
0
No
Response: PollResponse
PollResponse
Messages
List
The list of received queue messages.
Response: QueueMessage
Id
String
The unique identifier for the message.
Channel
String
The channel from which the message was received.
Metadata
String
Metadata associated with the message.
Body
byte[]
The body of the message in byte array format.
ClientID
String
The ID of the client that sent the message.
Tags
Map<String, String>
Key-value pairs representing tags for the message.
Timestamp
Instant
The timestamp when the message was created.
Sequence
long
The sequence number of the message.
ReceiveCount
int
The number of times the message has been received.
ReRouted
boolean
Indicates whether the message was rerouted.
ReRoutedFromQueue
String
The name of the queue from which the message was rerouted.
ExpirationAt
Instant
The expiration time of the message, if applicable.
DelayedTo
Instant
The time the message is delayed until, if applicable.
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
"time"
)
// sendAndReceive sends one message, with tags and delay/expiration/max-receive
// policies, to the "sendAndReceive" queue, then polls the same queue for a
// single message and acknowledges whatever is returned. The commented-out
// snippets show the alternative bulk and per-message operations (AckAll,
// NAckAll, ReQueueAll, NAck, ReQueue). Any failure terminates the program
// through log.Fatal.
func sendAndReceive() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceive").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue")).
		SetTags(map[string]string{
			"key1": "value1",
			"key2": "value2",
		}).
		SetPolicyDelaySeconds(1).
		SetPolicyExpirationSeconds(10).
		SetPolicyMaxReceiveCount(3).
		SetPolicyMaxReceiveQueue("dlq")
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	// NOTE(review): the poll request enables AutoAck while the loop below
	// also acks each message explicitly; confirm the SDK accepts an explicit
	// Ack on an auto-acknowledged message.
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceive").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetAutoAck(true)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// The poll result must not be used when the poll itself failed.
	if err != nil {
		log.Fatal(err)
	}
	//AckAll - Acknowledge all messages
	//if err := msgs.AckAll(); err != nil {
	//	log.Fatal(err)
	//}
	//NackAll - Not Acknowledge all messages
	//if err := msgs.NAckAll(); err != nil {
	//	log.Fatal(err)
	//}
	// RequeueAll - Requeue all messages
	//if err := msgs.ReQueueAll("requeue-queue-channel"); err != nil {
	//	log.Fatal(err)
	//}
	for _, msg := range msgs.Messages {
		log.Println(msg.String())
		// Ack - Acknowledge message
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}
		// Nack - Not Acknowledge message
		//if err := msg.NAck(); err != nil {
		//	log.Fatal(err)
		//}
		// Requeue - Requeue message
		//if err := msg.ReQueue("requeue-queue-channel"); err != nil {
		//	log.Fatal(err)
		//}
	}
}
Example with Visibility
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
"time"
)
// sendAndReceiveWithVisibilityExpiration sends one message to the
// "sendAndReceiveWithVisibility" queue, polls it back with a 2-second
// visibility setting, and waits 3 seconds before acknowledging it. Any failure
// terminates the program through log.Fatal.
func sendAndReceiveWithVisibilityExpiration() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// The poll result must not be used when the poll itself failed.
	if err != nil {
		log.Fatal(err)
	}
	for _, msg := range msgs.Messages {
		log.Println(msg.String())
		log.Println("Received message, waiting 3 seconds before ack")
		// The wait is deliberately longer than the 2-second visibility
		// setting. NOTE(review): presumably the Ack below is then expected
		// to fail because visibility has expired — confirm against the SDK.
		time.Sleep(3 * time.Second)
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}
// sendAndReceiveWithVisibilityExtension sends one message to the
// "sendAndReceiveWithVisibility" queue, polls it back with a 2-second
// visibility setting, waits 1 second, extends the visibility by 3 seconds,
// waits 2 more seconds, and then acknowledges the message. Any failure
// terminates the program through log.Fatal.
func sendAndReceiveWithVisibilityExtension() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// The poll result must not be used when the poll itself failed.
	if err != nil {
		log.Fatal(err)
	}
	for _, msg := range msgs.Messages {
		log.Println(msg.String())
		log.Println("Received message, waiting 1 seconds before ack")
		time.Sleep(1 * time.Second)
		log.Println("Extending visibility for 3 seconds and waiting 2 seconds before ack")
		// Extend before the original 2-second setting elapses so that the
		// following 2-second wait stays inside the extended window.
		if err := msg.ExtendVisibility(3); err != nil {
			log.Fatal(err)
		}
		time.Sleep(2 * time.Second)
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}
Last updated
Was this helpful?