Pub/Sub
Get Started with Pub/ Sub in KubeMQ

Connect Your KubeMQ Cluster

To be able to communicate with KubeMQ interface ports running in Kubernetes cluster, a Port Forward of KubeMQ's ports is needed.
kubemqctl has a handy command that will do it for you:
1
kubemqctl set cluster proxy
Copied!

Subscribe to Events Channel

A consumer can subscribe to the "hello-world" channel with one of the following methods.
kubemqctl
curl
.Net
Java
Go
Python
Node
PHP
Ruby
jquery
Run the following kubemqctl command:
1
kubemqctl events rec hello-world
Copied!
When connected, a stream of events messages will be shown in the console.
The following cURL command is using KubeMQ's REST interface:
1
curl --location --request GET "http://localhost:9090/subscribe/events?client_id=some_client_id&channel=some_channel&group=some_group&subscribe_type=events" \
2
--header "Content-Type: application/json" \
3
--data ""
Copied!
Subscribe to Events in REST interface is using WebSocket for streaming (Push) events to the consumer. You will need to implement a WebSocket receiver accordingly.
The following .NET code snippet is using KubeMQ's .NET SDK with gRPC interface:
1
using System;
2
3
namespace PubSub_Subscribe_to_a_Channel
4
{
5
class Program
6
{
7
static void Main(string[] args)
8
{
9
10
var ChannelName = "hello-world";
11
var ClientID = "hello-world-subscriber";
12
var KubeMQServerAddress = "localhost:50000";
13
14
var subscriber = new KubeMQ.SDK.csharp.Events.Subscriber(KubeMQServerAddress);
15
try
16
{
17
subscriber.SubscribeToEvents(new KubeMQ.SDK.csharp.Subscription.SubscribeRequest
18
{
19
Channel = ChannelName,
20
SubscribeType = KubeMQ.SDK.csharp.Subscription.SubscribeType.Events,
21
ClientID = ClientID
22
23
}, (eventReceive) =>
24
{
25
26
Console.WriteLine(quot;Event Received: EventID:{eventReceive.EventID} Channel:{eventReceive.Channel} Metadata:{eventReceive.Metadata} Body:{ KubeMQ.SDK.csharp.Tools.Converter.FromByteArray(eventReceive.Body)} ");
27
},
28
(errorHandler) =>
29
{
30
Console.WriteLine(errorHandler.Message);
31
});
32
}
33
catch (Exception ex)
34
{
35
Console.WriteLine(ex.Message);
36
}
37
Console.WriteLine("press any key to close PubSub_Subscribe_to_a_Channel");
38
Console.ReadLine();
39
}
40
41
}
42
}
Copied!
When executed, a stream of events messages will be shown in the console.
The following Java code snippet is using KubeMQ's Java SDK with gRPC interface:
1
package io.kubemq.sdk.examples.get_Started.pubSub_Subscribe_to_a_Channel;
2
3
import java.io.IOException;
4
5
import javax.net.ssl.SSLException;
6
7
import io.grpc.stub.StreamObserver;
8
import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
9
import io.kubemq.sdk.event.EventReceive;
10
import io.kubemq.sdk.event.Subscriber;
11
import io.kubemq.sdk.subscription.SubscribeRequest;
12
import io.kubemq.sdk.subscription.SubscribeType;
13
import io.kubemq.sdk.tools.Converter;
14
15
public class Program {
16
17
public static void main(String[] args) {
18
19
20
String channelName = "hello-world", clientID = "hello-world-subscriber", kubeMQAddress = "localhost:50000";
21
Subscriber subscriber = new Subscriber(kubeMQAddress);
22
SubscribeRequest subscribeRequest = new SubscribeRequest();
23
subscribeRequest.setChannel(channelName);
24
subscribeRequest.setClientID(clientID);
25
subscribeRequest.setSubscribeType(SubscribeType.Events);
26
27
StreamObserver<EventReceive> streamObserver = new StreamObserver<EventReceive>() {
28
29
@Override
30
public void onNext(EventReceive value) {
31
try {
32
System.out.printf("Event Received: EventID: %d, Channel: %s, Metadata: %s, Body: %s",
33
value.getEventId(), value.getChannel(), value.getMetadata(),
34
Converter.FromByteArray(value.getBody()));
35
} catch (ClassNotFoundException e) {
36
System.out.printf("ClassNotFoundException: %s",e.getMessage());
37
e.printStackTrace();
38
} catch (IOException e) {
39
System.out.printf("IOException: %s",e.getMessage());
40
e.printStackTrace();
41
}
42
43
}
44
45
@Override
46
public void onError(Throwable t) {
47
System.out.printf("Event Received Error: %s", t.toString());
48
}
49
50
@Override
51
public void onCompleted() {
52
53
}
54
};
55
try {
56
subscriber.SubscribeToEvents(subscribeRequest, streamObserver);
57
} catch (SSLException e) {
58
System.out.printf("SSLException: %s", e.toString());
59
e.printStackTrace();
60
} catch (ServerAddressNotSuppliedException e) {
61
System.out.printf("ServerAddressNotSuppliedException: %s", e.toString());
62
e.printStackTrace();
63
}
64
65
}
66
}
Copied!
When executed, a stream of events messages will be shown in the console.
The following Go code snippet is using KubeMQ's Go SDK with gRPC interface:
1
package main
2
import (
3
"context"
4
"fmt"
5
"github.com/kubemq-io/kubemq-go"
6
"log"
7
)
8
9
func main() {
10
ctx, cancel := context.WithCancel(context.Background())
11
defer cancel()
12
client, err := kubemq.NewClient(ctx,
13
kubemq.WithAddress("localhost", 50000),
14
kubemq.WithClientId("hello-world-subscriber"),
15
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
16
if err != nil {
17
log.Fatal(err)
18
}
19
defer client.Close()
20
channelName := "hello-world"
21
errCh := make(chan error)
22
eventsCh, err := client.SubscribeToEvents(ctx, channelName, "", errCh)
23
if err != nil {
24
log.Fatal(err)
25
return
26
27
}
28
for {
29
select {
30
case err := <-errCh:
31
log.Fatal(err)
32
return
33
case event, more := <-eventsCh:
34
if !more {
35
fmt.Println("Event Received, done")
36
return
37
}
38
log.Printf("Event Received:\nEventID: %s\nChannel: %s\nMetadata: %s\nBody: %s\n", event.Id, event.Channel, event.Metadata, event.Body)
39
case <-ctx.Done():
40
return
41
}
42
}
43
}
Copied!
When executed, a stream of events messages will be shown in the console.
The following Python code snippet is using KubeMQ's Python SDK with gRPC interface:
1
from builtins import input
2
from random import randint
3
from kubemq.events.subscriber import Subscriber
4
from kubemq.tools.listener_cancellation_token import ListenerCancellationToken
5
from kubemq.subscription.subscribe_type import SubscribeType
6
from kubemq.subscription.events_store_type import EventsStoreType
7
from kubemq.subscription.subscribe_request import SubscribeRequest
8
9
10
11
def handle_incoming_events(event):
12
if event:
13
print("Subscriber Received Event: Metadata:'%s', Channel:'%s', Body:'%s tags:%s'" % (
14
event.metadata,
15
event.channel,
16
event.body,
17
event.tags
18
))
19
20
def handle_incoming_error(error_msg):
21
print("received error:%s'" % (
22
error_msg
23
))
24
25
26
if __name__ == "__main__":
27
print("Subscribing to event on channel example")
28
cancel_token=ListenerCancellationToken()
29
30
31
# Subscribe to events without store
32
subscriber = Subscriber("localhost:50000")
33
subscribe_request = SubscribeRequest(
34
channel="testing_event_channel",
35
client_id="hello-world-subscriber",
36
events_store_type=EventsStoreType.Undefined,
37
events_store_type_value=0,
38
group="",
39
subscribe_type=SubscribeType.Events
40
)
41
subscriber.subscribe_to_events(subscribe_request, handle_incoming_events,handle_incoming_error,cancel_token)
42
43
input("Press 'Enter' to stop Listen...\n")
44
cancel_token.cancel()
45
input("Press 'Enter' to stop the application...\n")
Copied!
When executed, a stream of events messages will be shown in the console.
The following JS code snippet is using KubeMQ's NodeJS SDK with gRPC interface:
1
const kubemq = require('kubemq-nodejs');
2
3
let channelName = 'pubsub', clientID = 'hello-world-subscriber',
4
kubeMQHost = 'localhost', kubeMQGrpcPort = '50000';
5
6
let sub = new kubemq.Subscriber(kubeMQHost, kubeMQGrpcPort, clientID, channelName);
7
8
sub.subscribeToEvents(msg => {
9
console.log('Event Received: EventID:' + msg.EventID + ', Channel:' + msg.Channel + ' ,Metadata:' + msg.Metadata + ', Body:' + kubemq.byteToString(msg.Body));
10
}, err => {
11
console.log('error:' + err)
12
})
Copied!
The following PHP code snippet is using KubeMQ's REST interface:
1
<?php
2
3
$curl = curl_init();
4
5
curl_setopt_array($curl, array(
6
CURLOPT_URL => "http://localhost:9090/subscribe/events?client_id=some_client_id&channel=hello-world&group=some_group&subscribe_type=events",
7
CURLOPT_RETURNTRANSFER => true,
8
CURLOPT_ENCODING => "",
9
CURLOPT_MAXREDIRS => 10,
10
CURLOPT_TIMEOUT => 0,
11
CURLOPT_FOLLOWLOCATION => false,
12
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
13
CURLOPT_CUSTOMREQUEST => "GET",
14
CURLOPT_HTTPHEADER => array(
15
"Content-Type: application/json"
16
),
17
));
18
19
$response = curl_exec($curl);
20
$err = curl_error($curl);
21
22
curl_close($curl);
23
24
if ($err) {
25
echo "cURL Error #:" . $err;
26
} else {
27
echo $response;
28
} ?>
Copied!
Subscribe to Events in REST interface is using WebSocket for streaming (Push) events to the consumer. You will need to implement a WebSocket receiver accordingly.
The following Ruby code snippet is using KubeMQ's REST interface:
1
require "uri"
2
require "net/http"
3
4
url = URI("http://localhost:9090/subscribe/events?client_id=some_client_id&channel=hello-world&group=some_group&subscribe_type=events")
5
6
http = Net::HTTP.new(url.host, url.port)
7
8
request = Net::HTTP::Get.new(url)
9
request["Content-Type"] = "application/json"
10
11
response = http.request(request)
12
puts response.read_body
Copied!
Subscribe to Events in REST interface is using WebSocket for streaming (Push) events to the consumer. You will need to implement a WebSocket receiver accordingly.
The following jQuery code snippet is using KubeMQ's REST interface:
1
var settings = {
2
"url": "http://localhost:9090/subscribe/events?client_id=some_client_id&channel=hello-world&group=some_group&subscribe_type=events",
3
"method": "GET",
4
"timeout": 0,
5
"headers": {
6
"Content-Type": "application/json",
7
},
8
};
9
10
$.ajax(settings).done(function (response) {
11
console.log(response);
12
});
Copied!
Subscribe to Events in REST interface is using WebSocket for streaming (Push) events to the consumer. You will need to implement a WebSocket receiver accordingly.

Publish to Event Channel

After you have subscribed to a hello-world channel, you can send your message to it.
kubemqctl
curl
.Net
Java
Go
Python
Node
PHP
Ruby
jquery
Run the following kubemqctl command:
1
kubemqctl events send hello-world "Hi KubeMQ"
Copied!
The following cURL command is using KubeMQ's REST interface:
1
curl --location --request POST "http://localhost:9090/send/event"
2
--header "Content-Type: application/json"
3
--data '{"EventID": "1234-5678-90","ClientID": "events-client-id","Channel": "events-channel","Metadata": "some-metadata","Body": "c29tZSBlbmNvZGVkIGJvZHk=","Store": false}'
Copied!
A response for a successful command will look like this:
1
{
2
"is_error": false,
3
"message": "OK",
4
"data": {
5
"EventID": "1234-5678-90",
6
"Sent": true
7
}
8
}
Copied!
The following .NET code snippet is using KubeMQ's .NET SDK with gRPC interface:
1
using System;
2
3
namespace PubSub_Publish_to_a_Channel
4
{
5
class Program
6
{
7
static void Main(string[] args)
8
{
9
var ChannelName = "hello-wrold";
10
var ClientID = "hello-world-sender";
11
var KubeMQServerAddress = "localhost:50000";
12
13
14
var channel = new KubeMQ.SDK.csharp.Events.Channel(new KubeMQ.SDK.csharp.Events.ChannelParameters
15
{
16
ChannelName = ChannelName,
17
ClientID = ClientID,
18
KubeMQAddress = KubeMQServerAddress
19
});
20
21
try
22
{
23
var result = channel.SendEvent(new KubeMQ.SDK.csharp.Events.Event()
24
{
25
Body = KubeMQ.SDK.csharp.Tools.Converter.ToByteArray("hello kubemq - sending single event")
26
});
27
if (!result.Sent)
28
{
29
Console.WriteLine(quot;Could not send single message:{result.Error}");
30
}
31
}
32
catch (Exception ex)
33
{
34
Console.WriteLine(ex.Message);
35
}
36
}
37
}
38
}
Copied!
The following Java code snippet is using KubeMQ's Java SDK with gRPC interface:
1
package io.kubemq.sdk.examples.get_Started.pubSub_Publish_to_a_Channel;
2
3
import java.io.IOException;
4
5
import javax.net.ssl.SSLException;
6
7
import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
8
import io.kubemq.sdk.event.Event;
9
import io.kubemq.sdk.event.Result;
10
import io.kubemq.sdk.tools.Converter;
11
12
public class Program {
13
14
public static void main(String[] args) {
15
16
String channelName = "hello-world", clientID = "hello-world-subscriber", kubeMQAddress = "localhost:50000";
17
18
io.kubemq.sdk.event.Channel chan = new io.kubemq.sdk.event.Channel(channelName, clientID, false, kubeMQAddress);
19
20
Event event = new Event();
21
try {
22
event.setBody(Converter.ToByteArray("hello kubemq - sending single event"));
23
} catch (IOException e) {
24
25
e.printStackTrace();
26
}
27
28
try {
29
Result res = chan.SendEvent(event);
30
} catch (SSLException | ServerAddressNotSuppliedException e) {
31
32
e.printStackTrace();
33
}
34
35
}
36
}
Copied!
The following Go code snippet is using KubeMQ's Go SDK with gRPC interface:
1
package main
2
import (
3
"context"
4
"github.com/kubemq-io/kubemq-go"
5
"log"
6
)
7
8
func main() {
9
ctx, cancel := context.WithCancel(context.Background())
10
defer cancel()
11
client, err := kubemq.NewClient(ctx,
12
kubemq.WithAddress("localhost", 50000),
13
kubemq.WithClientId("hello-world-sender"),
14
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
15
if err != nil {
16
log.Fatal(err)
17
}
18
defer client.Close()
19
channelName := "hello-world"
20
err = client.E().
21
SetId("some-id").
22
SetChannel(channelName).
23
SetMetadata("some-metadata").
24
SetBody([]byte("hello kubemq - sending single event")).
25
Send(ctx)
26
if err != nil {
27
log.Fatal(err)
28
}
29
30
}
Copied!
The following Python code snippet is using KubeMQ's Python SDK with gRPC interface:
1
import datetime
2
3
from kubemq.events.lowlevel.event import Event
4
from kubemq.events.lowlevel.sender import Sender
5
6
if __name__ == "__main__":
7
8
publisher = Sender("localhost:50000")
9
event = Event(
10
metadata="EventMetaData",
11
body =("hello kubemq - sending single event").encode('UTF-8'),
12
store=False,
13
channel="testing_event_channel",
14
client_id="hello-world-subscriber"
15
)
16
try:
17
res = publisher.send_event(event)
18
print(res)
19
except Exception as err:
20
print(
21
"'error sending:'%s'" % (
22
err
23
)
24
)
Copied!
The following JS code snippet is using KubeMQ's NodeJS SDK with gRPC interface:
1
const kubemq = require('kubemq-nodejs');
2
let channelName = 'pubsub', clientID = 'hello-world-subscriber',
3
kubeMQHost = 'localhost', kubeMQGrpcPort = '50000';
4
const publisher = new kubemq.Publisher(kubeMQHost, kubeMQGrpcPort, clientID, channelName);
5
let event = new kubemq.Publisher.Event(kubemq.stringToByte('hello kubemq - sending single event'));
6
publisher.send(event).then(
7
res => {
8
console.log(res);
9
}).catch(
10
err => {
11
console.log('error sending' + err)
12
});
Copied!
The following PHP code snippet is using KubeMQ's REST interface:
1
<?php
2
3
$curl = curl_init();
4
5
curl_setopt_array($curl, array(
6
CURLOPT_URL => "http://localhost:9090/send/event",
7
CURLOPT_RETURNTRANSFER => true,
8
CURLOPT_ENCODING => "",
9
CURLOPT_MAXREDIRS => 10,
10
CURLOPT_TIMEOUT => 0,
11
CURLOPT_FOLLOWLOCATION => false,
12
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
13
CURLOPT_CUSTOMREQUEST => "POST",
14
CURLOPT_POSTFIELDS =>"{\n \"EventID\": \"1234-5678-90\",\n \"ClientID\": \"events-client-id\",\n \"Channel\": \"hello-world\",\n \"Metadata\": \"some-metadata\",\n \"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n \"Store\": false\n}",
15
CURLOPT_HTTPHEADER => array(
16
"Content-Type: application/json"
17
),
18
));
19
20
$response = curl_exec($curl);
21
$err = curl_error($curl);
22
23
curl_close($curl);
24
25
if ($err) {
26
echo "cURL Error #:" . $err;
27
} else {
28
echo $response;
29
} ?>
Copied!
A response for a successful command will look like this:
1
{
2
"is_error": false,
3
"message": "OK",
4
"data": {
5
"EventID": "1234-5678-90",
6
"Sent": true
7
}
8
}
Copied!
The following Ruby code snippet is using KubeMQ's REST interface:
1
require "uri"
2
require "net/http"
3
4
url = URI("http://localhost:9090/send/event")
5
6
https = Net::HTTP.new(url.host, url.port)
7
https.use_ssl = true
8
9
request = Net::HTTP::Post.new(url)
10
request["Content-Type"] = "application/json"
11
request.body = "{\n \"EventID\": \"1234-5678-90\",\n \"ClientID\": \"events-client-id\",\n \"Channel\": \"hello-world\",\n \"Metadata\": \"some-metadata\",\n \"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n \"Store\": false\n}"
12
response = https.request(request)
13
puts response.read_body
Copied!
A response for a successful command will look like this:
1
{
2
"is_error": false,
3
"message": "OK",
4
"data": {
5
"EventID": "1234-5678-90",
6
"Sent": true
7
}
8
}
Copied!
The following jQuery code snippet is using KubeMQ's REST interface:
1
var settings = {
2
"url": "http://localhost:9090/send/event",
3
"method": "POST",
4
"timeout": 0,
5
"headers": {
6
"Content-Type": "application/json",
7
},
8