Go

Last updated 6 months ago

The KubeMQ SDK for Go lets Go applications communicate with a KubeMQ server using the Events, Events Store, Commands, Queries, and Queues messaging patterns.

Sources

SDK

Examples

Prerequisites

  • Go 1.17 or higher

  • KubeMQ server running locally or accessible over the network

Installation

go get github.com/kubemq-io/kubemq-go

Running Examples

SDK Overview

The SDK implements all communication patterns available through the KubeMQ server:

  • PubSub

    • Events

    • EventStore

  • Commands & Queries (CQ)

    • Commands

    • Queries

  • Queues

PubSub Events Operations

Create Channel

Create a new Events channel.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Ctx | context | The context for the request. | None | Yes |
| ChannelName | String | Name of the channel you want to create | None | Yes |

Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func createEventsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-creator"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := eventsClient.Create(ctx, "events-channel"); err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Events channel.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| ChannelName | String | Name of the channel you want to delete | None | Yes |

Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func deleteEventsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-delete"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := eventsClient.Delete(ctx, "events-channel"); err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Events channels.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| SearchQuery | String | Search query to filter channels (optional) | None | No |

Response

Returns a list where each PubSubChannel has the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| Name | String | The name of the Pub/Sub channel. |
| Type | String | The type of the Pub/Sub channel. |
| LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
| IsActive | boolean | Indicates whether the channel is active or not. |
| Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
| Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
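
Because LastActivity is reported in milliseconds since the epoch, it usually has to be converted before it can be logged or compared. The following is a minimal sketch of that conversion, assuming the value is available as an int64. The channelInfo struct and both helper functions are hypothetical stand-ins for the attributes listed above, not the SDK's own types.

```go
package main

import (
	"fmt"
	"time"
)

// channelInfo is a hypothetical stand-in for the listed channel attributes;
// it is not the SDK's own type.
type channelInfo struct {
	Name         string
	LastActivity int64 // milliseconds since the Unix epoch (assumed int64)
	IsActive     bool
}

// lastActivityTime converts the millisecond timestamp into a UTC time.Time.
// time.UnixMilli is available from Go 1.17.
func lastActivityTime(c channelInfo) time.Time {
	return time.UnixMilli(c.LastActivity).UTC()
}

// idleFor reports how long the channel has been idle relative to now.
func idleFor(c channelInfo, now time.Time) time.Duration {
	return now.Sub(lastActivityTime(c))
}

func main() {
	c := channelInfo{Name: "events-channel", LastActivity: 1700000000000, IsActive: true}
	now := time.UnixMilli(1700000060000)
	fmt.Println(c.Name, "last active at", lastActivityTime(c).Format(time.RFC3339))
	fmt.Println(c.Name, "idle for", idleFor(c, now))
}
```

Passing the reference time in as an argument, rather than calling time.Now inside the helper, keeps the calculation easy to test.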

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func listEventsChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-lister"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	channels, err := eventsClient.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, channel := range channels {
		log.Println(channel)
	}
}

Send Event / Subscribe Message

Send a message to an Events channel, or subscribe to receive the messages sent to it.

Send Request: Event

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Id | String | Unique identifier for the event message. | None | No |
| Channel | String | The channel to which the event message is sent. | None | Yes |
| Metadata | String | Metadata associated with the event message. | None | No |
| Body | byte[] | Body of the event message in bytes. | Empty byte array | No |
| Tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |

Send Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Subscribe Request: EventsSubscription

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Channel | String | The channel to subscribe to. | None | Yes |
| Group | String | The group to subscribe with. | None | No |
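
The effect of the Group field is easier to see with a toy model. The sketch below is not SDK code: it assumes that subscribers sharing a group name act as a load-balanced set in which each event reaches exactly one member, while subscribers with an empty group each receive every event. The broker type and the round-robin choice of member are illustrative assumptions only.

```go
package main

import "fmt"

// broker is a toy in-memory model of group delivery; it is not SDK code.
type broker struct {
	solo    []string            // subscribers that joined with an empty group
	order   []string            // group names in the order they first appeared
	members map[string][]string // group name -> subscriber ids
	cursor  map[string]int      // group name -> next member to pick
}

func newBroker() *broker {
	return &broker{members: map[string][]string{}, cursor: map[string]int{}}
}

// subscribe registers a subscriber id under a group ("" means no group).
func (b *broker) subscribe(id, group string) {
	if group == "" {
		b.solo = append(b.solo, id)
		return
	}
	if _, seen := b.members[group]; !seen {
		b.order = append(b.order, group)
	}
	b.members[group] = append(b.members[group], id)
}

// deliver returns the ids that receive one published event: every ungrouped
// subscriber, plus exactly one member of each group (round-robin in this model).
func (b *broker) deliver() []string {
	receivers := append([]string(nil), b.solo...)
	for _, g := range b.order {
		m := b.members[g]
		receivers = append(receivers, m[b.cursor[g]%len(m)])
		b.cursor[g]++
	}
	return receivers
}

func main() {
	b := newBroker()
	b.subscribe("audit", "")
	b.subscribe("worker-1", "workers")
	b.subscribe("worker-2", "workers")
	for i := 0; i < 3; i++ {
		fmt.Println(b.deliver())
	}
}
```

In this model the ungrouped subscriber sees all three events, while the two workers split them between themselves.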

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

func sendSubscribeEvents() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-send-subscribe"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subReq := &kubemq.EventsSubscription{
		Channel:  "events-channel",
		Group:    "",
		ClientId: "",
	}
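	err = eventsClient.Subscribe(ctx, subReq, func(msg *kubemq.Event, err error) {
		// msg is not usable when the subscription reports an error.
		if err != nil {
			log.Println("subscription error:", err)
			return
		}
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to become active before sending.
	time.Sleep(1 * time.Second)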

	err = eventsClient.Send(ctx, &kubemq.Event{
		Channel:  "events-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event"),
	})

	if err != nil {
		log.Fatal(err)
	}
	time.Sleep(1 * time.Second)
}

PubSub EventsStore Operations

Create Channel

Create a new Events Store channel.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Ctx | context | The context for the request. | None | Yes |
| ChannelName | String | Name of the channel you want to create | None | Yes |

Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func createEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-creator"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := eventsStoreClient.Create(ctx, "events-store-channel"); err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Events Store channel.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| ChannelName | String | Name of the channel you want to delete | None | Yes |

Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func deleteEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-delete"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := eventsStoreClient.Delete(ctx, "events-store-channel"); err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Events Store channels.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| SearchQuery | String | Search query to filter channels (optional) | None | No |

Response

Returns a list where each PubSubChannel has the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| Name | String | The name of the Pub/Sub channel. |
| Type | String | The type of the Pub/Sub channel. |
| LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
| IsActive | boolean | Indicates whether the channel is active or not. |
| Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
| Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func listEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-lister"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	channels, err := eventsStoreClient.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, channel := range channels {
		log.Println(channel)
	}
}

Send Event / Subscribe Message

Send a message to an Events Store channel, or subscribe to receive the messages stored in it.

Send Request: Event

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Id | String | Unique identifier for the event message. | None | No |
| Channel | String | The channel to which the event message is sent. | None | Yes |
| Metadata | String | Metadata associated with the event message. | None | No |
| Body | byte[] | Body of the event message in bytes. | Empty byte array | No |
| Tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |

Send Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Subscribe Request: EventsStoreSubscription

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Channel | String | The channel to subscribe to. | None | Yes |
| Group | String | The group to subscribe with. | None | No |
| SubscriptionType | EventsStoreSubscription | The point in the stored stream from which delivery starts (see EventsStoreType Options). | None | Yes |

EventsStoreType Options

| Type | Value | Description |
| --- | --- | --- |
| StartNewOnly | 1 | Receive only events published after the subscription is made |
| StartFromFirst | 2 | Receive events starting from the first event available |
| StartFromLast | 3 | Receive events starting from the last event available |
| StartAtSequence | 4 | Receive events starting from a specific sequence number |
| StartAtTime | 5 | Receive events starting from a specific point in time |
| StartAtTimeDelta | 6 | Receive events starting from a specific time delta in seconds |
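
The start options can be pictured as filters over the events already held in the store. The sketch below is a self-contained model rather than SDK code. The startOption and storedEvent types and the replay function are hypothetical, timestamps are plain Unix seconds, and the time delta is assumed to count back from the present.

```go
package main

import "fmt"

type startKind int

// The values mirror the EventsStoreType option table.
const (
	startNewOnly startKind = iota + 1
	startFromFirst
	startFromLast
	startAtSequence
	startAtTime
	startAtTimeDelta
)

// startOption is a hypothetical description of where a subscription begins.
type startOption struct {
	kind         startKind
	sequence     uint64 // used by startAtSequence
	atUnix       int64  // used by startAtTime (seconds since epoch)
	deltaSeconds int64  // used by startAtTimeDelta
}

type storedEvent struct {
	sequence uint64
	atUnix   int64
}

// replay returns the already-stored events a new subscriber would receive
// before any newly published ones, under the assumptions stated above.
func replay(stored []storedEvent, opt startOption, nowUnix int64) []storedEvent {
	filter := func(keep func(storedEvent) bool) []storedEvent {
		var out []storedEvent
		for _, e := range stored {
			if keep(e) {
				out = append(out, e)
			}
		}
		return out
	}
	switch opt.kind {
	case startFromFirst:
		return filter(func(storedEvent) bool { return true })
	case startFromLast:
		if len(stored) == 0 {
			return nil
		}
		return stored[len(stored)-1:]
	case startAtSequence:
		return filter(func(e storedEvent) bool { return e.sequence >= opt.sequence })
	case startAtTime:
		return filter(func(e storedEvent) bool { return e.atUnix >= opt.atUnix })
	case startAtTimeDelta:
		cutoff := nowUnix - opt.deltaSeconds
		return filter(func(e storedEvent) bool { return e.atUnix >= cutoff })
	default: // startNewOnly: nothing stored is replayed
		return nil
	}
}

func main() {
	stored := []storedEvent{{1, 100}, {2, 200}, {3, 300}, {4, 400}}
	fmt.Println(len(replay(stored, startOption{kind: startFromFirst}, 450)))
	fmt.Println(len(replay(stored, startOption{kind: startAtSequence, sequence: 3}, 450)))
	fmt.Println(len(replay(stored, startOption{kind: startAtTimeDelta, deltaSeconds: 100}, 450)))
}
```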

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

func sendSubscribeEventsStore() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-send-subscribe"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subReq := &kubemq.EventsStoreSubscription{
		Channel:          "events-store-channel",
		Group:            "",
		ClientId:         "",
		SubscriptionType: kubemq.StartFromFirstEvent(),
	}
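	err = eventsStoreClient.Subscribe(ctx, subReq, func(msg *kubemq.EventStoreReceive, err error) {
		// msg is not usable when the subscription reports an error.
		if err != nil {
			log.Println("subscription error:", err)
			return
		}
		log.Println(msg.String())
	})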
	if err != nil {
		log.Fatal(err)
	}
	time.Sleep(1 * time.Second)

	result, err := eventsStoreClient.Send(ctx, &kubemq.EventStore{
		Channel:  "events-store-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event store"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	time.Sleep(1 * time.Second)

}

Commands & Queries – Commands Operations

Create Channel

Create a new Command channel.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Ctx | context | The context for the request. | None | Yes |
| ChannelName | String | Name of the channel you want to create | None | Yes |

Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func createCommandsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := commandsClient.Create(ctx, "commands.A"); err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Command channel.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| ChannelName | String | Name of the channel you want to delete | None | Yes |

Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func deleteCommandsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := commandsClient.Delete(ctx, "commands.A"); err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Command channels.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| SearchQuery | String | Search query to filter channels (optional) | None | No |

Response

Returns a list where each CQChannel has the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| Name | String | The name of the channel. |
| Type | String | The type of the channel. |
| LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
| IsActive | boolean | Indicates whether the channel is active or not. |
| Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
| Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func listCommandsChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	channels, err := commandsClient.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, channel := range channels {
		log.Println(channel)
	}
}

Send Command / Receive Request

Send a command request to a Command channel, or subscribe to the channel to receive commands and respond to them.

Send Request: CommandMessage

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Id | String | The ID of the command message. | None | Yes |
| Channel | String | The channel through which the command message will be sent. | None | Yes |
| Metadata | String | Additional metadata associated with the command message. | None | No |
| Body | byte[] | The body of the command message as bytes. | Empty byte array | No |
| Tags | Map<String, String> | A dictionary of key-value pairs representing tags associated with the command message. | Empty Map | No |
| Timeout | Duration | The maximum time to wait for a response. | None | Yes |
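
The mandatory Timeout bounds how long the sender waits for a responder. The sketch below models that request/response round trip with the standard library only. The request and tryRequest functions and the errNoResponse value are hypothetical names for this illustration and do not come from the SDK.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNoResponse is a hypothetical error for this sketch.
var errNoResponse = errors.New("no response before the deadline")

// request hands payload to handler and waits up to timeout for its reply.
func request(ctx context.Context, payload string, timeout time.Duration, handler func(string) string) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	reply := make(chan string, 1) // buffered so a late handler never blocks
	go func() { reply <- handler(payload) }()
	select {
	case r := <-reply:
		return r, nil
	case <-ctx.Done(): // timeout elapsed or parent context cancelled
		return "", errNoResponse
	}
}

// tryRequest runs one round trip against a responder that needs
// handlerDelayMs milliseconds to answer.
func tryRequest(handlerDelayMs, timeoutMs int) error {
	handler := func(p string) string {
		time.Sleep(time.Duration(handlerDelayMs) * time.Millisecond)
		return "done: " + p
	}
	_, err := request(context.Background(), "restart", time.Duration(timeoutMs)*time.Millisecond, handler)
	return err
}

func main() {
	fmt.Println("fast responder:", tryRequest(0, 500))
	fmt.Println("slow responder:", tryRequest(300, 20))
}
```

A timeout that is too short turns a slow but healthy responder into an error on the sending side, so it is worth sizing it against the responder's expected processing time.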

Send Response: CommandResponseMessage

| Name | Type | Description |
| --- | --- | --- |
| CommandId | String | Command Id |
| ResponseClientId | String | The client ID associated with the command response. |
| Executed | boolean | Indicates if the command has been executed. |
| ExecutedAt | time | The timestamp when the command response was created. |
| Error | String | The error message if there was an error. |

Receive Request: CommandsSubscription

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Channel | String | The channel for the subscription. | None | Yes |
| Group | String | The group associated with the subscription. | None | No |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

func sendReceiveCommands() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveCommands"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subRequest := &kubemq.CommandsSubscription{
		Channel:  "commands",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to commands")
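	err = commandsClient.Subscribe(ctx, subRequest, func(cmd *kubemq.CommandReceive, err error) {
		// cmd is not usable when the subscription reports an error.
		if err != nil {
			log.Println("subscription error:", err)
			return
		}
		log.Println(cmd.String())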
		resp := &kubemq.Response{
			RequestId:  cmd.Id,
			ResponseTo: cmd.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
		}
		if err := commandsClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	time.Sleep(1 * time.Second)
	log.Println("sending command")
	result, err := commandsClient.Send(ctx, kubemq.NewCommand().
		SetChannel("commands").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending command")).
		SetTimeout(10*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}

Commands & Queries – Queries Operations

Create Channel

Create a new Query channel.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Ctx | context | The context for the request. | None | Yes |
| ChannelName | String | Name of the channel you want to create | None | Yes |

Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func createQueriesChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := queriesClient.Create(ctx, "queries.A"); err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Query channel.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| ChannelName | String | Name of the channel you want to delete | None | Yes |

Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func deleteQueriesChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := queriesClient.Delete(ctx, "queries.A"); err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Query channels.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| SearchQuery | String | Search query to filter channels (optional) | None | No |

Response

Returns a list where each CQChannel has the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| Name | String | The name of the channel. |
| Type | String | The type of the channel. |
| LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
| IsActive | boolean | Indicates whether the channel is active or not. |
| Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
| Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func listQueriesChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	channels, err := queriesClient.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, channel := range channels {
		log.Println(channel)
	}
}

Send Query / Receive Request

Send a query request to a Query channel, or subscribe to the channel to receive queries and respond to them.

Send Request: QueryMessage

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Id | String | The ID of the query message. | None | Yes |
| Channel | String | The channel through which the query message will be sent. | None | Yes |
| Metadata | String | Additional metadata associated with the query message. | None | No |
| Body | byte[] | The body of the query message as bytes. | Empty byte array | No |
| Tags | Map<String, String> | A dictionary of key-value pairs representing tags associated with the query message. | Empty Map | No |
| Timeout | Duration | The maximum time to wait for a response. | None | Yes |

Send Response: QueryResponse

| Name | Type | Description |
| --- | --- | --- |
| QueryId | String | Query Id |
| ResponseClientId | String | The client ID associated with the query response. |
| Executed | boolean | Indicates if the query has been executed. |
| Metadata | String | Additional metadata associated with the query response message. |
| Body | byte[] | The body of the query response message as bytes. |
| Tags | Map<String, String> | A dictionary of key-value pairs representing tags associated with the query response message. |
| ExecutedAt | time | The timestamp when the query response was created. |
| Error | String | The error message if there was an error. |
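
A successful Send call only means the round trip completed. The Executed and Error fields still say whether the responder actually handled the query. The sketch below shows one way to fold those fields into an ordinary Go error. The queryResponse struct is a hypothetical mirror of the fields listed above, and bodyOf is an illustrative helper, not an SDK function.

```go
package main

import (
	"errors"
	"fmt"
)

// queryResponse is a hypothetical mirror of the documented response fields.
type queryResponse struct {
	QueryId  string
	Executed bool
	Body     []byte
	Error    string
}

// bodyOf returns the response body, or an error when the responder reported
// a failure or did not execute the query.
func bodyOf(r queryResponse) ([]byte, error) {
	if r.Error != "" {
		return nil, errors.New(r.Error)
	}
	if !r.Executed {
		return nil, fmt.Errorf("query %q was not executed", r.QueryId)
	}
	return r.Body, nil
}

func main() {
	ok := queryResponse{QueryId: "q-1", Executed: true, Body: []byte("42")}
	failed := queryResponse{QueryId: "q-2", Error: "responder unavailable"}
	for _, r := range []queryResponse{ok, failed} {
		body, err := bodyOf(r)
		fmt.Printf("%s: body=%q err=%v\n", r.QueryId, body, err)
	}
}
```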

Receive Request: QuerySubscription

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Channel | String | The channel for the subscription. | None | Yes |
| Group | String | The group associated with the subscription. | None | No |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

func sendReceiveQueries() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveQueries"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subRequest := &kubemq.QueriesSubscription{
		Channel:  "queries",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to queries")
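	err = queriesClient.Subscribe(ctx, subRequest, func(query *kubemq.QueryReceive, err error) {
		// query is not usable when the subscription reports an error.
		if err != nil {
			log.Println("subscription error:", err)
			return
		}
		log.Println(query.String())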
		resp := &kubemq.Response{
			RequestId:  query.Id,
			ResponseTo: query.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
			Body:       []byte("hello kubemq - sending query response"),
		}
		if err := queriesClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	time.Sleep(1 * time.Second)
	log.Println("sending query")
	result, err := queriesClient.Send(ctx, kubemq.NewQuery().
		SetChannel("queries").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending query")).
		SetTimeout(10*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}

Queues Operations

Create Channel

Create a new Queue channel.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Ctx | context | The context for the request. | None | Yes |
| ChannelName | String | Name of the channel you want to create | None | Yes |

Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := queuesClient.Create(ctx, "queues.A"); err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Queue channel.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| ChannelName | String | Name of the channel you want to delete | None | Yes |

Response

| Name | Type | Description |
| --- | --- | --- |
| Err | error | Error message if any |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := queuesClient.Delete(ctx, "queues.A"); err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Queue channels.

Request Parameters

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| SearchQuery | String | Search query to filter channels (optional) | None | No |

Response

Returns a list where each QueuesChannel has the following attributes:

| Name | Type | Description |
| --- | --- | --- |
| Name | String | The name of the channel. |
| Type | String | The type of the channel. |
| LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
| IsActive | boolean | Indicates whether the channel is active or not. |
| Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
| Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	channels, err := queuesClient.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, channel := range channels {
		log.Println(channel)
	}
}

Send / Receive Queue Messages

Send and receive messages from a Queue channel.

Send Request: QueueMessage

| Name | Type | Description | Default Value | Mandatory |
| --- | --- | --- | --- | --- |
| Id | String | The unique identifier for the message. | None | No |
| Channel | String | The channel of the message. | None | Yes |
| Metadata | String | The metadata associated with the message. | None | No |
| Body | byte[] | The body of the message. | Empty byte array | No |
| Tags | Map<String, String> | The tags associated with the message. | Empty Map | No |
| PolicyDelaySeconds | int | The delay in seconds before the message becomes available in the queue. | None | No |
| PolicyExpirationSeconds | int | The expiration time in seconds for the message. | None | No |
| PolicyMaxReceiveCount | int | The number of receive attempts allowed for the message before it is moved to the dead letter queue. | None | No |
| PolicyMaxReceiveQueue | String | The dead letter queue where the message will be moved after reaching the maximum receive attempts. | None | No |
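
PolicyMaxReceiveCount and PolicyMaxReceiveQueue work as a pair: one sets the limit, the other names the destination. The sketch below models that decision in isolation. The receivePolicy struct and nextQueue function are hypothetical, and the exact threshold (moving the message once the receive count reaches the limit) is an assumption made for the illustration.

```go
package main

import "fmt"

// receivePolicy is a hypothetical mirror of the two dead-letter policy fields.
type receivePolicy struct {
	MaxReceiveCount int
	MaxReceiveQueue string
}

// nextQueue returns the queue a message belongs to after it has been received
// receiveCount times without being acknowledged. Without a complete policy
// the message stays where it is.
func nextQueue(current string, receiveCount int, p receivePolicy) string {
	if p.MaxReceiveCount > 0 && p.MaxReceiveQueue != "" && receiveCount >= p.MaxReceiveCount {
		return p.MaxReceiveQueue
	}
	return current
}

func main() {
	p := receivePolicy{MaxReceiveCount: 3, MaxReceiveQueue: "orders.dead-letter"}
	for count := 1; count <= 3; count++ {
		fmt.Println(count, nextQueue("orders", count, p))
	}
}
```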

Send Response: SendResult

| Name | Type | Description |
| --- | --- | --- |
| Id | String | The unique identifier of the message. |
| SentAt | LocalDateTime | The timestamp when the message was sent. |
| ExpiredAt | LocalDateTime | The timestamp when the message will expire. |
| DelayedTo | LocalDateTime | The timestamp when the message will be delivered. |
| IsError | boolean | Indicates if there was an error while sending the message. |
| Error | String | The error message if IsError is true. |

Receive Request: PollRequest

| Name | Type | Description | Default Value | Mandatory |
|---|---|---|---|---|
| Channel | String | The channel to poll messages from. | None | Yes |
| MaxItems | int | The maximum number of messages to poll. | 1 | No |
| WaitTimeout | int | The wait timeout in seconds for polling messages. | 60 | No |
| AutoAck | boolean | Indicates if messages should be auto-acknowledged. | false | No |
| VisibilitySeconds | int | The visibility timeout in seconds for received messages. | 0 | No |
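
The defaults and the mandatory column above can be expressed as a small pre-flight check. This is a sketch only: `pollOptions`, `withDefaults`, and `validate` are hypothetical names that mirror the table and are not part of the SDK, and treating a zero value as "unset" is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// pollOptions is a hypothetical struct mirroring the PollRequest table above.
// It is not the SDK's PollRequest type.
type pollOptions struct {
	Channel           string
	MaxItems          int
	WaitTimeout       int // seconds
	AutoAck           bool
	VisibilitySeconds int
}

// withDefaults fills unset (zero) numeric fields with the defaults from the table.
// AutoAck and VisibilitySeconds already default to false and 0 as zero values.
func withDefaults(o pollOptions) pollOptions {
	if o.MaxItems == 0 {
		o.MaxItems = 1
	}
	if o.WaitTimeout == 0 {
		o.WaitTimeout = 60
	}
	return o
}

// validate checks the one mandatory field and rejects negative numbers.
func validate(o pollOptions) error {
	if o.Channel == "" {
		return errors.New("channel is mandatory")
	}
	if o.MaxItems < 1 || o.WaitTimeout < 1 || o.VisibilitySeconds < 0 {
		return errors.New("numeric options are out of range")
	}
	return nil
}

func main() {
	o := withDefaults(pollOptions{Channel: "sendAndReceive"})
	fmt.Printf("%+v valid=%v\n", o, validate(o) == nil)
}
```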

Response: PollResponse

| Name | Type | Description |
|---|---|---|
| Messages | List | The list of received queue messages. |

Response: QueueMessage

| Name | Type | Description |
|---|---|---|
| Id | String | The unique identifier for the message. |
| Channel | String | The channel from which the message was received. |
| Metadata | String | Metadata associated with the message. |
| Body | byte[] | The body of the message in byte array format. |
| ClientID | String | The ID of the client that sent the message. |
| Tags | Map<String, String> | Key-value pairs representing tags for the message. |
| Timestamp | Instant | The timestamp when the message was created. |
| Sequence | long | The sequence number of the message. |
| ReceiveCount | int | The number of times the message has been received. |
| ReRouted | boolean | Indicates whether the message was rerouted. |
| ReRoutedFromQueue | String | The name of the queue from which the message was rerouted. |
| ExpirationAt | Instant | The expiration time of the message, if applicable. |
| DelayedTo | Instant | The time the message is delayed until, if applicable. |

Example

package main

import (
	"context"
	"log"

	"github.com/kubemq-io/kubemq-go/queues_stream"
)

func sendAndReceive() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceive").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue")).
		SetTags(map[string]string{
			"key1": "value1",
			"key2": "value2",
		}).
		SetPolicyDelaySeconds(1).
		SetPolicyExpirationSeconds(10).
		SetPolicyMaxReceiveCount(3).
		SetPolicyMaxReceiveQueue("dlq")
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceive").
		SetMaxItems(1).
		SetWaitTimeout(10).
		// AutoAck is disabled so each message can be acknowledged explicitly below.
		SetAutoAck(false)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	if err != nil {
		log.Fatal(err)
	}
	//AckAll - Acknowledge all messages
	//if err := msgs.AckAll(); err != nil {
	//	log.Fatal(err)
	//}

	//NackAll - Not Acknowledge all messages
	//if err := msgs.NAckAll(); err != nil {
	//	log.Fatal(err)
	//}

	// RequeueAll - Requeue all messages
	//if err := msgs.ReQueueAll("requeue-queue-channel"); err != nil {
	//	log.Fatal(err)
	//}

	for _, msg := range msgs.Messages {
		log.Println(msg.String())

		// Ack - Acknowledge message
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}

		// Nack - Not Acknowledge message
		//if err := msg.NAck(); err != nil {
		//	log.Fatal(err)
		//}

		// Requeue - Requeue message
		//if err := msg.ReQueue("requeue-queue-channel"); err != nil {
		//	log.Fatal(err)
		//}
	}

}

Example with Visibility

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
	"time"
)

func sendAndReceiveWithVisibilityExpiration() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))

	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	if err != nil {
		log.Fatal(err)
	}
	for _, msg := range msgs.Messages {
		log.Println(msg.String())
		log.Println("Received message, waiting 3 seconds before ack")
		time.Sleep(3 * time.Second)
		// The 2-second visibility window has passed by now, so this Ack is expected to fail.
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}
	}

}
func sendAndReceiveWithVisibilityExtension() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))

	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	if err != nil {
		log.Fatal(err)
	}
	for _, msg := range msgs.Messages {
		log.Println(msg.String())
		log.Println("Received message, waiting 1 second before ack")
		time.Sleep(1 * time.Second)
		log.Println("Extending visibility for 3 seconds and waiting 2 seconds before ack")
		if err := msg.ExtendVisibility(3); err != nil {
			log.Fatal(err)
		}
		time.Sleep(2 * time.Second)
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}

The examples are standalone projects that showcase the usage of the SDK. To run the examples, ensure you have a running instance of KubeMQ.

examples: https://github.com/kubemq-io/kubemq-go/tree/master/examples
KubeMQ: https://github.com/kubemq-io/kubemq-go