Go

The KubeMQ SDK for Go enables Go developers to seamlessly communicate with the KubeMQ server, implementing various communication patterns such as Events, EventStore, Commands, Queries, and Queues.

Sources

SDK

Examples

https://github.com/kubemq-io/kubemq-go/tree/master/examples

Prerequisites

  • Go 1.17 or higher

  • KubeMQ server running locally or accessible over the network

Installation

go get github.com/kubemq-io/kubemq-go

Running Examples

The examples are standalone projects that showcase the usage of the SDK. To run the examples, ensure you have a running instance of KubeMQ.

SDK Overview

The SDK implements all communication patterns available through the KubeMQ server:

  • PubSub

    • Events

    • EventStore

  • Commands & Queries (CQ)

    • Commands

    • Queries

  • Queues

PubSub Events Operations

Create Channel

Create a new Events channel.

Request Parameters

Response

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

// createEventsChannel connects an Events client to localhost:50000 with the
// client id "events-channel-creator" and asks the server to create the
// channel "events-channel". Every error is reported through log.Fatal.
func createEventsChannel() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-creator"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Create(ctx, "events-channel")
	if err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Events channel.

Request Parameters

Response

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

// deleteEventsChannel connects an Events client to localhost:50000 with the
// client id "events-channel-delete" and asks the server to delete the
// channel "events-channel". Every error is reported through log.Fatal.
func deleteEventsChannel() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-delete"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Delete(ctx, "events-channel")
	if err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Events channels.

Request Parameters

Response

Returns a list where each PubSubChannel has the following attributes:

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

// listEventsChannels connects an Events client to localhost:50000 with the
// client id "events-channel-lister", requests the channel list and logs each
// returned entry on its own line. Every error is reported through log.Fatal.
func listEventsChannels() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-lister"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// The empty second argument is presumably a search filter meaning
	// "all channels" — confirm against the SDK reference.
	found, err := client.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range found {
		log.Println(ch)
	}
}

Send Event / Subscribe Message

Sends a message to an Events channel.

Send Request: Event

Send Response

Subscribe Request: EventsSubscription

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

// sendSubscribeEvents subscribes to the "events-channel" Events channel and
// then publishes a single event to it, so the subscription callback logs the
// event that was just sent.
//
// The client connects to localhost:50000 with the client id
// "events-send-subscribe". Setup and send errors are reported through
// log.Fatal; errors delivered to the subscription callback are logged and the
// callback returns without touching the message.
func sendSubscribeEvents() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-send-subscribe"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := eventsClient.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	subReq := &kubemq.EventsSubscription{
		Channel:  "events-channel",
		Group:    "",
		ClientId: "",
	}
	err = eventsClient.Subscribe(ctx, subReq, func(msg *kubemq.Event, err error) {
		// Check the error first: msg must not be dereferenced when the
		// callback is invoked to report a failure.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to become active before sending,
	// matching the one-second wait used by the other examples.
	time.Sleep(1 * time.Second)

	err = eventsClient.Send(ctx, &kubemq.Event{
		Channel:  "events-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event"),
	})
	if err != nil {
		log.Fatal(err)
	}
	// Leave time for the callback to log the event before the client closes.
	time.Sleep(1 * time.Second)
}

PubSub EventsStore Operations

Create Channel

Create a new Events Store channel.

Request Parameters

Response

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)
// createEventsStoreChannel connects an Events Store client to localhost:50000
// with the client id "events-store-channel-creator" and asks the server to
// create the channel "events-store-channel". Every error is reported through
// log.Fatal.
func createEventsStoreChannel() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-creator"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Create(ctx, "events-store-channel")
	if err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Events Store channel.

Request Parameters

Response

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

// deleteEventsStoreChannel connects an Events Store client to localhost:50000
// with the client id "events-store-channel-delete" and asks the server to
// delete the channel "events-store-channel". Every error is reported through
// log.Fatal.
func deleteEventsStoreChannel() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-delete"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Delete(ctx, "events-store-channel")
	if err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Events Store channels.

Request Parameters

Response

Returns a list where each PubSubChannel has the following attributes:

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

