Node

The KubeMQ SDK for Node.js enables Node.js/TypeScript developers to communicate with the KubeMQ server using several messaging patterns: Events, EventStore, Commands, Queries, and Queues.

Sources

SDK

Examples

Prerequisites

  • Node.js (Ensure you have a recent version of Node.js installed)

  • TypeScript Compiler

  • KubeMQ server running locally or accessible over the network

Installation

The recommended way to use the SDK in your project is to install it from npm, the Node package manager.


npm install kubemq-js

Payload Details

  • Metadata: Additional information passed along with the event. It can be anything that can be represented as a string, e.g., a struct, JSON, or XML.

  • Body: The actual content of the event. It can be anything that can be serialized into a byte array, e.g., a string, struct, JSON, XML, a collection, or a binary file.

  • ClientID: Displayed in logs, tracing, and the KubeMQ dashboard. When using Events Store, it must be unique.

  • Tags: A set of key-value pairs that help categorize the message.
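
As a rough illustration of the fields above, the sketch below serializes a JSON object into a byte-array body and attaches metadata and tags. The `SketchPayload` interface and the `encodeBody`/`decodeBody` helpers are hypothetical names introduced here, not SDK types, and representing tags as a `Map<string, string>` is an assumption.

```typescript
// Hypothetical shape mirroring the payload fields described above (not an SDK type).
interface SketchPayload {
  metadata?: string;          // any string-representable data
  body?: Uint8Array;          // any content serialized to bytes
  tags?: Map<string, string>; // key-value pairs for categorization (assumed type)
}

// Serialize any JSON-compatible value into a UTF-8 byte array.
function encodeBody(value: unknown): Uint8Array {
  return new TextEncoder().encode(JSON.stringify(value));
}

// Reverse of encodeBody: decode UTF-8 bytes and parse the JSON text.
function decodeBody<T>(bytes: Uint8Array): T {
  return JSON.parse(new TextDecoder().decode(bytes)) as T;
}

const payload: SketchPayload = {
  metadata: JSON.stringify({ contentType: 'application/json' }),
  body: encodeBody({ orderId: 42, status: 'created' }),
  tags: new Map([['source', 'checkout']]),
};
```

For plain strings, the SDK's own `Utils.stringToBytes` (used in the send examples in this document) covers the body conversion.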

KubeMQ PubSub Client

To perform PubSub operations, create an instance of PubsubClient. At minimum it requires two parameters: address (the KubeMQ server address) and clientId. With only these two parameters, a plaintext connection is established. The table below describes the parameters available for establishing a connection.

PubSub Client Configuration

Pubsub Client connection establishment example code


import { Config, PubsubClient, Utils } from 'kubemq-js';

const opts: Config = {
	address: 'localhost:50000',
	clientId: Utils.uuid(),
	reconnectIntervalSeconds: 1,
};

const pubsubClient = new PubsubClient(opts);

The example below demonstrates how to configure PubsubClient with TLS and other settings:


const config: Config = {
	address: 'localhost:50000', // KubeMQ gRPC endpoint address
	clientId: 'your-client-id', // Connection clientId
	authToken: 'your-jwt-auth-token', // Optional JWT authorization token
	tls: true, // Indicates if TLS is enabled
	tlsCertFile: 'path/to/tls-cert.pem', // Path to the TLS certificate file
	tlsKeyFile: 'path/to/tls-key.pem', // Path to the TLS key file
	tlsCaCertFile: 'path/to/tls-ca-cert.pem', // Path to the TLS CA cert file
	maxReceiveSize: 1024 * 1024 * 100, // Maximum size of the messages to receive (100MB)
	reconnectIntervalSeconds: 1 // Interval in seconds between reconnect attempts (1 second)
};
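
In deployments, connection settings usually come from the environment rather than from literals. The sketch below builds an object with the same field names as the configuration above. The `SketchConfig` interface, the `configFromEnv` helper, and the `KUBEMQ_*` variable names are all assumptions made for illustration, not part of the SDK.

```typescript
// Local stand-in with the same field names as the Config example above.
interface SketchConfig {
  address: string;
  clientId: string;
  authToken?: string;
  tls?: boolean;
  tlsCertFile?: string;
  tlsKeyFile?: string;
  tlsCaCertFile?: string;
  reconnectIntervalSeconds?: number;
}

type Env = Record<string, string | undefined>;

// Build a config from environment-style key/value pairs.
// The KUBEMQ_* names are hypothetical; pick whatever fits your deployment.
function configFromEnv(env: Env): SketchConfig {
  const config: SketchConfig = {
    address: env['KUBEMQ_ADDRESS'] ?? 'localhost:50000',
    clientId: env['KUBEMQ_CLIENT_ID'] ?? 'sketch-client',
    reconnectIntervalSeconds: Number(env['KUBEMQ_RECONNECT_SECONDS'] ?? '1'),
  };
  const token = env['KUBEMQ_AUTH_TOKEN'];
  if (token) config.authToken = token;
  // Only set the TLS fields when TLS is explicitly switched on.
  if (env['KUBEMQ_TLS'] === 'true') {
    config.tls = true;
    config.tlsCertFile = env['KUBEMQ_TLS_CERT'];
    config.tlsKeyFile = env['KUBEMQ_TLS_KEY'];
    config.tlsCaCertFile = env['KUBEMQ_TLS_CA'];
  }
  return config;
}
```

The resulting object could then be passed to a client constructor, provided its shape matches the SDK's `Config` type.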

Ping To KubeMQ server

You can ping the server to check whether a connection has been established.

Request: NONE

Response: ServerInfo Interface Attributes


const pingResult: ServerInfo = await pubsubClient.ping();
console.log('Ping Response:', pingResult);
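
A ping is often used as a startup readiness check. The following sketch retries a ping-like function a few times, doubling the wait between attempts, before giving up. `pingWithRetry` and `retryDelayMs` are hypothetical helpers written for this illustration; the `ping` argument is any function returning a promise, for example `() => pubsubClient.ping()`.

```typescript
// Delay before the next attempt: the base delay doubled for each earlier failure.
function retryDelayMs(failedAttempts: number, baseMs: number): number {
  return baseMs * 2 ** failedAttempts;
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Call `ping` up to `attempts` times, waiting between failures.
// Resolves with the first successful result, or rejects with the last error.
async function pingWithRetry<T>(
  ping: () => Promise<T>,
  attempts = 3,
  baseMs = 500,
): Promise<T> {
  let lastError: unknown = new Error('attempts must be at least 1');
  for (let i = 0; i < attempts; i++) {
    try {
      return await ping();
    } catch (err) {
      lastError = err;
      // No need to wait after the final attempt.
      if (i < attempts - 1) await sleep(retryDelayMs(i, baseMs));
    }
  }
  throw lastError;
}
```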

Create Channel

PubSub CreateEventsChannel Example:

Request:

Response:


async function createEventsChannel(channel: string) {
	return pubsubClient.createEventsChannel(channel);
}

PubSub Create Events Store Channel Example:

Request:

Response:



async function createEventsStoreChannel(channel: string) {
	return pubsubClient.createEventsStoreChannel(channel);
}

Delete Channel

PubSub DeleteEventsChannel Example:

Request:

Response:


async function deleteEventsChannel(channel: string) {
	return pubsubClient.deleteEventsChannel(channel);
}

PubSub Delete Events Store Channel Example:

Request:

Response:


async function deleteEventsStoreChannel(channel: string) {
	return pubsubClient.deleteEventsStoreChannel(channel);
}

List Channels

PubSub ListEventsChannel Example:

Request:

Response: PubSubChannel[] PubSubChannel interface Attributes


async function listEventsChannel(search: string) {
	const channels = await pubsubClient.listEventsChannels(search);
	console.log(channels);
}

PubSub ListEventsStoreChannel Example:

Request:

Response: PubSubChannel[] PubSubChannel interface Attributes


async function listEventsStoreChannel(search: string) {
	const channels = await pubsubClient.listEventsStoreChannels(search);
	console.log(channels);
}

PubSub Send & Receive

PubSub SendEventMessage Example:

Request: EventMessage Interface Attributes

Note: At least one of metadata, body, or tags is required.

Response: NONE


await pubsubClient.sendEventsMessage({
	id: '234',
	channel: 'events.single',
	body: Utils.stringToBytes('event message'),
});
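
The requirement in the note above can be checked on the client side before sending. The sketch below is a hypothetical guard, not an SDK function. `OutgoingEvent` is a local interface carrying only the fields this check needs, and treating an empty string, empty byte array, or empty tag map as missing is an assumption.

```typescript
// Local stand-in carrying only the fields relevant to the check (not an SDK type).
interface OutgoingEvent {
  channel: string;
  metadata?: string;
  body?: Uint8Array;
  tags?: Map<string, string>;
}

// True when at least one of metadata, body, or tags carries content.
function hasPayload(msg: OutgoingEvent): boolean {
  const hasMetadata = (msg.metadata?.length ?? 0) > 0;
  const hasBody = (msg.body?.length ?? 0) > 0;
  const hasTags = (msg.tags?.size ?? 0) > 0;
  return hasMetadata || hasBody || hasTags;
}

// Throw early with a readable message instead of relying on a server-side error.
function assertSendable(msg: OutgoingEvent): void {
  if (msg.channel.trim() === '') {
    throw new Error('channel is required');
  }
  if (!hasPayload(msg)) {
    throw new Error('at least one of metadata, body, or tags is required');
  }
}
```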

PubSub SendEventStoreMessage Example:

Request: EventStoreMessage Class Attributes

Note: At least one of metadata, body, or tags is required.

Response: NONE


await pubsubClient.sendEventStoreMessage({
	id: '987',
	channel: 'events_store.single',
	body: Utils.stringToBytes('event store message'),
});

PubSub SubscribeEvents Example:

Request: EventsSubscription Class Attributes

Response: NONE

Callback: EventMessageReceived class details

async function subscribeToEvent() {  
  //Subscribes to events from the specified channel and processes received events.  
  const eventsSubscriptionRequest = new EventsSubscriptionRequest('events.A', '');  
  
  // Define the callback for receiving events  
  eventsSubscriptionRequest.onReceiveEventCallback = (event: EventMessageReceived) => {  
    console.log('SubscriberA received event:', {  
      id: event.id,  
      fromClientId: event.fromClientId,  
      timestamp: event.timestamp,  
      channel: event.channel,  
      metadata: event.metadata,  
      body: event.body,  
      tags: event.tags,  
    });  
  };  
  
  // Define the callback for handling errors  
  eventsSubscriptionRequest.onErrorCallback = (error: string) => {  
    console.error('SubscriberA error:', error);  
  };  
  
  pubsubClient  
  .subscribeToEvents(eventsSubscriptionRequest)  
    .then(() => {  
      console.log('Subscription successful');  
    })  
    .catch((reason: any) => {  
      console.error('Subscription failed:', reason);  
    });  
  
}

PubSub SubscribeEventsStore Example:

Request: EventsStoreSubscription Interface Attributes

Response: None

Callback: EventStoreMessageReceived class details

async function subscribeToEventStore() {  
  //Subscribes to events store messages from the specified channel with a specific configuration.  
  const eventsSubscriptionRequest = new EventsStoreSubscriptionRequest('events_store.A', '');  
  eventsSubscriptionRequest.eventsStoreType = EventStoreType.StartAtSequence;  
  eventsSubscriptionRequest.eventsStoreSequenceValue=1;  
  
  // Define the callback for receiving events  
  eventsSubscriptionRequest.onReceiveEventCallback = (event: EventStoreMessageReceived) => {  
    console.log('SubscriberA received event:', {  
      id: event.id,  
      fromClientId: event.fromClientId,  
      timestamp: event.timestamp,  
      channel: event.channel,  
      metadata: event.metadata,  
      body: event.body,  
      tags: event.tags,  
      sequence: event.sequence,  
    });  
  };  
  
  // Define the callback for handling errors  
  eventsSubscriptionRequest.onErrorCallback = (error: string) => {  
    console.error('SubscriberA error:', error);  
  };  
  
  pubsubClient  
    .subscribeToEventsStore(eventsSubscriptionRequest)
    .then(() => {  
      console.log('Events Subscription successful');  
    })  
    .catch((reason: any) => {  
      console.error('Event Subscription failed:', reason);  
    });  
}
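
The example above starts at sequence 1 each time it runs. One common pattern is to remember the last sequence processed and resume just after it. The sketch below shows only that bookkeeping. `SequenceTracker` is a hypothetical helper, it keeps state in memory (a real consumer would persist it), and feeding `nextStart()` into `eventsStoreSequenceValue` is an assumption about how you would wire it up.

```typescript
// Tracks the highest event sequence seen so a subscriber can resume after it.
class SequenceTracker {
  private last = 0;

  // Record a received sequence; returns false for duplicates or older events.
  record(sequence: number): boolean {
    if (sequence <= this.last) return false;
    this.last = sequence;
    return true;
  }

  // Sequence to request on the next subscription (one past the last seen).
  nextStart(): number {
    return this.last + 1;
  }
}
```

Inside the receive callback, a check such as `if (tracker.record(event.sequence)) { ... }` would skip events that were already handled.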

KubeMQ Queues Operations

The examples below demonstrate the usage of KubeMQ Queues client. The examples include creating, deleting, listing channels, and sending/receiving queues messages.

Construct the Queues Client

To perform Queues operations, create an instance of QueuesClient. At minimum it requires two parameters: address (the KubeMQ server address) and clientId. With only these two parameters, a plaintext connection is established. The table below describes the parameters available for establishing a connection.

QueuesClient Configuration

Queues Client establishing a connection example code


import { Config, QueuesClient, Utils } from 'kubemq-js';

const opts: Config = {
	address: 'localhost:50000',
	clientId: Utils.uuid(),
};

const queuesClient = new QueuesClient(opts);

The example below demonstrates how to construct QueuesClient with TLS and other settings:

const opts: Config = {
	address: 'localhost:50000', // KubeMQ gRPC endpoint address
	clientId: 'your-client-id', // Connection clientId
	authToken: 'your-jwt-auth-token', // Optional JWT authorization token
	tls: true, // Indicates if TLS is enabled
	tlsCertFile: 'path/to/tls-cert.pem', // Path to the TLS certificate file
	tlsKeyFile: 'path/to/tls-key.pem', // Path to the TLS key file
	tlsCaCertFile: 'path/to/tls-ca-cert.pem', // Path to the TLS CA cert file
	maxReceiveSize: 1024 * 1024 * 100, // Maximum size of the messages to receive (100MB)
	reconnectIntervalSeconds: 1 // Interval in seconds between reconnect attempts (1 second)
};

const queuesClient = new QueuesClient(opts);

Ping To KubeMQ server

You can ping the server to check whether a connection has been established.

Request: NONE

Response: ServerInfo Class Attributes


const pingResult = await queuesClient.ping();
console.log('Ping Response:', pingResult);

Create Channel

Queues CreateQueueChannel Example:

Request:

Response:


async function createQueueChannel(channel: string) {
	return queuesClient.createQueuesChannel(channel);
}

Delete Channel

Queues DeleteQueueChannel Example:

Request:

Response:


async function deleteQueueChannel(channel: string) {
	return queuesClient.deleteQueuesChannel(channel);
}

List Channels

Queues listQueueChannels Example:

Request:

Response: QueuesChannel[] QueuesChannel interface Attributes


async function listQueueChannels(search: string) {
	const channels = await queuesClient.listQueuesChannel(search);
	console.log(channels);
}

Send & Receive Queue Messages

Queues SendSingleMessage Example:

Request: QueueMessage class attributes

Response: QueueSendResult class attributes


await queuesClient
	.sendQueuesMessage({
		channel: 'queues.single',
		body: Utils.stringToBytes('queue message'),
	})
	.then((result) => console.log(result))
	.catch((reason) => console.error(reason));

Queues Pull Messages Example:

Request: QueuesPullWaitingMessagesRequest class attributes

Response: QueuesPullWaitingMessagesResponse class attributes


await queuesClient
	.pull({
		channel: 'queues.peek',
		maxNumberOfMessages: 10,
		waitTimeoutSeconds: 10,
	})
	.then((response) => {
		response.messages.forEach((msg) => {
			console.log(msg);
		});
	})
	.catch((reason) => {
		console.error(reason);
	});

Queues Get Waiting Messages Example:

Request: QueuesPullWaitingMessagesRequest class attributes

Response: QueuesPullWaitingMessagesResponse class attributes


await queuesClient
	.waiting({
		channel: 'queues.peek',
		maxNumberOfMessages: 5,
		waitTimeoutSeconds: 20,
	})
	.then((response) => {
		response.messages.forEach((msg) => {
			console.log(msg);
		});
	})
	.catch((reason) => {
		console.error(reason);
	});

Poll Queue Messages

Receives messages from a Queue channel.

Request: QueuesPollRequest Class Attributes

Response: QueuesMessagesPulledResponse Class Attributes

Response: QueueMessageReceived class attributes

Example

async function main() {  
  const opts: Config = {  
    address: 'localhost:50000',  
    clientId: 'kubeMQClientId-ts',  
  };  
  const queuesClient = new QueuesClient(opts);  
  
  // Receive with message visibility  
  async function receiveWithVisibility(visibilitySeconds: number) {  
    console.log("\n============================== Receive with Visibility =============================\n");  
    try {  
      const pollRequest = new QueuesPollRequest({  
        channel: 'visibility_channel',  
        pollMaxMessages: 1,  
        pollWaitTimeoutInSeconds: 10,  
        visibilitySeconds: visibilitySeconds,  
        autoAckMessages: false,  
      });  
  
      const pollResponse = await queuesClient.receiveQueuesMessages(pollRequest);  
      console.log("Received Message Response:", pollResponse);  
        
      if (pollResponse.isError) {  
        console.log("Error: " + pollResponse.error);  
      } else {  
        // Use for...of so each acknowledgement is awaited before this function returns
        for (const msg of pollResponse.messages) {
          console.log(`Message ID: ${msg.id}, Message Body: ${Utils.bytesToString(msg.body)}`);
          try {
            await new Promise(resolve => setTimeout(resolve, 1000));
            await msg.ack();
            console.log("Acknowledged message");
          } catch (err) {
            console.error("Error acknowledging message:", err);
          }
        }
      }  
    } catch (error) {  
      console.error('Failed to receive queue messages:', error);  
    }  
  }  
  
  // Test visibility expiration  
  async function receiveWithVisibilityExpired() {  
    console.log("\n============================== Receive with Visibility Expired =============================\n");  
    await receiveWithVisibility(2);  
  }  
  
  // Test visibility extension  
  async function receiveWithVisibilityExtension() {  
    console.log("\n============================== Receive with Visibility Extension =============================\n");  
    try {  
      const pollRequest = new QueuesPollRequest({  
        channel: 'visibility_channel',  
        pollMaxMessages: 1,  
        pollWaitTimeoutInSeconds: 10,  
        visibilitySeconds: 3,  
        autoAckMessages: false,  
      });  
  
      const pollResponse = await queuesClient.receiveQueuesMessages(pollRequest);  
      console.log("Received Message Response:", pollResponse);  
  
      if (pollResponse.isError) {  
        console.log("Error: " + pollResponse.error);  
      } else {  
        // Use for...of so each step is awaited before this function returns
        for (const msg of pollResponse.messages) {
          console.log(`Message ID: ${msg.id}, Message Body: ${Utils.bytesToString(msg.body)}`);
          try {
            await new Promise(resolve => setTimeout(resolve, 1000));
            await msg.extendVisibilityTimer(3);
            await new Promise(resolve => setTimeout(resolve, 2000));
            await msg.ack();
            console.log("Acknowledged message after extending visibility");
          } catch (err) {
            console.error("Error during visibility extension:", err);
          }
        }
      }  
    } catch (error) {  
      console.error('Failed to receive queue messages:', error);  
    }  
  }  
  
  await receiveWithVisibilityExpired();  
  await receiveWithVisibilityExtension();  
}  
  
main();

This method allows you to receive messages from a specified Queue channel. You can configure the polling behavior, including the maximum number of messages to receive and the wait timeout. The response provides detailed information about the received messages and the transaction.

Message Handling Options:

  1. Acknowledge (ack): Mark the message as processed and remove it from the queue.

  2. Reject: Reject the message. It won't be requeued.

  3. Requeue: Send the message back to the queue for later processing.

Choose the appropriate handling option based on your application's logic and requirements.
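
One way to express this choice in code is to map the outcome of your processing function to one of the three options. In the sketch below, `HandleableMessage`, `chooseAction`, and `settle` are hypothetical names. The `ack()` call appears in the example above, while `reject()`, `requeue(channel)`, and the `receiveCount` field are assumed names, so check the SDK's received-message type for the actual signatures. The retry limit and the retry channel are likewise illustrative.

```typescript
type Action = 'ack' | 'reject' | 'requeue';

// Hypothetical minimal view of a received queue message (not the SDK type).
interface HandleableMessage {
  receiveCount: number;                    // assumed: how many times it was delivered
  ack(): Promise<void>;
  reject(): Promise<void>;                 // assumed method name
  requeue(channel: string): Promise<void>; // assumed method name
}

// Decide what to do with a message given the processing outcome.
function chooseAction(succeeded: boolean, receiveCount: number, maxAttempts = 3): Action {
  if (succeeded) return 'ack';
  // Give up on messages that keep failing; otherwise send them back for later.
  return receiveCount >= maxAttempts ? 'reject' : 'requeue';
}

// Apply the chosen action to the message and report which one was taken.
async function settle(
  msg: HandleableMessage,
  succeeded: boolean,
  retryChannel: string,
): Promise<Action> {
  const action = chooseAction(succeeded, msg.receiveCount);
  if (action === 'ack') {
    await msg.ack();
  } else if (action === 'reject') {
    await msg.reject();
  } else {
    await msg.requeue(retryChannel);
  }
  return action;
}
```

Keeping the decision in a pure function such as `chooseAction` makes the policy easy to unit test without a running server.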

KubeMQ Command & Query Operations

Construct the CQClient

To perform command and query operations, create an instance of CQClient. At minimum it requires two parameters: address (the KubeMQ server address) and clientId. With only these two parameters, a plaintext connection is established. The table below describes the parameters available for establishing a connection.

CQClient Configuration

CQClient establishing a connection example code


import { Config, CQClient, Utils } from 'kubemq-js';

const opts: Config = {
    address: 'localhost:50000',
    clientId: Utils.uuid(),
    reconnectIntervalSeconds: 1,
};

const cqClient = new CQClient(opts);

The example below demonstrates how to construct CQClient with TLS and other settings:


const config: Config = {
    address: 'localhost:50000', // KubeMQ gRPC endpoint address
    clientId: 'your-client-id', // Connection clientId
    authToken: 'your-jwt-auth-token', // Optional JWT authorization token
    tls: true, // Indicates if TLS is enabled
    tlsCertFile: 'path/to/tls-cert.pem', // Path to the TLS certificate file
    tlsKeyFile: 'path/to/tls-key.pem', // Path to the TLS key file
    tlsCaCertFile: 'path/to/tls-ca-cert.pem', // Path to the TLS CA cert file
    maxReceiveSize: 1024 * 1024 * 100, // Maximum size of the messages to receive (100MB)
    reconnectIntervalSeconds: 1, // Interval in seconds between reconnect attempts (1 second)
};

const cqClient = new CQClient(config);

Ping To KubeMQ server

You can ping the server to check whether a connection has been established.

Request: NONE

Response: ServerInfo interface Attributes


const pingResult = await cqClient.ping();
console.log('Ping Response:', pingResult);

Create Channel

Command CreateCommandsChannel Example:

Request:

Response:


async function createCommandsChannel(channel: string) {
    return cqClient.createCommandsChannel(channel);
}

Queries CreateQueriesChannel Example:

Request:

Response:


async function createQueriesChannel(channel: string) {
    return cqClient.createQueriesChannel(channel);
}


Delete Channel

Command DeleteCommandsChannel Example:

Request:

| Name        | Type   | Description                           | Default Value | Mandatory |
|-------------|--------|---------------------------------------|---------------|-----------|
| channelName | String | Channel name which you want to delete | None          | Yes       |

Response:

| Name | Type          | Description                            |
|------|---------------|----------------------------------------|
| void | Promise<void> | Doesn't return a value upon completion |


async function deleteCommandsChannel(channel: string) {
    return cqClient.deleteCommandsChannel(channel);
}

Queries DeleteQueriesChannel Example:

Request:

Response:


async function deleteQueriesChannel(channel: string) {
    return cqClient.deleteQueriesChannel(channel);
}

List Channels

Command ListCommandsChannel Example:

Request:

Response: CQChannel[] CQChannel interface attributes


async function listCommandsChannels(search: string) {
    const channels = await cqClient.listCommandsChannels(search);
    console.log(channels);
}

Queries ListQueriesChannel Example:

Request:

Response: CQChannel[] CQChannel interface attributes


async function listQueriesChannels(search: string) {
    const channels = await cqClient.listQueriesChannels(search);
    console.log(channels);
}

Send & Receive Command & Query Messages

Command SubscribeToCommandsChannel Example:

Request: CommandsSubscription Class Attributes

Response: None

Callback: CommandsReceiveMessage interface attributes

async function subscribeToCommands(channelName: string) {
    //Subscribes to commands from the specified channel with a specific configuration.  
    const commandSubscriptionRequest = new CommandsSubscriptionRequest(channelName, 'group1');

    // Define the callback for receiving commandMessage  
    commandSubscriptionRequest.onReceiveEventCallback = (commandMessage: CommandMessageReceived) => {
        console.log('SubscriberA received commandMessage:', {
            id: commandMessage.id,
            fromClientId: commandMessage.fromClientId,
            timestamp: commandMessage.timestamp,
            channel: commandMessage.channel,
            metadata: commandMessage.metadata,
            body: commandMessage.body,
            tags: commandMessage.tags,
        });
    };

    // Define the callback for handling errors  
    commandSubscriptionRequest.onErrorCallback = (error: string) => {
        console.error('SubscriberA error:', error);
    };

    cqClient.subscribeToCommands(commandSubscriptionRequest)
        .then(() => {
            console.log('Command Subscription successful');
        })
        .catch((reason: any) => {
            console.error('Command Subscription failed:', reason);
        });
}

Queries SubscribeToQueriesChannel Example:

Request: QueriesSubscriptionRequest Class Attributes

Response: None

Callback: QueriesReceiveMessage interface attributes

async function subscribeToQueries(channelName: string) {

    // Subscribes to queries from the specified channel with a specific configuration.
    const queriesSubscriptionRequest = new QueriesSubscriptionRequest(channelName, 'group1');

    // Define the callback for receiving queryMessage (its type is inferred from the request)
    queriesSubscriptionRequest.onReceiveEventCallback = (queryMessage) => {
        console.log('SubscriberA received query:', {
            id: queryMessage.id,
            fromClientId: queryMessage.fromClientId,
            timestamp: queryMessage.timestamp,
            channel: queryMessage.channel,
            metadata: queryMessage.metadata,
            body: queryMessage.body,
            tags: queryMessage.tags,
        });
    };

    // Define the callback for handling errors
    queriesSubscriptionRequest.onErrorCallback = (error: string) => {
        console.error('SubscriberA error:', error);
    };

    cqClient.subscribeToQueries(queriesSubscriptionRequest)
        .then(() => {
            console.log('Queries Subscription successful');
        })
        .catch((reason: any) => {
            console.error('Queries Subscription failed:', reason);
        });
}
