RPC

Get Started with RPC in KubeMQ

Connect Your KubeMQ Cluster

To communicate with the KubeMQ interface ports running in a Kubernetes cluster, you need to port-forward KubeMQ's ports.

kubemqctl has a handy command that will do it for you:

kubemqctl set cluster proxy

Subscribe to Commands Channel

A receiver can subscribe to the hello-command channel with one of the following methods.

kubemqctl
curl
.Net
Java
Go
Python
Node
PHP
Ruby
jquery
kubemqctl

Run the following kubemqctl command:

kubemqctl commands rec "hello-command" -a

Once connected, the stream blocks until a command is received. When a command is received, kubemqctl automatically sends a Response.

curl

The following cURL command is using KubeMQ's REST interface:

# Open the commands subscription for channel "hello-command" through KubeMQ's REST interface.
curl -L -X GET \
  -H "Content-Type: application/json" \
  -d "" \
  "http://localhost:9090/subscribe/requests?client_id=some_client_id&channel=hello-command&subscribe_type=commands"

Once a command is received a Send Response call should be invoked:

# Reply to a received command. RequestID and ReplyChannel must be copied from that command request.
# Each line ends with a backslash so the shell reads the three lines as one curl command.
curl --location --request POST "http://localhost:9090/send/response" \
--header "Content-Type: application/json" \
--data '{"RequestID": "<put here request id from command request>","ClientID":"some_client_id","ReplyChannel": "put here the reply channel value from command request","Metadata" :"some_metadata", "Body": "c29tZSBlbmNvZGVkIGJvZHk=","Executed": true,"Error":""}'

Important - The reply channel address is automatically generated by KubeMQ and can be found in the ReplyChannel field of the command request.

Subscribing to commands through the REST interface uses a WebSocket to stream (push) commands to the receiver, so you will need to implement a WebSocket receiver accordingly.

.Net

The following .NET code snippet is using KubeMQ's .NET SDK with gRPC interface:

using System;
namespace RPC_Subscribe_to_a_Channel
{
    /// <summary>
    /// Sample receiver: subscribes to the "hello-command" channel, prints every
    /// received command and returns a Response for it.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var channelName = "hello-command";
            var clientId = "hello-world-subscriber";
            var serverAddress = "localhost:50000";

            var responder = new KubeMQ.SDK.csharp.CommandQuery.Responder(serverAddress);
            try
            {
                var subscription = new KubeMQ.SDK.csharp.Subscription.SubscribeRequest()
                {
                    Channel = channelName,
                    SubscribeType = KubeMQ.SDK.csharp.Subscription.SubscribeType.Commands,
                    ClientID = clientId
                };

                responder.SubscribeToRequests(
                    subscription,
                    // Called for each received command; the returned Response is the reply.
                    (command) =>
                    {
                        Console.WriteLine($"Command Received: Id:{command.RequestID} Channel:{command.Channel} Metadata:{command.Metadata} Body:{KubeMQ.SDK.csharp.Tools.Converter.FromByteArray(command.Body)} ");

                        var reply = new KubeMQ.SDK.csharp.CommandQuery.Response(command)
                        {
                            Body = new byte[0],
                            CacheHit = false,
                            Error = "None",
                            ClientID = clientId,
                            Executed = true,
                            Metadata = string.Empty,
                            Timestamp = DateTime.UtcNow,
                        };
                        return reply;
                    },
                    // Called with the exception when the subscription reports an error.
                    (error) =>
                    {
                        Console.WriteLine(error.Message);
                    });
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }

            // Keep the process alive so the subscription can keep receiving commands.
            Console.WriteLine("press any key to close RPC_Subscribe_to_a_Channel");
            Console.ReadLine();
        }
    }
}

When executed, each received command will be shown in the console.

Java

The following Java code snippet is using KubeMQ's Java SDK with gRPC interface:

package io.kubemq.sdk.examples.get_Started.rPC_Subscribe_to_a_Channel;
import java.io.IOException;
import java.time.LocalDateTime;
import javax.net.ssl.SSLException;
import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
import io.kubemq.sdk.commandquery.Responder;
import io.kubemq.sdk.commandquery.Response;
import io.kubemq.sdk.grpc.Kubemq.PingResult;
import io.kubemq.sdk.subscription.SubscribeRequest;
import io.kubemq.sdk.subscription.SubscribeType;
/**
 * Sample receiver: subscribes to the "hello-command" channel and answers every
 * received command with an "OK" response.
 */