// listEventsStoreChannel connects an Events Store client to localhost:50000
// with the client id "events-store-channel-lister", requests the channel list
// and logs each returned entry on its own line. Every error is reported
// through log.Fatal.
func listEventsStoreChannel() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-lister"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// The empty second argument is presumably a search filter meaning
	// "all channels" — confirm against the SDK reference.
	found, err := client.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range found {
		log.Println(ch)
	}
}

Send Event / Subscribe Message

Sends a message to an Events Store channel.

Send Request: EventStore

Send Response

Subscribe Request: EventsStoreSubscription

EventsStoreType Options

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

// sendSubscribeEventsStore subscribes to the "events-store-channel" Events
// Store channel using the StartFromFirstEvent subscription type and then
// publishes a single event to it, logging both the received events and the
// send result.
//
// The client connects to localhost:50000 with the client id
// "events-store-send-subscribe". Setup and send errors are reported through
// log.Fatal; errors delivered to the subscription callback are logged and the
// callback returns without touching the message.
func sendSubscribeEventsStore() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-send-subscribe"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := eventsStoreClient.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	subReq := &kubemq.EventsStoreSubscription{
		Channel:          "events-store-channel",
		Group:            "",
		ClientId:         "",
		SubscriptionType: kubemq.StartFromFirstEvent(),
	}
	err = eventsStoreClient.Subscribe(ctx, subReq, func(msg *kubemq.EventStoreReceive, err error) {
		// Check the error first: msg must not be dereferenced when the
		// callback is invoked to report a failure.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to become active before sending.
	time.Sleep(1 * time.Second)

	result, err := eventsStoreClient.Send(ctx, &kubemq.EventStore{
		Channel:  "events-store-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event store"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	// Leave time for the callback to log the event before the client closes.
	time.Sleep(1 * time.Second)
}

Commands & Queries – Commands Operations

Create Channel

Create a new Command channel.

Request Parameters

Response

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

// createCommandsChannel connects a Commands client to localhost:50000 with
// the client id "example" and asks the server to create the channel
// "commands.A". Every error is reported through log.Fatal.
func createCommandsChannel() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Create(ctx, "commands.A")
	if err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Command channel.

Request Parameters

Response

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

// deleteCommandsChannel connects a Commands client to localhost:50000 with
// the client id "example" and asks the server to delete the channel
// "commands.A". Every error is reported through log.Fatal.
func deleteCommandsChannel() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Delete(ctx, "commands.A")
	if err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Command channels.

Request Parameters

Response

Returns a list where each CQChannel has the following attributes:

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

// listCommandsChannels connects a Commands client to localhost:50000 with the
// client id "example", requests the channel list and logs each returned entry
// on its own line. Every error is reported through log.Fatal.
func listCommandsChannels() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// The empty second argument is presumably a search filter meaning
	// "all channels" — confirm against the SDK reference.
	found, err := client.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range found {
		log.Println(ch)
	}
}

Send Command / Receive Request

Sends a command request to a Command channel.

Send Request: CommandMessage

Send Response: CommandResponseMessage

Receive Request: CommandsSubscription

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

// sendReceiveCommands subscribes to the "commands" channel, answers every
// received command with a Response, and then sends one command with a
// 10-second timeout and logs the result.
//
// The client connects to localhost:50000 with the client id
// "sendReceiveCommands". Setup, response and send errors are reported through
// log.Fatal; errors delivered to the subscription callback are logged and the
// callback returns without touching the command.
func sendReceiveCommands() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveCommands"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := commandsClient.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	subRequest := &kubemq.CommandsSubscription{
		Channel:  "commands",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to commands")
	err = commandsClient.Subscribe(ctx, subRequest, func(cmd *kubemq.CommandReceive, err error) {
		// Check the error first: cmd must not be dereferenced when the
		// callback is invoked to report a failure.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(cmd.String())
		// Route the reply back to the sender using the identifiers carried
		// by the received command.
		resp := &kubemq.Response{
			RequestId:  cmd.Id,
			ResponseTo: cmd.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
		}
		if err := commandsClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to become active before sending.
	time.Sleep(1 * time.Second)

	log.Println("sending command")
	result, err := commandsClient.Send(ctx, kubemq.NewCommand().
		SetChannel("commands").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending command")).
		SetTimeout(10*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}

Commands & Queries – Queries Operations

Create Channel

Create a new Query channel.

Request Parameters

Response

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)


// createQueriesChannel connects a Queries client to localhost:50000 with the
// client id "example" and asks the server to create the channel "queries.A".
// Every error is reported through log.Fatal.
func createQueriesChannel() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Create(ctx, "queries.A")
	if err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Query channel.

Request Parameters

Response

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

// deleteQueriesChannel connects a Queries client to localhost:50000 with the
// client id "example" and asks the server to delete the channel "queries.A".
// Every error is reported through log.Fatal.
func deleteQueriesChannel() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Delete(ctx, "queries.A")
	if err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Query channels.

Request Parameters

Response

Returns a list where each CQChannel has the following attributes:

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

// listQueriesChannels connects a Queries client to localhost:50000 with the
// client id "example", requests the channel list and logs each returned entry
// on its own line. Every error is reported through log.Fatal.
func listQueriesChannels() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// The empty second argument is presumably a search filter meaning
	// "all channels" — confirm against the SDK reference.
	found, err := client.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range found {
		log.Println(ch)
	}
}

Send Query / Receive Request

Sends a query request to a Query channel.

Send Request: QueryMessage

Send Response: QueryResponse

Receive Request: QueriesSubscription

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

// sendReceiveQueries subscribes to the "queries" channel, answers every
// received query with a Response carrying a body, and then sends one query
// with a 10-second timeout and logs the result.
//
// The client connects to localhost:50000 with the client id
// "sendReceiveQueries". Setup, response and send errors are reported through
// log.Fatal; errors delivered to the subscription callback are logged and the
// callback returns without touching the query.
func sendReceiveQueries() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveQueries"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := queriesClient.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	subRequest := &kubemq.QueriesSubscription{
		Channel:  "queries",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to queries")
	err = queriesClient.Subscribe(ctx, subRequest, func(query *kubemq.QueryReceive, err error) {
		// Check the error first: query must not be dereferenced when the
		// callback is invoked to report a failure.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(query.String())
		// Route the reply back to the sender using the identifiers carried
		// by the received query.
		resp := &kubemq.Response{
			RequestId:  query.Id,
			ResponseTo: query.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
			Body:       []byte("hello kubemq - sending query response"),
		}
		if err := queriesClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	// Give the subscription a moment to become active before sending.
	time.Sleep(1 * time.Second)

	log.Println("sending query")
	result, err := queriesClient.Send(ctx, kubemq.NewQuery().
		SetChannel("queries").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending query")).
		SetTimeout(10*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}

Queues Operations

Create Channel

Create a new Queue channel.

Request Parameters

Response

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
)

// main connects a queues stream client to localhost:50000 with the client id
// "example" and asks the server to create the channel "queues.A". Every error
// is reported through log.Fatal.
func main() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Create(ctx, "queues.A")
	if err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Queue channel.

Request Parameters

Response

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
)

// main connects a queues stream client to localhost:50000 with the client id
// "example" and asks the server to delete the channel "queues.A". Every error
// is reported through log.Fatal.
func main() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	err = client.Delete(ctx, "queues.A")
	if err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Queue channels.

Request Parameters

Response

Returns a list where each QueuesChannel has the following attributes:

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
)

// main connects a queues stream client to localhost:50000 with the client id
// "example", requests the channel list and logs each returned entry on its
// own line. Every error is reported through log.Fatal.
func main() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	client, err := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// The empty second argument is presumably a search filter meaning
	// "all channels" — confirm against the SDK reference.
	found, err := client.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, ch := range found {
		log.Println(ch)
	}
}

Send / Receive Queue Messages

Send and receive messages from a Queue channel.

Send Request: QueueMessage

Send Response: SendResult

Receive Request: PollRequest

Response: PollResponse

Response: QueueMessage

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
	"time"
)

// sendAndReceive sends one message to the "sendAndReceive" queue channel and
// then polls that channel for a single message, logging and acknowledging
// whatever is received.
//
// The message carries tags plus delay, expiration and max-receive policies
// (with "dlq" as the max-receive queue). The client connects to
// localhost:50000 with the client id "example". Every error, including a
// failed poll, is reported through log.Fatal.
func sendAndReceive() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := queuesClient.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceive").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue")).
		SetTags(map[string]string{
			"key1": "value1",
			"key2": "value2",
		}).
		SetPolicyDelaySeconds(1).
		SetPolicyExpirationSeconds(10).
		SetPolicyMaxReceiveCount(3).
		SetPolicyMaxReceiveQueue("dlq")
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)

	// Auto-ack is disabled because every received message is acknowledged
	// explicitly in the loop below.
	// NOTE(review): confirm the unit of SetWaitTimeout; the message is sent
	// with a 1-second delay policy, so a poll that times out sooner would
	// return no messages.
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceive").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetAutoAck(false)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// Stop here on failure: the response must not be read when Poll
	// returned an error.
	if err != nil {
		log.Fatal(err)
	}

	// Alternative bulk operations on the whole poll response:

	//AckAll - Acknowledge all messages
	//if err := msgs.AckAll(); err != nil {
	//	log.Fatal(err)
	//}

	//NackAll - Not Acknowledge all messages
	//if err := msgs.NAckAll(); err != nil {
	//	log.Fatal(err)
	//}

	// RequeueAll - Requeue all messages
	//if err := msgs.ReQueueAll("requeue-queue-channel"); err != nil {
	//	log.Fatal(err)
	//}

	for _, received := range msgs.Messages {
		log.Println(received.String())

		// Ack - Acknowledge message
		if err := received.Ack(); err != nil {
			log.Fatal(err)
		}

		// Nack - Not Acknowledge message
		//if err := received.NAck(); err != nil {
		//	log.Fatal(err)
		//}

		// Requeue - Requeue message
		//if err := received.ReQueue("requeue-queue-channel"); err != nil {
		//	log.Fatal(err)
		//}
	}
}

