Queues
Get Started with Queues in KubeMQ

Connect Your KubeMQ Cluster

To be able to communicate with KubeMQ interface ports running in Kubernetes cluster, a Port Forward of KubeMQ's ports is needed.
kubemqctl has a handy command that will do it for you:
1
kubemqctl set cluster proxy
Copied!

Send a Queue Message

The producer can send a message to the "hello-world-queue" channel with one of the following methods.
kubemqctl
curl
.Net
Java
Go
Python
Node
PHP
Ruby
jquery
Run the following kubemqctl command:
1
kubemqctl queues send "hello-world-queue" "this is a queue message"
Copied!
A result message will be shown with an indication of the sending time of the message.
The following cURL command is using KubeMQ's REST interface:
1
curl -H 'Content-Type: application/json' \
2
--request POST "http://localhost:9090/queue/send" \
3
--data '{"Id":"","ClientId":"send-message-client-id","Channel":"hello-world-queue","Metadata":"","Body":"QmF0Y2ggTWVzc2FnZSAw","Tags":{"message":"0"}}'
Copied!
The following .NET code snippet is using KubeMQ's .NET SDK with gRPC interface:
1
using System;
2
3
namespace Queue_Send_a_Message
4
{
5
class Program
6
{
7
static void Main(string[] args)
8
{
9
var QueueName = "hello-world-queue";
10
var ClientID = "test-queue-client-id2";
11
var KubeMQServerAddress = "localhost:50000";
12
13
14
KubeMQ.SDK.csharp.Queue.Queue queue = null;
15
try
16
{
17
queue = new KubeMQ.SDK.csharp.Queue.Queue(QueueName, ClientID, KubeMQServerAddress);
18
}
19
catch (Exception ex)
20
{
21
Console.WriteLine(ex.Message);
22
}
23
24
try
25
{
26
var res = queue.SendQueueMessage(new KubeMQ.SDK.csharp.Queue.Message
27
{
28
Body = KubeMQ.SDK.csharp.Tools.Converter.ToByteArray("some-simple_queue-queue-message"),
29
Metadata = "emptyMeta"
30
});
31
if (res.IsError)
32
{
33
Console.WriteLine(quot;message enqueue error, error:{res.Error}");
34
}
35
else
36
{
37
Console.WriteLine(quot;message sent at, {res.SentAt}");
38
}
39
}
40
catch (Exception ex)
41
{
42
Console.WriteLine(ex.Message);
43
}
44
45
}
46
}
47
}
Copied!
When executed, a stream of events messages will be shown in the console.
The following Java code snippet is using KubeMQ's Java SDK with gRPC interface:
1
package io.kubemq.sdk.examples.get_Started.queue_Send_a_Message;
2
3
import io.kubemq.sdk.queue.Message;
4
import io.kubemq.sdk.queue.Queue;
5
import io.kubemq.sdk.queue.SendMessageResult;
6
import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
7
import io.kubemq.sdk.grpc.Kubemq;
8
import io.kubemq.sdk.tools.Converter;
9
10
import javax.net.ssl.SSLException;
11
import java.io.IOException;
12
13
public class Program {
14
15
public static void main(String[] args) throws ServerAddressNotSuppliedException {
16
17
18
String queueName = "hello-world-queue", clientID = "test-queue-client-id2", kubeMQServerAddress = "localhost:50000";
19
20
Queue queue = null;
21
try{
22
queue = new io.kubemq.sdk.queue.Queue(queueName,clientID,1,2,kubeMQServerAddress);
23
} catch (ServerAddressNotSuppliedException e) {
24
System.out.println("Error: Can not determine KubeMQ server address.");
25
} catch (io.grpc.StatusRuntimeException e) {
26
System.out.println("Error: KubeMQ is unreachable.");
27
} catch (SSLException e) {
28
System.out.println("Error: error detected by an SSL subsystem");
29
}
30
31
try {
32
33
Message msg = new Message()
34
.setBody(Converter.ToByteArray("some-simple_queue-queue-message"))
35
.setMetadata("empty");
36
SendMessageResult res= queue.SendQueueMessage(msg);
37
if(res.getIsError() ) {
38
System.out.println("message enqueue error, error:{res.Error}");
39
} else{
40
System.out.println("message sent at, {res.SentAt}");
41
}
42
} catch (IOException e) {
43
System.out.println("Error: I/O error occurred.");
44
}
45
}
46
}
Copied!
When executed, a stream of events messages will be shown in the console.
The following Go code snippet is using KubeMQ's Go SDK with gRPC interface:
1
package main
2
3
import (
4
"context"
5
"github.com/kubemq-io/kubemq-go"
6
"log"
7
"time"
8
)
9
10
func main() {
11
ctx, cancel := context.WithCancel(context.Background())
12
defer cancel()
13
client, err := kubemq.NewClient(ctx,
14
kubemq.WithAddress("localhost", 50000),
15
kubemq.WithClientId("test-command-client-id"),
16
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
17
if err != nil {
18
log.Fatal(err)
19
}
20
defer client.Close()
21
channel := "hello-world-queue"
22
23
sendResult, err := client.NewQueueMessage().
24
SetChannel(channel).
25
SetBody([]byte("some-simple_queue-queue-message")).
26
Send(ctx)
27
if err != nil {
28
log.Fatal(err)
29
}
30
log.Printf("Send to Queue Result: MessageID:%s,Sent At: %s\n", sendResult.MessageID, time.Unix(0, sendResult.SentAt).String())
31
}
Copied!
When executed, a stream of events messages will be shown in the console.
The following Python code snippet is using KubeMQ's Python SDK with gRPC interface:
1
from kubemq.queue.message_queue import MessageQueue
2
3
from kubemq.queue.message import Message
4
5
6
if __name__ == "__main__":
7
8
queue = MessageQueue("hello-world-queue", "test-queue-client-id2", "localhost:50000")
9
message = Message()
10
message.metadata = 'metadata'
11
message.body = "some-simple_queue-queue-message".encode('UTF-8')
12
message.attributes = None
13
try:
14
sent = queue.send_queue_message(message)
15
if sent.error:
16
print('message enqueue error, error:' + sent.error)
17
else:
18
print('message sent at: %d' % (
19
sent.sent_at
20
))
21
except Exception as err:
22
print('message enqueue error, error:%s' % (
23
err
24
))
Copied!
When executed, a stream of events messages will be shown in the console.
The following JS code snippet is using KubeMQ's NodeJS SDK with gRPC interface:
1
const kubemq = require('kubemq-nodejs');
2
3
let queueName = 'hello-world-queue', clientID = 'test-queue-client-id2',
4
kubeMQAddress = 'localhost:50000';
5
6
7
let queue = new kubemq.Queue(kubeMQAddress, queueName, clientID);
8
9
queue.sendQueueMessage(
10
new kubemq.Message('metadata', kubemq.stringToByte('some-simple_queue-queue-message')))
11
.then(sent => {
12
if (sent.Error) {
13
console.log('message enqueue error, error:' + err);
14
} else {
15
console.log('"message sent at:' + sent.SentAt);
16
}
17
}).catch(err => {
18
console.log('message enqueue error, error:' + err);
19
});
Copied!
The following PHP code snippet is using KubeMQ's REST interface:
1
<?php
2
3
$curl = curl_init();
4
5
curl_setopt_array($curl, array(
6
CURLOPT_URL => "http://localhost:9090/queue/send",
7
CURLOPT_RETURNTRANSFER => true,
8
CURLOPT_ENCODING => "",
9
CURLOPT_MAXREDIRS => 10,
10
CURLOPT_TIMEOUT => 0,
11
CURLOPT_FOLLOWLOCATION => false,
12
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
13
CURLOPT_CUSTOMREQUEST => "POST",
14
CURLOPT_POSTFIELDS =>"{\r\n \"Id\":\"\",\r\n \"ClientId\":\"send-message-client-id\",\r\n \"Channel\":\"hello-world-queue\",\r\n \"Metadata\":\"\",\r\n \"Body\":\"QmF0Y2ggTWVzc2FnZSAw\",\r\n \"Tags\":{\r\n \"message\":\"0\"\r\n },\r\n \"Attributes\":null,\r\n \"Policy\":{\r\n \"ExpirationSeconds\":5,\r\n \"DelaySeconds\":5,\r\n \"MaxReceiveCount\":0,\r\n \"MaxReceiveQueue\":\"\"\r\n }\r\n}",
15
CURLOPT_HTTPHEADER => array(
16
"Content-Type: application/json"),
17
));
18
19
$response = curl_exec($curl);
20
$err = curl_error($curl);
21
22
curl_close($curl);
23
24
if ($err) {
25
echo "cURL Error #:" . $err;
26
} else {
27
echo $response;
28
} ?>
Copied!
The following Ruby code snippet is using KubeMQ's REST interface:
1
require "uri"
2
require "net/http"
3
4
url = URI("http://localhost:9090/queue/send")
5
6
http = Net::HTTP.new(url.host, url.port)
7
8
request = Net::HTTP::Post.new(url)
9
request["Content-Type"] = "application/json"
10
request.body = "{\r\n \"Id\":\"\",\r\n \"ClientId\":\"send-message-client-id\",\r\n \"Channel\":\"hello-world-queue\",\r\n \"Metadata\":\"\",\r\n \"Body\":\"QmF0Y2ggTWVzc2FnZSAw\",\r\n \"Tags\":{\r\n \"message\":\"0\"\r\n },\r\n \"Attributes\":null,\r\n \"Policy\":{\r\n \"ExpirationSeconds\":5,\r\n \"DelaySeconds\":5,\r\n \"MaxReceiveCount\":0,\r\n \"MaxReceiveQueue\":\"\"\r\n }\r\n}"
11
12
response = http.request(request)
13
puts response.read_body
Copied!
The following jQuery code snippet is using KubeMQ's REST interface:
1
var settings = {
2
"url": "http://localhost:9090/queue/send",
3
"method": "POST",
4
"timeout": 0,
5
"headers": {
6
"Content-Type": "application/json"
7
},
8
"data": "{\r\n \"Id\":\"\",\r\n \"ClientId\":\"send-message-client-id\",\r\n \"Channel\":\"hello-world-queue\",\r\n \"Metadata\":\"\",\r\n \"Body\":\"QmF0Y2ggTWVzc2FnZSAw\",\r\n \"Tags\":{\r\n \"message\":\"0\"\r\n },\r\n \"Attributes\":null,\r\n \"Policy\":{\r\n \"ExpirationSeconds\":5,\r\n \"DelaySeconds\":5,\r\n \"MaxReceiveCount\":0,\r\n \"MaxReceiveQueue\":\"\"\r\n }\r\n}",
9
};
10
11
$.ajax(settings).done(function (response) {
12
console.log(response);
13
});
Copied!

Receive a Queue Message

After you have sent a message to a queue, you can request the message from a queue.
kubemqctl
curl
.Net
Java
Go
Python
Node
PHP
Ruby
jquery
Run the following kubemqctl command:
1
kubemqctl queues receive "hello-world-queue"
Copied!
The following cURL command is using KubeMQ's REST interface:
1
curl --location --request POST "http://localhost:9090/queue/receive" \
2
--header "Content-Type: application/json" \
3
--data '{"RequestID":"some-request-id","ClientID":"receive-message-client-id","Channel":"hello-world-queue","MaxNumberOfMessages":1,"WaitTimeSeconds":5}'
Copied!
The following c# code snippet is using KubeMQ's Java SDK with gRPC interface:
1
using System;
2
3
namespace Queue_Receive_a_Message
4
{
5
class Program
6
{
7
static void Main(string[] args)
8
{
9
var QueueName = "hello-world-queue";
10
var ClientID = "test-queue-client-id";
11
var KubeMQServerAddress = "localhost:50000";
12
13
14
KubeMQ.SDK.csharp.Queue.Queue queue = null;
15
try
16
{
17
queue = new KubeMQ.SDK.csharp.Queue.Queue(QueueName, ClientID, KubeMQServerAddress);
18
}
19
catch (Exception ex)
20
{
21
Console.WriteLine(ex.Message);
22
}
23
24
try
25
{
26
var msg = queue.ReceiveQueueMessages();
27
if (msg.IsError)
28
{
29
Console.WriteLine(quot;message dequeue error, error:{msg.Error}");
30
return;
31
}
32
Console.WriteLine(quot;Received {msg.MessagesReceived} Messages:");
33
34
foreach (var item in msg.Messages)
35
{
36
Console.WriteLine(quot;MessageID: {item.MessageID}, Body:{KubeMQ.SDK.csharp.Tools.Converter.FromByteArray(item.Body)}");
37
}
38
}
39
catch (Exception ex)
40
{
41
Console.WriteLine(ex.Message);
42
}
43
}
44
}
45
}
Copied!
The following Java code snippet is using KubeMQ's Java SDK with gRPC interface:
1
package io.kubemq.sdk.examples.get_Started.queue_Receive_a_Message;
2
3
import java.io.IOException;
4
5
import javax.net.ssl.SSLException;
6
7
import io.kubemq.sdk.queue.Message;
8
import io.kubemq.sdk.queue.Queue;
9
import io.kubemq.sdk.queue.ReceiveMessagesResponse;
10
import io.kubemq.sdk.basic.ServerAddressNotSuppliedException;
11
import io.kubemq.sdk.tools.Converter;
12
13
public class Program {
14
15
public static void main(String[] args) throws ServerAddressNotSuppliedException, ClassNotFoundException {
16
17
18
String queueName = "hello-world-queue", clientID = "test-queue-client-id2", kubeMQServerAddress = "localhost:50000";
19
20
21
Queue queue = null;
22
try{
23
queue = new io.kubemq.sdk.queue.Queue(queueName,clientID,1,2,kubeMQServerAddress);
24
} catch (ServerAddressNotSuppliedException e) {
25
System.out.println("Error: Can not determine KubeMQ server address.");
26
} catch (io.grpc.StatusRuntimeException e) {
27
System.out.println("Error: KubeMQ is unreachable.");
28
} catch (SSLException e) {
29
System.out.println("Error: error detected by an SSL subsystem");
30
}
31
32
try {
33
ReceiveMessagesResponse res= queue.ReceiveQueueMessages(2,null);
34
if(res.getIsError() ) {
35
System.out.println("message enqueue error, error:{res.Error}");
36
}
37
38
System.out.println("Received {msg.MessagesReceived} Messages:");
39
40
for (Message msg : res.getMessages()) {
41
42
System.out.printf("MessageID:%s, Body:%s",msg.getMessageID(), Converter.FromByteArray(msg.getBody()));
43
}
44
45
46
47
} catch (IOException e) {
48
System.out.println("Error: I/O error occurred.");
49
}
50
}
51
}
Copied!
The following Go code snippet is using KubeMQ's Go SDK with gRPC interface:
1
package main
2
3
import (
4
"context"
5
"github.com/kubemq-io/kubemq-go"
6
"log"
7
"time"
8
)
9
10
func main() {
11
ctx, cancel := context.WithCancel(context.Background())
12
defer cancel()
13
client, err := kubemq.NewClient(ctx,
14
kubemq.WithAddress("localhost", 50000),
15
kubemq.WithClientId("test-command-client-id"),
16
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
17
if err != nil {
18
log.Fatal(err)
19
}
20
defer client.Close()
21
channel := "hello-world-queue"
22
23
receiveResult, err := client.NewReceiveQueueMessagesRequest().
24
SetChannel(channel).
25
SetMaxNumberOfMessages(1).
26
SetWaitTimeSeconds(5).
27
Send(ctx)
28
if err != nil {
29
log.Fatal(err)
30
}
31
log.Printf("Received %d Messages:\n", receiveResult.MessagesReceived)
32
for _, msg := range receiveResult.Messages {
33
log.Printf("MessageID: %s, Body: %s", msg.Id, string(msg.Body))
34
}
35
}
Copied!
The following Python code snippet is using KubeMQ's Python SDK with gRPC interface:
1
from kubemq.queue.message_queue import MessageQueue
2
if __name__ == "__main__":
3
queue = MessageQueue("hello-world-queue", "test-queue-client-id2", "localhost:50000", 2, 1)
4
try:
5
res = queue.receive_queue_messages()
6
if res.error:
7
print(
8
"'Received:'%s'" % (
9
res.error
10
)
11
)
12
else:
13
for message in res.messages:
14
print(
15
"'MessageID :%s ,Body: sending:'%s'" % (
16
message.MessageID,
17
message.Body
18
)
19
)
20
except Exception as err:
21
print(
22
"'error sending:'%s'" % (
23
err
24
)
25
)
26
input("Press 'Enter' to stop the application...\n")
Copied!
The following JS code snippet is using KubeMQ's NodeJS SDK with gRPC interface:
1
const kubemq = require('kubemq-nodejs');
2
let queueName = 'hello-world-queue', clientID = 'test-queue-client-id2',
3
kubeMQAddress = 'localhost:50000';
4
let queue = new kubemq.Queue(kubeMQAddress, queueName, clientID);
5
queue.receiveQueueMessages(2, 1).then(res => {
6
if (res.Error) {
7
console.log('Message enqueue error, error:' + res.message);
8
} else {
9
if (res.MessagesReceived) {
10
console.log('Received: ' + res.MessagesReceived);
11
res.Messages.forEach(element => {
12
console.log('MessageID:' + element.MessageID + ', Body:' + kubemq.byteToString(element.Body));
13
});
14
} else {
15
console.log('No messages');
16
}
17
}
18
}).catch(
19
err => console.log('Error:' + err));
Copied!
The following PHP code snippet is using KubeMQ's REST interface:
1
<?php
2
3
$curl = curl_init();
4
5
curl_setopt_array($curl, array(
6
CURLOPT_URL => "http://localhost:9090/queue/receive",
7
CURLOPT_RETURNTRANSFER => true,
8
CURLOPT_ENCODING => "",
9
CURLOPT_MAXREDIRS => 10,
10
CURLOPT_TIMEOUT => 0,
11
CURLOPT_FOLLOWLOCATION => false,
12
CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
13
CURLOPT_CUSTOMREQUEST => "POST",
14
CURLOPT_POSTFIELDS =>"{\r\n \"RequestID\":\"some-request-id\",\r\n \"ClientID\":\"receive-message-client-id\",\r\n \"Channel\":\"hello-world-queue\",\r\n \"MaxNumberOfMessages\":10,\r\n \"WaitTimeSeconds\":5,\r\n \"IsPeak\":false\r\n}",
15
CURLOPT_HTTPHEADER => array(
16
"Content-Type: application/json"),
17
));
18
19
$response = curl_exec($curl);
20
$err = curl_error($curl);
21
22
curl_close($curl);
23
24
if ($err) {
25
echo "cURL Error #:" . $err;
26
} else {
27
echo $response;
28
} ?>
Copied!
The following Ruby code snippet is using KubeMQ's REST interface:
1
require "uri"
2
require "net/http"
3
4
url = URI("http://localhost:9090/queue/receive")
5
6
http = Net::HTTP.new(url.host, url.port)
7
8
request = Net::HTTP::Post.new(url)
9
request["Content-Type"] = "application/json"
10
request.body = "{\r\n \"RequestID\":\"some-request-id\",\r\n \"ClientID\":\"receive-message-client-id\",\r\n \"Channel\":\"hello-world-queue\",\r\n \"MaxNumberOfMessages\":10,\r\n \"WaitTimeSeconds\":5,\r\n \"IsPeak\":false\r\n}"
11
12
response = http.request(request)
13
puts response.read_body
Copied!
The following jQuery code snippet is using KubeMQ's REST interface:
1
var settings = {
2
"url": "http://localhost:9090/queue/receive",
3
"method": "POST",
4
"timeout": 0,
5
"headers": {
6
"Content-Type": "application/json"
7
},
8
"data": "{\r\n \"RequestID\":\"some-request-id\",\r\n \"ClientID\":\"receive-message-client-id\",\r\n \"Channel\":\"hello-world-queue\",\r\n \"MaxNumberOfMessages\":10,\r\n \"WaitTimeSeconds\":5,\r\n \"IsPeak\":false\r\n}",
9
};
10
11
$.ajax(settings).done(function (response) {
12
console.log(response);
13
});
Copied!
Get Queues information You can get Queues information by running kubemqctl queues list
Last modified 1yr ago