The KubeMQ SDK for Go enables Go developers to seamlessly communicate with the KubeMQ server, implementing various communication patterns such as Events, EventStore, Commands, Queries, and Queues.
Sources
SDK
Examples
https://github.com/kubemq-io/kubemq-go/tree/master/examples
Prerequisites
KubeMQ server running locally or accessible over the network
Installation
go get github.com/kubemq-io/kubemq-go
Running Examples
The examples are standalone projects that showcase the usage of the SDK. To run the examples, ensure you have a running instance of KubeMQ.
SDK Overview
The SDK implements all communication patterns available through the KubeMQ server:
PubSub Events Operations
Create Channel
Create a new Events channel.
Request Parameters
Response
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createEventsChannel opens an Events client against localhost:50000 and asks
// the server to create the channel "events-channel". Any failure terminates
// the program through log.Fatal.
func createEventsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-creator"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Create(ctx, "events-channel")
	if err != nil {
		log.Fatal(err)
	}
}
Delete Channel
Delete an existing Events channel.
Request Parameters
Response
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteEventsChannel opens an Events client against localhost:50000 and asks
// the server to delete the channel "events-channel". Any failure terminates
// the program through log.Fatal.
func deleteEventsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-delete"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Delete(ctx, "events-channel")
	if err != nil {
		log.Fatal(err)
	}
}
List Channels
Retrieve a list of Events channels.
Request Parameters
Response
Returns a list where each PubSubChannel
has the following attributes:
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listEventsChannels opens an Events client against localhost:50000, requests
// the channel list, and logs each returned entry on its own line.
func listEventsChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-lister"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// NOTE(review): the empty second argument is presumably a search filter
	// that matches every channel — confirm against the SDK reference.
	found, err := client.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range found {
		log.Println(ch)
	}
}
Send Event / Subscribe Message
Sends a message to an Events channel.
Send Request: Event
Send Response
Subscribe Request: EventsSubscription
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendSubscribeEvents subscribes to the "events-channel" Events channel,
// publishes one event to it, and logs every event handed to the subscription
// callback. Setup and send failures terminate the program through log.Fatal.
func sendSubscribeEvents() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-send-subscribe"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subReq := &kubemq.EventsSubscription{
		Channel:  "events-channel",
		Group:    "",
		ClientId: "",
	}
	err = eventsClient.Subscribe(ctx, subReq, func(msg *kubemq.Event, err error) {
		// The callback also carries subscription errors. Log them and bail out
		// rather than dereferencing msg.
		// NOTE(review): msg is presumably nil when err is set — confirm in the SDK.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to become active before publishing. The
	// sibling Events Store example waits one second at this point; the previous
	// 300-second pause stalled the example for five minutes.
	time.Sleep(1 * time.Second)
	err = eventsClient.Send(ctx, &kubemq.Event{
		Channel:  "events-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event"),
	})
	if err != nil {
		log.Fatal(err)
	}
	// Leave time for the callback to run before the deferred Close fires.
	time.Sleep(1 * time.Second)
}
PubSub EventsStore Operations
Create Channel
Create a new Events Store channel.
Request Parameters
Response
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createEventsStoreChannel opens an Events Store client against
// localhost:50000 and asks the server to create the channel
// "events-store-channel". Any failure terminates the program through log.Fatal.
func createEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-creator"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Create(ctx, "events-store-channel")
	if err != nil {
		log.Fatal(err)
	}
}
Delete Channel
Delete an existing Events Store channel.
Request Parameters
Response
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteEventsStoreChannel opens an Events Store client against
// localhost:50000 and asks the server to delete the channel
// "events-store-channel". Any failure terminates the program through log.Fatal.
func deleteEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-delete"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Delete(ctx, "events-store-channel")
	if err != nil {
		log.Fatal(err)
	}
}
List Channels
Retrieve a list of Events Store channels.
Request Parameters
Response
Returns a list where each PubSubChannel
has the following attributes:
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listEventsStoreChannel opens an Events Store client against
// localhost:50000, requests the channel list, and logs each returned entry on
// its own line.
func listEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-lister"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// NOTE(review): the empty second argument is presumably a search filter
	// that matches every channel — confirm against the SDK reference.
	found, err := client.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range found {
		log.Println(ch)
	}
}
Send Event / Subscribe Message
Sends a message to an Events Store channel.
Send Request: Event
Send Response
Subscribe Request: EventsStoreSubscription
EventsStoreType Options
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendSubscribeEventsStore subscribes to the "events-store-channel" Events
// Store channel using the StartFromFirstEvent subscription type, publishes one
// event to it, logs the send result, and logs every event handed to the
// subscription callback. Setup and send failures terminate the program through
// log.Fatal.
func sendSubscribeEventsStore() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-send-subscribe"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subReq := &kubemq.EventsStoreSubscription{
		Channel:          "events-store-channel",
		Group:            "",
		ClientId:         "",
		SubscriptionType: kubemq.StartFromFirstEvent(),
	}
	err = eventsStoreClient.Subscribe(ctx, subReq, func(msg *kubemq.EventStoreReceive, err error) {
		// The callback also carries subscription errors. Log them and bail out
		// rather than dereferencing msg.
		// NOTE(review): msg is presumably nil when err is set — confirm in the SDK.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to become active before publishing.
	time.Sleep(1 * time.Second)
	result, err := eventsStoreClient.Send(ctx, &kubemq.EventStore{
		Channel:  "events-store-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event store"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	// Leave time for the callback to run before the deferred Close fires.
	time.Sleep(1 * time.Second)
}
Commands & Queries – Commands Operations
Create Channel
Create a new Command channel.
Request Parameters
Response
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createCommandsChannel opens a Commands client against localhost:50000 and
// asks the server to create the channel "commands.A". Any failure terminates
// the program through log.Fatal.
func createCommandsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Create(ctx, "commands.A")
	if err != nil {
		log.Fatal(err)
	}
}
Delete Channel
Delete an existing Command channel.
Request Parameters
Response
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteCommandsChannel opens a Commands client against localhost:50000 and
// asks the server to delete the channel "commands.A". Any failure terminates
// the program through log.Fatal.
func deleteCommandsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Delete(ctx, "commands.A")
	if err != nil {
		log.Fatal(err)
	}
}
List Channels
Retrieve a list of Command channels.
Request Parameters
Response
Returns a list where each CQChannel
has the following attributes:
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listCommandsChannels opens a Commands client against localhost:50000,
// requests the channel list, and logs each returned entry on its own line.
func listCommandsChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// NOTE(review): the empty second argument is presumably a search filter
	// that matches every channel — confirm against the SDK reference.
	found, err := client.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range found {
		log.Println(ch)
	}
}
Send Command / Receive Request
Sends a command request to a Command channel.
Send Request: CommandMessage
Send Response: CommandResponseMessage
Receive Request: CommandsSubscription
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendReceiveCommands subscribes to the "commands" channel, answers each
// received command with a Response, then sends one command with a 10-second
// timeout and logs the result. Setup, response and send failures terminate the
// program through log.Fatal.
func sendReceiveCommands() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveCommands"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subRequest := &kubemq.CommandsSubscription{
		Channel:  "commands",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to commands")
	err = commandsClient.Subscribe(ctx, subRequest, func(cmd *kubemq.CommandReceive, err error) {
		// The callback also carries subscription errors. Log them and bail out
		// rather than dereferencing cmd.
		// NOTE(review): cmd is presumably nil when err is set — confirm in the SDK.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(cmd.String())
		// Route the reply back to the sender using the identifiers carried by
		// the received command.
		resp := &kubemq.Response{
			RequestId:  cmd.Id,
			ResponseTo: cmd.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
		}
		if err := commandsClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to become active before sending.
	time.Sleep(1 * time.Second)
	log.Println("sending command")
	result, err := commandsClient.Send(ctx, kubemq.NewCommand().
		SetChannel("commands").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending command")).
		SetTimeout(10*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}
Commands & Queries – Queries Operations
Create Channel
Create a new Query channel.
Request Parameters
Response
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createQueriesChannel opens a Queries client against localhost:50000 and asks
// the server to create the channel "queries.A". Any failure terminates the
// program through log.Fatal.
func createQueriesChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Create(ctx, "queries.A")
	if err != nil {
		log.Fatal(err)
	}
}
Delete Channel
Delete an existing Query channel.
Request Parameters
Response
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteQueriesChannel opens a Queries client against localhost:50000 and asks
// the server to delete the channel "queries.A". Any failure terminates the
// program through log.Fatal.
func deleteQueriesChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Delete(ctx, "queries.A")
	if err != nil {
		log.Fatal(err)
	}
}
List Channels
Retrieve a list of Query channels.
Request Parameters
Response
Returns a list where each CQChannel
has the following attributes:
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listQueriesChannels opens a Queries client against localhost:50000, requests
// the channel list, and logs each returned entry on its own line.
func listQueriesChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// NOTE(review): the empty second argument is presumably a search filter
	// that matches every channel — confirm against the SDK reference.
	found, err := client.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range found {
		log.Println(ch)
	}
}
Send Query / Receive Request
Sends a query request to a Query channel.
Send Request: QueryMessage
Send Response: QueryResponse
Receive Request: QuerySubscription
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendReceiveQueries subscribes to the "queries" channel, answers each
// received query with a Response that carries a body, then sends one query
// with a 10-second timeout and logs the result. Setup, response and send
// failures terminate the program through log.Fatal.
func sendReceiveQueries() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveQueries"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subRequest := &kubemq.QueriesSubscription{
		Channel:  "queries",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to queries")
	err = queriesClient.Subscribe(ctx, subRequest, func(query *kubemq.QueryReceive, err error) {
		// The callback also carries subscription errors. Log them and bail out
		// rather than dereferencing query.
		// NOTE(review): query is presumably nil when err is set — confirm in the SDK.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(query.String())
		// Route the reply back to the sender using the identifiers carried by
		// the received query.
		resp := &kubemq.Response{
			RequestId:  query.Id,
			ResponseTo: query.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
			Body:       []byte("hello kubemq - sending query response"),
		}
		if err := queriesClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to become active before sending.
	time.Sleep(1 * time.Second)
	log.Println("sending query")
	result, err := queriesClient.Send(ctx, kubemq.NewQuery().
		SetChannel("queries").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending query")).
		SetTimeout(10*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}
Queues Operations
Create Channel
Create a new Queue channel.
Request Parameters
Response
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
// main opens a queues stream client against localhost:50000 and asks the
// server to create the Queue channel "queues.A". Any failure terminates the
// program through log.Fatal.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when main returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Create(ctx, "queues.A")
	if err != nil {
		log.Fatal(err)
	}
}
Delete Channel
Delete an existing Queue channel.
Request Parameters
Response
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
// main opens a queues stream client against localhost:50000 and asks the
// server to delete the Queue channel "queues.A". Any failure terminates the
// program through log.Fatal.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when main returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Delete(ctx, "queues.A")
	if err != nil {
		log.Fatal(err)
	}
}
List Channels
Retrieve a list of Queue channels.
Request Parameters
Response
Returns a list where each QueuesChannel
has the following attributes:
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
// main opens a queues stream client against localhost:50000, requests the
// Queue channel list, and logs each returned entry on its own line.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when main returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// NOTE(review): the empty second argument is presumably a search filter
	// that matches every channel — confirm against the SDK reference.
	found, err := client.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range found {
		log.Println(ch)
	}
}
Send / Receive Queue Messages
Send and receive messages from a Queue channel.
Send Request: QueueMessage
Send Response: SendResult
Receive Request: PollRequest
Response: PollResponse
Response: QueueMessage
Example
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
"time"
)
// sendAndReceive sends one message (with tags and delay, expiration and
// max-receive policies) to the "sendAndReceive" queue, polls the same queue
// for a single message, and acknowledges each message it receives. Client,
// send, poll and ack failures terminate the program through log.Fatal.
func sendAndReceive() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceive").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue")).
		SetTags(map[string]string{
			"key1": "value1",
			"key2": "value2",
		}).
		SetPolicyDelaySeconds(1).
		SetPolicyExpirationSeconds(10).
		SetPolicyMaxReceiveCount(3).
		SetPolicyMaxReceiveQueue("dlq")
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	// NOTE(review): the message above carries a 1-second delay policy; confirm
	// that the wait timeout below (unit not visible here) is long enough for
	// the poll to see it.
	// NOTE(review): the request enables auto-ack, yet the loop below also acks
	// each message explicitly — confirm the SDK permits both together.
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceive").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetAutoAck(true)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// The poll result must not be touched when the poll itself failed.
	if err != nil {
		log.Fatal(err)
	}

	// AckAll - Acknowledge all messages
	//if err := msgs.AckAll(); err != nil {
	//	log.Fatal(err)
	//}

	// NackAll - Not Acknowledge all messages
	//if err := msgs.NAckAll(); err != nil {
	//	log.Fatal(err)
	//}

	// RequeueAll - Requeue all messages
	//if err := msgs.ReQueueAll("requeue-queue-channel"); err != nil {
	//	log.Fatal(err)
	//}

	for _, received := range msgs.Messages {
		log.Println(received.String())

		// Ack - Acknowledge message
		if err := received.Ack(); err != nil {
			log.Fatal(err)
		}

		// Nack - Not Acknowledge message
		//if err := received.NAck(); err != nil {
		//	log.Fatal(err)
		//}

		// Requeue - Requeue message
		//if err := received.ReQueue("requeue-queue-channel"); err != nil {
		//	log.Fatal(err)
		//}
	}
}
Example with Visibility
Copy package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
"time"
)
// sendAndReceiveWithVisibilityExpiration sends one message to the
// "sendAndReceiveWithVisibility" queue, polls it with a 2-second visibility
// setting, and then waits 3 seconds before acknowledging, so the ack is
// attempted after the configured visibility period has passed.
// NOTE(review): presumably that late Ack returns an error, which this example
// then reports through log.Fatal — confirm against the SDK.
func sendAndReceiveWithVisibilityExpiration() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// The poll result must not be touched when the poll itself failed.
	if err != nil {
		log.Fatal(err)
	}
	for _, received := range msgs.Messages {
		log.Println(received.String())
		log.Println("Received message, waiting 3 seconds before ack")
		// Deliberately sleep longer than the 2-second visibility setting.
		time.Sleep(3 * time.Second)
		if err := received.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}
// sendAndReceiveWithVisibilityExtension sends one message to the
// "sendAndReceiveWithVisibility" queue, polls it with a 2-second visibility
// setting, waits 1 second, extends the visibility by 3 seconds, waits 2 more
// seconds, and then acknowledges the message. Client, send, poll, extend and
// ack failures terminate the program through log.Fatal.
func sendAndReceiveWithVisibilityExtension() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// The poll result must not be touched when the poll itself failed.
	if err != nil {
		log.Fatal(err)
	}
	for _, received := range msgs.Messages {
		log.Println(received.String())
		log.Println("Received message, waiting 1 seconds before ack")
		time.Sleep(1 * time.Second)
		log.Println("Extending visibility for 3 seconds and waiting 2 seconds before ack")
		// Extend before the 2-second visibility setting runs out, so the ack
		// after the second sleep falls inside the extended period.
		if err := received.ExtendVisibility(3); err != nil {
			log.Fatal(err)
		}
		time.Sleep(2 * time.Second)
		if err := received.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}