Example with Visibility

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
	"time"
)

// sendAndReceiveWithVisibilityExpiration sends one message to the
// "sendAndReceiveWithVisibility" queue channel, polls it back with a
// visibility of 2 seconds, and then waits 3 seconds before acknowledging.
//
// The client connects to localhost:50000 with the client id "example". Every
// error, including a failed poll or a rejected Ack, is reported through
// log.Fatal.
func sendAndReceiveWithVisibilityExpiration() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := queuesClient.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)

	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// Stop here on failure: the response must not be read when Poll
	// returned an error.
	if err != nil {
		log.Fatal(err)
	}
	for _, received := range msgs.Messages {
		log.Println(received.String())
		log.Println("Received message, waiting 3 seconds before ack")
		// The wait is longer than the 2-second visibility set on the poll
		// request, so this Ack is presumably rejected — that appears to be
		// the point of the example; confirm against the SDK reference.
		time.Sleep(3 * time.Second)
		if err := received.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}
// sendAndReceiveWithVisibilityExtension sends one message to the
// "sendAndReceiveWithVisibility" queue channel, polls it back with a
// visibility of 2 seconds, waits 1 second, extends the visibility by 3, waits
// another 2 seconds and then acknowledges the message.
//
// The client connects to localhost:50000 with the client id "example". Every
// error, including a failed poll, is reported through log.Fatal.
func sendAndReceiveWithVisibilityExtension() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := queuesClient.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)

	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// Stop here on failure: the response must not be read when Poll
	// returned an error.
	if err != nil {
		log.Fatal(err)
	}
	for _, received := range msgs.Messages {
		log.Println(received.String())
		log.Println("Received message, waiting 1 seconds before ack")
		time.Sleep(1 * time.Second)
		log.Println("Extending visibility for 3 seconds and waiting 2 seconds before ack")
		// Extend before the original 2-second visibility runs out so the
		// following 2-second wait still ends inside the visibility window.
		if err := received.ExtendVisibility(3); err != nil {
			log.Fatal(err)
		}
		time.Sleep(2 * time.Second)
		if err := received.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}

Last updated