public class Program {

    /**
     * Creates the responder and the subscription request, then starts listening
     * on a background thread.
     *
     * @param args unused
     * @throws IOException declared by the sample; presumably raised by the SDK
     *                     while setting up the responder (confirm against the SDK)
     */
    public static void main(String[] args) throws IOException {
        final String channelName = "hello-command";
        final String clientId = "hello-world-sender";
        final String kubeMQServerAddress = "localhost:50000";

        Responder responder = new Responder(kubeMQServerAddress);

        // Invoked for every received command; the returned Response is sent back to the sender.
        Responder.RequestResponseObserver handleIncomingRequests = request -> {
            Response response = new Response(request);
            response.setCacheHit(false);
            response.setError("None");
            response.setClientID(clientId);
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            response.setBody("OK".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            response.setExecuted(true);
            response.setMetadata("OK");
            response.setTimestamp(LocalDateTime.now());
            return response;
        };

        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.setChannel(channelName);
        subscribeRequest.setClientID(clientId);
        subscribeRequest.setSubscribeType(SubscribeType.Commands);

        // Subscribe on a separate thread so main() can return.
        new Thread(() -> {
            try {
                responder.SubscribeToRequests(subscribeRequest, handleIncomingRequests);
            } catch (SSLException e) {
                System.out.printf("SSLException:%s", e.getMessage());
                e.printStackTrace();
            } catch (ServerAddressNotSuppliedException e) {
                System.out.printf("ServerAddressNotSuppliedException:%s", e.getMessage());
                e.printStackTrace();
            }
        }).start();
    }
}

When executed, the receiver replies to every command received on the channel.

Go

The following Go code snippet is using KubeMQ's Go SDK with gRPC interface:

package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// main subscribes to the "hello-command" channel and sends a response for every
// received command until the context is done or the commands channel is closed.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	client, err := kubemq.NewClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("test-command-client-id"),
		kubemq.WithTransportType(kubemq.TransportTypeGRPC))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	channel := "hello-command"
	errCh := make(chan error)
	commandsCh, err := client.SubscribeToCommands(ctx, channel, "", errCh)
	if err != nil {
		log.Fatal(err)
	}
	for {
		select {
		case err := <-errCh:
			// log.Fatal exits the process, so nothing after it would ever run.
			log.Fatal(err)
		case command, more := <-commandsCh:
			if !more {
				// The subscription channel was closed; nothing more to receive.
				log.Println("Command Received , done")
				return
			}
			log.Printf("Command Received:\nId %s\nChannel: %s\nMetadata: %s\nBody: %s\n", command.Id, command.Channel, command.Metadata, command.Body)
			// Answer the sender using the request id and reply address carried by the command.
			err := client.R().
				SetRequestId(command.Id).
				SetResponseTo(command.ResponseTo).
				SetExecutedAt(time.Now()).
				Send(ctx)
			if err != nil {
				log.Fatal(err)
			}
		case <-ctx.Done():
			return
		}
	}
}

Once connected, whenever a command is received on the channel, we create a Response and send it back to the sender.

Python

The following Python code snippet is using KubeMQ's Python SDK with gRPC interface:

import datetime
from builtins import input
from random import randint
from kubemq.commandquery.responder import Responder
from kubemq.commandquery.response import Response
from kubemq.subscription.events_store_type import EventsStoreType
from kubemq.subscription.subscribe_request import SubscribeRequest
from kubemq.subscription.subscribe_type import SubscribeType
from kubemq.tools.listener_cancellation_token import ListenerCancellationToken
def handle_incoming_request(request):
    """Print a received command and build the response returned to its sender.

    :param request: the command request delivered by the subscription.
    :return: a ``Response`` built from ``request``, or ``None`` when
        ``request`` is falsy.
    """
    if request:
        print("Subscriber Received request: Metadata:'%s', Channel:'%s', Body:'%s' tags:%s" % (
            request.metadata,
            request.channel,
            request.body,
            request.tags
        ))
        response = Response(request)
        response.body = "OK".encode('UTF-8')
        response.cache_hit = False
        response.error = "None"
        response.client_id = 'hello-world-sender'
        response.executed = True
        response.metadata = "OK"
        response.timestamp = datetime.datetime.now()
        # Echo the request's tags back on the response.
        response.tags = request.tags
        return response
def handle_incoming_error(error_msg):
    """Print an error reported by the subscription.

    :param error_msg: the error value passed by the subscription.
    """
    # Wrap in a one-element tuple so a tuple-valued error is formatted, not unpacked.
    print("received error:%s" % (error_msg,))
if __name__ == "__main__":
    # Token used below to stop the subscription listener on demand.
    cancel_token = ListenerCancellationToken()
    receiver = Responder("localhost:50000")
    subscribe_request = SubscribeRequest(
        # Same channel the senders in this guide publish commands to.
        channel="hello-command",
        client_id='hello-world-sender',
        # Events-store settings are not used for a Commands subscription.
        events_store_type=EventsStoreType.Undefined,
        events_store_type_value=0,
        group="",
        subscribe_type=SubscribeType.Commands
    )
    receiver.subscribe_to_requests(subscribe_request, handle_incoming_request, handle_incoming_error, cancel_token)
    input("Press 'Enter' to stop Listen...\n")
    cancel_token.cancel()
    input("Press 'Enter' to stop the application...\n")

When executed, each received command will be shown in the console.

Node

The following JS code snippet is using KubeMQ's NodeJS SDK with gRPC interface:

const kubemq = require('kubemq-nodejs');

// Subscribe to the same "hello-command" channel the senders in this guide publish to.
const channelName = 'hello-command', clientID = 'hello-world-sender',
    kubeMQHost = 'localhost', kubeMQGrpcPort = '50000';

const receiver = new kubemq.CommandReceiver(kubeMQHost, kubeMQGrpcPort, clientID, channelName);

receiver.subscribe(cmd => {
    // Build the reply for the received command; the second argument is presumably
    // the "executed" flag (confirm against the SDK).
    const response = new kubemq.CommandReceiver.Response(cmd, true);
    // Unix time in whole seconds.
    response.Timestamp = Math.floor(Date.now() / 1000);
    receiver.sendResponse(response).then(snd => {
        console.log('sent:' + snd);
    }).catch(cht => console.log(cht));
}, err => {
    console.log(err);
});
PHP

The following PHP code snippet is using KubeMQ's REST interface:

<?php
// Opens the commands subscription for channel "hello-command" through KubeMQ's REST interface.
$ch = curl_init();
curl_setopt_array($ch, [
    CURLOPT_URL            => "http://localhost:9090/subscribe/requests?client_id=some_client_id&channel=hello-command&subscribe_type=commands",
    CURLOPT_RETURNTRANSFER => true,
    CURLOPT_ENCODING       => "",
    CURLOPT_MAXREDIRS      => 10,
    CURLOPT_TIMEOUT        => 0,
    CURLOPT_FOLLOWLOCATION => false,
    CURLOPT_HTTP_VERSION   => CURL_HTTP_VERSION_1_1,
    CURLOPT_CUSTOMREQUEST  => "GET",
    CURLOPT_HTTPHEADER     => [
        "Content-Type: application/json"
    ],
]);
$body = curl_exec($ch);
$error = curl_error($ch);
curl_close($ch);
// Print the cURL error when there is one, otherwise the returned body.
echo $error ? ("cURL Error #:" . $error) : $body;
?>

Once a command is received a Send Response call should be invoked:

<?php
// Sends the Response for a received command; RequestID and ReplyChannel come from that command request.
$ch = curl_init();
curl_setopt_array($ch, [
    CURLOPT_URL            => "http://localhost:9090/send/response",
    CURLOPT_RETURNTRANSFER => true,
    CURLOPT_ENCODING       => "",
    CURLOPT_MAXREDIRS      => 10,
    CURLOPT_TIMEOUT        => 0,
    CURLOPT_FOLLOWLOCATION => false,
    CURLOPT_HTTP_VERSION   => CURL_HTTP_VERSION_1_1,
    CURLOPT_CUSTOMREQUEST  => "POST",
    CURLOPT_POSTFIELDS     => "{\n\t\"RequestID\": \"<put here request id from command request>\",\n\t\"ClientID\":\"some_client_id\",\n\t\"ReplyChannel\": \"<put here the reply channel value from command request>\",\n\t\"Metadata\" :\"some_metadata2\",\n\t\"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n\t\"Executed\": true,\n\t\"Error\":\"\"\n}",
    CURLOPT_HTTPHEADER     => [
        "Content-Type: application/json"
    ],
]);
$body = curl_exec($ch);
$error = curl_error($ch);
curl_close($ch);
// Print the cURL error when there is one, otherwise the returned body.
echo $error ? ("cURL Error #:" . $error) : $body;
?>

Important - The reply channel address is automatically generated by KubeMQ and can be found in the ReplyChannel field of the command request.

Subscribing to commands through the REST interface uses a WebSocket to stream (push) commands to the receiver, so you will need to implement a WebSocket receiver accordingly.

Ruby

The following Ruby code snippet is using KubeMQ's REST interface:

require "uri"
require "net/http"

# Opens the commands subscription for channel "hello-command" through KubeMQ's REST interface.
endpoint = URI("http://localhost:9090/subscribe/requests?client_id=some_client_id&channel=hello-command&subscribe_type=commands")

get = Net::HTTP::Get.new(endpoint)
get["Content-Type"] = "application/json"

client = Net::HTTP.new(endpoint.host, endpoint.port)
puts client.request(get).read_body

Once a command is received a Send Response call should be invoked:

require "uri"
require "net/http"

# Sends the Response for a received command; RequestID and ReplyChannel come from that command request.
url = URI("http://localhost:9090/send/response")
http = Net::HTTP.new(url.host, url.port)
request = Net::HTTP::Post.new(url)
request["Content-Type"] = "application/json"
request.body = "{\n\t\"RequestID\": \"<put here request id from command request>\",\n\t\"ClientID\":\"some_client_id\",\n\t\"ReplyChannel\": \"<put here the reply channel value from command request>\",\n\t\"Metadata\" :\"some_metadata2\",\n\t\"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n\t\"Executed\": true,\n\t\"Error\":\"\"\n}"
response = http.request(request)
puts response.read_body

Important - The reply channel address is automatically generated by KubeMQ and can be found in the ReplyChannel field of the command request.

Subscribing to commands through the REST interface uses a WebSocket to stream (push) commands to the receiver, so you will need to implement a WebSocket receiver accordingly.

jquery

The following jQuery code snippet is using KubeMQ's REST interface:

// Request options for opening the commands subscription on channel "hello-command".
var settings = {
  url: "http://localhost:9090/subscribe/requests?client_id=some_client_id&channel=hello-command&subscribe_type=commands",
  method: "GET",
  timeout: 0,
  headers: { "Content-Type": "application/json" }
};

// Log whatever the REST interface returns.
$.ajax(settings).done(function (result) {
  console.log(result);
});

Once a command is received a Send Response call should be invoked:

// Request options for sending the Response; RequestID and ReplyChannel come from the command request.
var settings = {
  url: "http://localhost:9090/send/response",
  method: "POST",
  timeout: 0,
  headers: { "Content-Type": "application/json" },
  data: "{\n\t\"RequestID\": \"<put here request id from command request>\",\n\t\"ClientID\":\"some_client_id\",\n\t\"ReplyChannel\": \"<put here the reply channel value from command request>\",\n\t\"Metadata\" :\"some_metadata2\",\n\t\"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n\t\"Executed\": true,\n\t\"Error\":\"\"\n}"
};

// Log whatever the REST interface returns.
$.ajax(settings).done(function (result) {
  console.log(result);
});

Important - The reply channel address is automatically generated by KubeMQ and can be found in the ReplyChannel field of the command request.

Subscribing to commands through the REST interface uses a WebSocket to stream (push) commands to the receiver, so you will need to implement a WebSocket receiver accordingly.

Send to Commands Channel

After you have subscribed to the hello-command channel, you can send a command to it.

kubemqctl
curl
.Net
Java
Go
Python
Node
PHP
Ruby
jquery
kubemqctl

Run the following kubemqctl command:

kubemqctl commands send "hello-command" "some command"
curl

The following cURL command is using KubeMQ's REST interface:

# Send a command to channel "hello-command" through KubeMQ's REST interface.
# Each line ends with a backslash so the shell reads the three lines as one curl command.
curl --location --request POST "http://localhost:9090/send/request" \
--header "Content-Type: application/json" \
--data '{"RequestID": "688daec3-7f3e-4766-87fa-4cd1f4f03a23","RequestTypeData":1, "ClientID": "some_clientID","Channel": "hello-command","Metadata" :"some_metadata","Body": "c29tZSBlbmNvZGVkIGJvZHk=","Timeout": 10000}'
.Net

The following .NET code snippet is using KubeMQ's .NET SDK with gRPC interface:

using System;
namespace RPC_Send_a_Command_Channel
{
    /// <summary>
    /// Sample sender: sends one command on the "hello-command" channel and prints
    /// either the response details or the response error.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var channelName = "hello-command";
            var clientId = "hello-world-sender";
            var serverAddress = "localhost:50000";

            var parameters = new KubeMQ.SDK.csharp.CommandQuery.ChannelParameters
            {
                RequestsType = KubeMQ.SDK.csharp.CommandQuery.RequestType.Command,
                Timeout = 10000,
                ChannelName = channelName,
                ClientID = clientId,
                KubeMQAddress = serverAddress
            };
            var commandChannel = new KubeMQ.SDK.csharp.CommandQuery.Channel(parameters);

            try
            {
                var command = new KubeMQ.SDK.csharp.CommandQuery.Request
                {
                    Body = KubeMQ.SDK.csharp.Tools.Converter.ToByteArray("hello kubemq - sending a command, please reply")
                };

                var reply = commandChannel.SendRequest(command);
                if (reply.Executed)
                {
                    Console.WriteLine($"Response Received:{reply.RequestID} ExecutedAt:{reply.Timestamp}");
                }
                else
                {
                    // The receiver reported that the command was not executed.
                    Console.WriteLine($"Response error:{reply.Error}");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
Java

The following Java code snippet is using KubeMQ's Java SDK with gRPC interface:

package io.kubemq.sdk.examples.get_Started.rPC_Send_a_Command_Channel;
import java.io.IOException;
import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
import io.kubemq.sdk.commandquery.ChannelParameters;
import io.kubemq.sdk.commandquery.Request;
import io.kubemq.sdk.commandquery.RequestType;
import io.kubemq.sdk.commandquery.Response;
import io.kubemq.sdk.tools.Converter;
/**
 * Sample sender: sends one command on the "hello-command" channel and prints
 * either the response details or the response error.
 */
public class Program {

    /**
     * Configures the command channel, sends a single request and reports the outcome.
     *
     * @param args unused
     * @throws IOException declared by the sample; presumably raised by the SDK calls
     *                     made here (confirm against the SDK)
     */
    public static void main(String[] args) throws IOException {
        final String channelName = "hello-command";
        final String clientId = "hello-world-sender";
        final String serverAddress = "localhost:50000";

        ChannelParameters parameters = new ChannelParameters();
        parameters.setChannelName(channelName);
        parameters.setClientID(clientId);
        parameters.setKubeMQAddress(serverAddress);
        parameters.setRequestType(RequestType.Command);
        parameters.setTimeout(10000);

        io.kubemq.sdk.commandquery.Channel commandChannel =
                new io.kubemq.sdk.commandquery.Channel(parameters);

        Request command = new Request();
        command.setBody(Converter.ToByteArray("hello kubemq - sending a command, please reply"));

        try {
            Response reply = commandChannel.SendRequest(command);
            if (reply.isExecuted()) {
                System.out.printf("Response Received: %s, ExecutedAt: %s", reply.getRequestID(), reply.getTimestamp().toString());
            } else {
                // The receiver reported that the command was not executed.
                System.out.printf("Response error: %s", reply.getError());
            }
        } catch (ServerAddressNotSuppliedException e) {
            System.out.printf("ServerAddressNotSuppliedException: %s", e.toString());
            e.printStackTrace();
        }
    }
}
Go

The following Go code snippet is using KubeMQ's Go SDK with gRPC interface:

package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// main connects to KubeMQ, sends a single command on the "hello-command"
// channel and logs the response it gets back.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, err := kubemq.NewClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("test-command-client-id"),
		kubemq.WithTransportType(kubemq.TransportTypeGRPC),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	commandChannel := "hello-command"

	// Assemble the command first, then send it and wait for the reply.
	command := client.C().
		SetId("some-command-id").
		SetChannel(commandChannel).
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending a command, please reply")).
		SetTimeout(10 * time.Second)

	response, err := command.Send(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Response Received:\nCommandID: %s\nExecutedAt:%s\n", response.CommandId, response.ExecutedAt)
}
Python

The following Python code snippet is using KubeMQ's Python SDK with gRPC interface:

from kubemq.commandquery.lowlevel.initiator import Initiator
from kubemq.commandquery.lowlevel.request import Request
from kubemq.commandquery.request_type import RequestType
if __name__ == "__main__":
    initiator = Initiator("localhost:50000")
    request = Request(
        body="hello kubemq - sending a command, please reply".encode('UTF-8'),
        metadata="",
        cache_key="",
        cache_ttl=0,
        # Same channel the receivers in this guide subscribe to.
        channel="hello-command",
        client_id="hello-world-sender",
        timeout=10000,
        request_type=RequestType.Command,
    )
    try:
        # Send the command and print the id and timestamp of the response.
        response = initiator.send_request(request)
        print('Response Received:%s Executed at::%s' % (
            response.request_id,
            response.timestamp
        ))
    except Exception as err:
        print('command error::%s' % (
            err
        ))
Node

The following JS code snippet is using KubeMQ's NodeJS SDK with gRPC interface:

const kubemq = require('kubemq-nodejs');

// Send to the same "hello-command" channel the receivers in this guide subscribe to.
const kubeMQHost = 'localhost', kubeMQGrpcPort = '50000',
    channelName = 'hello-command', clientID = 'hello-world-sender',
    defaultTimeOut = 10000;

const sender = new kubemq.CommandSender(kubeMQHost, kubeMQGrpcPort, clientID, channelName, defaultTimeOut);

const request = new kubemq.CommandSender.CommandRequest(
    kubemq.stringToByte(' hello kubemq - sending a command, please reply'));

sender.send(request).then(
    res => {
        if (res.Error) {
            // Report the same field that was just tested, so the message is never "undefined".
            console.log('Response error: ' + res.Error);
            return;
        }
        console.log('Response Received:' + res.RequestID + ' ExecutedAt:' + res.Timestamp);
    }).catch(
    err => {
        console.log('command error: ' + err);
    });
PHP

The following PHP code snippet is using KubeMQ's REST interface:

<?php
// Sends a command to channel "hello-command" through KubeMQ's REST interface.
$ch = curl_init();
curl_setopt_array($ch, [
    CURLOPT_URL            => "http://localhost:9090/send/request",
    CURLOPT_RETURNTRANSFER => true,
    CURLOPT_ENCODING       => "",
    CURLOPT_MAXREDIRS      => 10,
    CURLOPT_TIMEOUT        => 0,
    CURLOPT_FOLLOWLOCATION => false,
    CURLOPT_HTTP_VERSION   => CURL_HTTP_VERSION_1_1,
    CURLOPT_CUSTOMREQUEST  => "POST",
    CURLOPT_POSTFIELDS     => "{\n\t\"RequestID\": \"688daec3-7f3e-4766-87fa-4cd1f4f03a23\",\n\t\"RequestTypeData\":1, \n\t\"ClientID\": \"some_clientID\",\n\t\"Channel\": \"hello-command\",\n\t\"Metadata\" :\"some_metadata\",\n\t\"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n\t\"Timeout\": 10000\n}",
    CURLOPT_HTTPHEADER     => [
        "Content-Type: application/json"
    ],
]);
$body = curl_exec($ch);
$error = curl_error($ch);
curl_close($ch);
// Print the cURL error when there is one, otherwise the returned body.
echo $error ? ("cURL Error #:" . $error) : $body;
?>
Ruby

The following Ruby code snippet is using KubeMQ's REST interface:

require "uri"
require "net/http"

# Sends a command to channel "hello-command" through KubeMQ's REST interface.
endpoint = URI("http://localhost:9090/send/request")

post = Net::HTTP::Post.new(endpoint)
post["Content-Type"] = "application/json"
post.body = "{\n\t\"RequestID\": \"688daec3-7f3e-4766-87fa-4cd1f4f03a23\",\n\t\"RequestTypeData\":1, \n\t\"ClientID\": \"some_clientID\",\n\t\"Channel\": \"hello-command\",\n\t\"Metadata\" :\"some_metadata2\",\n\t\"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n\t\"Timeout\": 10000\n}"

client = Net::HTTP.new(endpoint.host, endpoint.port)
puts client.request(post).read_body
jquery

The following jQuery code snippet is using KubeMQ's REST interface:

// Request options for sending a command to channel "hello-command".
var settings = {
  url: "http://localhost:9090/send/request",
  method: "POST",
  timeout: 0,
  headers: { "Content-Type": "application/json" },
  data: "{\n\t\"RequestID\": \"688daec3-7f3e-4766-87fa-4cd1f4f03a23\",\n\t\"RequestTypeData\":1, \n\t\"ClientID\": \"some_clientID\",\n\t\"Channel\": \"hello-command\",\n\t\"Metadata\" :\"some_metadata2\",\n\t\"Body\": \"c29tZSBlbmNvZGVkIGJvZHk=\",\n\t\"Timeout\": 10000\n}"
};

// Log whatever the REST interface returns.
$.ajax(settings).done(function (result) {
  console.log(result);
});

Demo