RPC
Get Started with RPC in KubeMQ

Connect Your KubeMQ Cluster

To be able to communicate with KubeMQ interface ports running in Kubernetes cluster, a Port Forward of KubeMQ's ports is needed.
kubemqctl has a handy command that will do it for you:
1
kubemqctl set cluster proxy
Copied!

Subscribe to Commands Channel

A receiver can subscribe to the hello-command channel with one of the following methods.
kubemqctl
curl
.Net
Java
Go
Python
Node
PHP
Ruby
jquery
Run the following kubemqctl command:
1
kubemqctl commands rec "hello-command" -a
Copied!
When connected, the stream will block until receiving a command. Once a command will be received, kubemqctl automatically will send a Response.
The following cURL command is using KubeMQ's REST interface:
1
curl --location --request GET "http://localhost:9090/subscribe/requests?client_id=some_client_id&channel=hello-command&subscribe_type=commands" \
2
--header "Content-Type: application/json" \
3
--data ""
Copied!
Once a command is received a Send Response call should be invoked:
1
curl --location --request POST "http://localhost:9090/send/response"
2
--header "Content-Type: application/json"
3
--data '{"RequestID": "<put here request id from command request>","ClientID":"some_client_id","ReplyChannel": "put here the reply channel value from command request","Metadata" :"some_metadata", "Body": "c29tZSBlbmNvZGVkIGJvZHk=","Executed": true,"Error":""}'
Copied!
Important - The reply channel address is automatically generated by the KubeMQ and can be found in the command request ReplyChannel field.
Subscribe to Commands in REST interface is using WebSocket for streaming (Push) commands to the receiver. You will need to implement a WebSocket receiver accordingly.
The following .NET code snippet is using KubeMQ's .NET SDK with gRPC interface:
1
using System;
2
3
namespace RPC_Subscribe_to_a_Channel
4
{
5
class Program
6
{
7
static void Main(string[] args)
8
{
9
10
var ChannelName = "hello-command";
11
var ClientID = "hello-world-subscriber";
12
var KubeMQServerAddress = "localhost:50000";
13
14
15
16
KubeMQ.SDK.csharp.CommandQuery.Responder responder = new KubeMQ.SDK.csharp.CommandQuery.Responder(KubeMQServerAddress);
17
try
18
{
19
responder.SubscribeToRequests(new KubeMQ.SDK.csharp.Subscription.SubscribeRequest()
20
{
21
Channel = ChannelName,
22
SubscribeType = KubeMQ.SDK.csharp.Subscription.SubscribeType.Commands,
23
ClientID = ClientID
24
}, (commandReceive) => {
25
Console.WriteLine(quot;Command Received: Id:{commandReceive.RequestID} Channel:{commandReceive.Channel} Metadata:{commandReceive.Metadata} Body:{ KubeMQ.SDK.csharp.Tools.Converter.FromByteArray(commandReceive.Body)} ");
26
return new KubeMQ.SDK.csharp.CommandQuery.Response(commandReceive)
27
{
28
Body = new byte[0],
29
CacheHit = false,
30
Error = "None",
31
ClientID = ClientID,
32
Executed = true,
33
Metadata = string.Empty,
34
Timestamp = DateTime.UtcNow,
35
};
36
37
}, (errorHandler) =>
38
{
39
Console.WriteLine(errorHandler.Message);
40
});
41
}
42
catch (Exception ex)
43
{
44
Console.WriteLine(ex.Message);
45
}
46
Console.WriteLine("press any key to close RPC_Subscribe_to_a_Channel");
47
Console.ReadLine();
48
}
49
}
50
}
Copied!
When executed, a stream of events messages will be shown in the console.
The following Java code snippet is using KubeMQ's Java SDK with gRPC interface:
1
package io.kubemq.sdk.examples.get_Started.rPC_Subscribe_to_a_Channel;
2
3
import java.io.IOException;
4
import java.time.LocalDateTime;
5
6
import javax.net.ssl.SSLException;
7
8
import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
9
import io.kubemq.sdk.commandquery.Responder;
10
import io.kubemq.sdk.commandquery.Response;
11
import io.kubemq.sdk.grpc.Kubemq.PingResult;
12
import io.kubemq.sdk.subscription.SubscribeRequest;
13
import io.kubemq.sdk.subscription.SubscribeType;
14
15
public class Program {
16
17
public static void main(String[] args) throws IOException {
18
String ChannelName = "hello-command", ClientID = "hello-world-sender",
19
KubeMQServerAddress = "localhost:50000";
20
Responder.RequestResponseObserver HandleIncomingRequests;
21
Responder responder = new Responder(KubeMQServerAddress);
22
HandleIncomingRequests = request -> {
23
24
Response response = new Response(request);
25
response.setCacheHit(false);
26
response.setError("None");
27
response.setClientID(ClientID);
28
response.setBody("OK".getBytes());
29
response.setExecuted(true);
30
response.setMetadata("OK");
31
response.setTimestamp(LocalDateTime.now());
32
return response;
33
};
34
SubscribeRequest subscribeRequest = new SubscribeRequest();
35
subscribeRequest.setChannel(ChannelName);
36
subscribeRequest.setClientID(ClientID);
37
subscribeRequest.setSubscribeType(SubscribeType.Commands);
38
39
new Thread() {
40
public void run() {
41
42
try {
43
responder.SubscribeToRequests(subscribeRequest, HandleIncomingRequests);
44
} catch (SSLException e) {
45
System.out.printf("SSLException:%s", e.getMessage());
46
e.printStackTrace();
47
} catch (ServerAddressNotSuppliedException e) {
48
System.out.printf("ServerAddressNotSuppliedException:%s", e.getMessage());
49
e.printStackTrace();
50
}
51
}
52
}.start();
53
}
54
}
Copied!
When executed, a stream of events messages will be shown in the console.
The following Go code snippet is using KubeMQ's Go SDK with gRPC interface:
1
package main
2
3
import (
4
"context"
5
"github.com/kubemq-io/kubemq-go"
6
"log"
7
"time"
8
)
9
10
func main() {
11
ctx, cancel := context.WithCancel(context.Background())
12
defer cancel()
13
client, err := kubemq.NewClient(ctx,
14
kubemq.WithAddress("localhost", 50000),
15
kubemq.WithClientId("test-command-client-id"),
16
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
17
if err != nil {
18
log.Fatal(err)
19
}
20
defer client.Close()
21
channel := "hello-command"
22
errCh := make(chan error)
23
commandsCh, err := client.SubscribeToCommands(ctx, channel, "", errCh)
24
if err != nil {
25
log.Fatal(err)
26
}
27
for {
28
select {
29
case err := <-errCh:
30
log.Fatal(err)
31
return
32
case command, more := <-commandsCh:
33
if !more {
34
log.Println("Command Received , done")
35
return
36
}
37
log.Printf("Command Received:\nId %s\nChannel: %s\nMetadata: %s\nBody: %s\n", command.Id, command.Channel, command.Metadata, command.Body)
38
err := client.R().
39
SetRequestId(command.Id).
40
SetResponseTo(command.ResponseTo).
41
SetExecutedAt(time.Now()).
42
Send(ctx)
43
if err != nil {
44
log.Fatal(err)
45
}
46
case <-ctx.Done():
47
return
48
}
49
}
50
51
}
Copied!
When connected, once a command will be received in the channel, we create a Response and send back to the sender.
The following Python code snippet is using KubeMQ's Python SDK with gRPC interface:
1
import datetime
2
from builtins import input
3
from random import randint
4
5
from kubemq.commandquery.responder import Responder
6
from kubemq.commandquery.response import Response
7
from kubemq.subscription.events_store_type import EventsStoreType
8
from kubemq.subscription.subscribe_request import SubscribeRequest
9
from kubemq.subscription.subscribe_type import SubscribeType
10
from kubemq.tools.listener_cancellation_token import ListenerCancellationToken
11
12
13
14
15
16
def handle_incoming_request(request):
17
if request:
18
print("Subscriber Received request: Metadata:'%s', Channel:'%s', Body:'%s' tags:%s" % (
19
request.metadata,
20
request.channel,
21
request.body,
22
request.tags
23
))
24
25
response = Response(request)
26
response.body = "OK".encode('UTF-8')
27
response.cache_hit = False
28
response.error = "None"
29
response.client_id = 'hello-world-sender'
30
response.executed = True
31
response.metadata = "OK"
32
response.timestamp = datetime.datetime.now()
33
response.tags=request.tags
34
return response
35
36
def handle_incoming_error(error_msg):
37
print("received error:%s'" % (
38
error_msg
39
))
40
41
42
if __name__ == "__main__":
43
cancel_token=ListenerCancellationToken()
44
receiver = Responder("localhost:50000")
45
46
subscribe_request = SubscribeRequest(
47
channel="testing_Command_channel",
48
client_id='hello-world-sender',
49
events_store_type=SubscribeType.SubscribeTypeUndefined,
50
events_store_type_value=0,
51
group="",
52
subscribe_type=SubscribeType.Commands
53
)
54
receiver.subscribe_to_requests(subscribe_request, handle_incoming_request,handle_incoming_error,cancel_token)
55
56
input("Press 'Enter' to stop Listen...\n")
57
cancel_token.cancel()
58
input("Press 'Enter' to stop the application...\n")
Copied!
When executed, a stream of events messages will be shown in the console.
The following JS code snippet is using KubeMQ's NodeJS SDK with gRPC interface:
1
const kubemq = require('kubemq-nodejs');
2
3
let channelName = 'testing_Command_channel', clientID = 'hello-world-sender',
4
kubeMQHost = 'localhost', kubeMQGrpcPort = '50000';
5
6
let receiver = new kubemq.CommandReceiver(kubeMQHost, kubeMQGrpcPort, clientID, channelName);
7
receiver.subscribe(cmd => {
8
let response = new kubemq.CommandReceiver.Response(cmd, true);
9
response.Timestamp = Math.floor(new Date() / 1000);
10
receiver.sendResponse(response).then(snd => {
11
console.log('sent:' + snd);
12
}).catch(cht => console.log(cht));
13
}, err => {
14
console.log(err);
15
})
Copied!
The following PHP code snippet is using KubeMQ's REST interface:
1
<?php
2
3
$curl = curl_init();
4
5
curl_setopt_array($curl, array(
6
CURLOPT_URL => "http://localhost:9090/subscribe/requests?client_id=some_client_id&channel=hello-command&subscribe_type=commands",
7
CURLOPT_RETURNTRANSFER => true,
8
CURLOPT_ENCODING => "",
9
CURLOPT_MAXREDIRS => 10,
10
CURLOPT_TIMEOUT => 0,
11
CURLOPT_FOLLOWLOCATION => false,
12
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
13
CURLOPT_CUSTOMREQUEST => "GET",
14
CURLOPT_HTTPHEADER => array(
15
"Content-Type: application/json"
16
),
17
));
18
19
$response = curl_exec($curl);
20
$err = curl_error($curl);
21
22
curl_close($curl);
23
24
if ($err) {
25
echo "cURL Error #:" . $err;
26
} else {
27
echo $response;
28
} ?>
Copied!
Once a command is received a Send Response call should be invoked:
1
<?php
2
3
$curl = curl_init();
4
5
curl_setopt_array($curl, array(
6
CURLOPT_URL => "http://localhost:9090/send/response",
7
CURLOPT_RETURNTRANSFER => true,
8
CURLOPT_ENCODING => "",
9
CURLOPT_MAXREDIRS => 10,
10
CURLOPT_TIMEOUT => 0,
11
CURLOPT_FOLLOWLOCATION => false,
12
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
13
CURLOPT_CUSTOMREQUEST => "POST",
14
CURLOPT_POSTFIELDS =>"{\n\t\"RequestID\": \"<put here request id from command request>\",\n\t\"ClientID\":\"some_client_id\",\n\t\"ReplyChannel\": \"<put here the reply channel value from command request>\",\n\t\"Metadata\" :\"some_metadata2\",\n\t\"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n\t\"Executed\": true,\n\t\"Error\":\"\"\n}",
15
CURLOPT_HTTPHEADER => array(
16
"Content-Type: application/json"
17
),
18
));
19
20
$response = curl_exec($curl);
21
$err = curl_error($curl);
22
23
curl_close($curl);
24
25
if ($err) {
26
echo "cURL Error #:" . $err;
27
} else {
28
echo $response;
29
} ?>
Copied!
Important - The reply channel address is automatically generated by the KubeMQ and can be found in the command request ReplyChannel field.
Subscribe to Commands in REST interface is using WebSocket for streaming (Push) commands to the receiver. You will need to implement a WebSocket receiver accordingly.
The following Ruby code snippet is using KubeMQ's REST interface:
1
require "uri"
2
require "net/http"
3
4
url = URI("http://localhost:9090/subscribe/requests?client_id=some_client_id&channel=hello-command&subscribe_type=commands")
5
http = Net::HTTP.new(url.host, url.port)
6
request = Net::HTTP::Get.new(url)
7
request["Content-Type"] = "application/json"
8
response = http.request(request)
9
puts response.read_body
Copied!
Once a command is received a Send Response call should be invoked:
1
require "uri"
2
require "net/http"
3
4
url = URI("http://localhost:9090/send/response")
5
6
http = Net::HTTP.new(url.host, url.port)
7
8
request = Net::HTTP::Post.new(url)
9
request["Content-Type"] = "application/json"
10
request.body = "{\n\t\"RequestID\": \"<put here request id from command request>\",\n\t\"ClientID\":\"some_client_id\",\n\t\"ReplyChannel\": \"<put here the reply channel value from command request>q\",\n\t\"Metadata\" :\"some_metadata2\",\n\t\"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n\t\"Executed\": true,\n\t\"Error\":\"\"\n}"
11
response = http.request(request)
12
puts response.read_body
Copied!
Important - The reply channel address is automatically generated by the KubeMQ and can be found in the command request ReplyChannel field.
Subscribe to Commands in REST interface is using WebSocket for streaming (Push) commands to the receiver. You will need to implement a WebSocket receiver accordingly.
The following jQuery code snippet is using KubeMQ's REST interface:
1
var settings = {
2
"url": "http://localhost:9090/subscribe/requests?client_id=some_client_id&channel=hello-command&subscribe_type=commands",
3
"method": "GET",
4
"timeout": 0,
5
"headers": {
6
"Content-Type": "application/json"
7
},
8
};
9
10
$.ajax(settings).done(function (response) {
11
console.log(response);
12
});
Copied!
Once a command is received a Send Response call should be invoked:
1
var settings = {
2
"url": "http://localhost:9090/send/response",
3
"method": "POST",
4
"timeout": 0,
5
"headers": {
6
"Content-Type": "application/json"
7
},
8
"data": "{\n\t\"RequestID\": \"<put here request id from command request>\",\n\t\"ClientID\":\"some_client_id\",\n\t\"ReplyChannel\": \"<put here the reply channel value from command request>\",\n\t\"Metadata\" :\"some_metadata2\",\n\t\"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n\t\"Executed\": true,\n\t\"Error\":\"\"\n}",
9
};
10
11
$.ajax(settings).done(function (response) {
12
console.log(response);
13
});
Copied!
Important - the reply channel address is automatically generated by the KubeMQ and can be found in the command request ReplyChannel field.
Subscribe to Commands in REST interface is using WebSocket for streaming (Push) commands to the receiver. You will need to implement a WebSocket receiver accordingly.

Send to Commands Channel

After you have subscribed to a hello-command channel, you can send your command to it.
kubemqctl
curl
.Net
Java
Go
Python
Node
PHP
Ruby
jquery
Run the following kubemqctl command:
1
kubemqctl commands send "hello-command" "some command"
Copied!
The following cURL command is using KubeMQ's REST interface:
1
curl --location --request POST "http://localhost:9090/send/request"
2
--header "Content-Type: application/json"
3
--data '{"RequestID": "688daec3-7f3e-4766-87fa-4cd1f4f03a23","RequestTypeData":1, "ClientID": "some_clientID","Channel": "hello-command","Metadata" :"some_metadata","Body": "c29tZSBlbmNvZGVkIGJvZHk=","Timeout": 10000}'
Copied!
The following .NET code snippet is using KubeMQ's .NET SDK with gRPC interface:
1
using System;
2
3
namespace RPC_Send_a_Command_Channel
4
{
5
class Program
6
{
7
static void Main(string[] args)
8
{
9
var ChannelName = "hello-command";
10
var ClientID = "hello-world-sender";
11
var KubeMQServerAddress = "localhost:50000";
12
13
var channel = new KubeMQ.SDK.csharp.CommandQuery.Channel(new KubeMQ.SDK.csharp.CommandQuery.ChannelParameters
14
{
15
RequestsType = KubeMQ.SDK.csharp.CommandQuery.RequestType.Command,
16
Timeout = 10000,
17
ChannelName = ChannelName,
18
ClientID = ClientID,
19
KubeMQAddress = KubeMQServerAddress
20
});
21
try
22
{
23
24
var result = channel.SendRequest(new KubeMQ.SDK.csharp.CommandQuery.Request
25
{
26
Body = KubeMQ.SDK.csharp.Tools.Converter.ToByteArray("hello kubemq - sending a command, please reply")
27
});
28
29
if (!result.Executed)
30
{
31
Console.WriteLine(quot;Response error:{result.Error}");
32
return;
33
}
34
Console.WriteLine(quot;Response Received:{result.RequestID} ExecutedAt:{result.Timestamp}");
35
}
36
catch (Exception ex)
37
{
38
Console.WriteLine(ex.Message);
39
}
40
}
41
}
42
}
Copied!
The following Java code snippet is using KubeMQ's Java SDK with gRPC interface:
1
package io.kubemq.sdk.examples.get_Started.rPC_Send_a_Command_Channel;
2
3
import java.io.IOException;
4
5
import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
6
import io.kubemq.sdk.commandquery.ChannelParameters;
7
import io.kubemq.sdk.commandquery.Request;
8
import io.kubemq.sdk.commandquery.RequestType;
9
import io.kubemq.sdk.commandquery.Response;
10
import io.kubemq.sdk.tools.Converter;
11
12
public class Program {
13
14
public static void main(String[] args) throws IOException {
15
16
String ChannelName = "hello-command", ClientID = "hello-world-sender",
17
KubeMQServerAddress = "localhost:50000";
18
ChannelParameters channelParameters = new ChannelParameters();
19
channelParameters.setChannelName(ChannelName);
20
channelParameters.setClientID(ClientID);
21
channelParameters.setKubeMQAddress(KubeMQServerAddress);
22
channelParameters.setRequestType(RequestType.Command);
23
channelParameters.setTimeout(10000);
24
io.kubemq.sdk.commandquery.Channel channel = new io.kubemq.sdk.commandquery.Channel(channelParameters);
25
Request request = new Request();
26
request.setBody(Converter.ToByteArray("hello kubemq - sending a command, please reply"));
27
Response result;
28
try {
29
result = channel.SendRequest(request);
30
if (!result.isExecuted()) {
31
System.out.printf("Response error: %s", result.getError());
32
return;
33
}
34
System.out.printf("Response Received: %s, ExecutedAt: %s", result.getRequestID(), result.getTimestamp().toString());
35
} catch (ServerAddressNotSuppliedException e) {
36
System.out.printf("ServerAddressNotSuppliedException: %s", e.toString());
37
e.printStackTrace();
38
}
39
40
41
}
42
}
Copied!
The following Go code snippet is using KubeMQ's Go SDK with gRPC interface:
1
package main
2
3
import (
4
"context"
5
"github.com/kubemq-io/kubemq-go"
6
"log"
7
"time"
8
)
9
10
func main() {
11
ctx, cancel := context.WithCancel(context.Background())
12
defer cancel()
13
client, err := kubemq.NewClient(ctx,
14
kubemq.WithAddress("localhost", 50000),
15
kubemq.WithClientId("test-command-client-id"),
16
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
17
if err != nil {
18
log.Fatal(err)
19
}
20
defer client.Close()
21
channel := "hello-command"
22
response, err := client.C().
23
SetId("some-command-id").
24
SetChannel(channel).
25
SetMetadata("some-metadata").
26
SetBody([]byte("hello kubemq - sending a command, please reply")).
27
SetTimeout(10 *time.Second).
28
Send(ctx)
29
if err != nil {
30
log.Fatal(err)
31
}
32
log.Printf("Response Received:\nCommandID: %s\nExecutedAt:%s\n", response.CommandId, response.ExecutedAt)
33
}
Copied!
The following Python code snippet is using KubeMQ's Python SDK with gRPC interface: