Building and Deploying a KPS Connector


This blog post will guide you through building and deploying a KPS connector. If you’re unfamiliar with connectors, see our announcement about KPS Connectors. To recap, a KPS connector is a Kubernetes application, running at project scope, that fulfills the connector service gRPC contract. Connectors can be written in any programming language. In this post, we will build a bidirectional NATS connector, using the Golang connector template so we can create a new connector without writing boilerplate code.

Before we start writing code, let’s familiarize ourselves with some terminology: 

  • Connector Class: The connector class contains the container image of the connector, the Kubernetes resource specification, the schema of connector configuration, and the schema of connector streams. An infrastructure admin can register a connector class for a tenant, enabling project users to create connector instances.
  • Connector Instance: After infra admins register a connector class with KPS, project users can configure the parameters and create an instance of the connector class on their project. Creating a connector instance deploys the Kubernetes resource described in the connector class to the project namespace. Project users can use one connector class to create multiple connector instances across different projects.
  • Connector Stream: A connector stream is a unit of data access configuration. Streams can map onto any logically partitioned set of records. A stream is either an Ingress stream or an Egress stream, denoting the direction of the data flowing through it. A connector instance can have multiple connector streams.
  • Connector Configuration: Project users can configure a connector at runtime, i.e., config changes don’t require restarting a connector.

Building a KPS Connector

We will start by cloning the template repository.

git clone git@github.com:nutanix/kps-connector-go-template.git

We need to edit the `template.go` file and fill in the TODOs for the consumer and producer objects. For a bidirectional NATS connector, the connector stream has to support ingesting data from a NATS subject on a NATS broker. A NATS subject maps naturally onto a connector stream, so the connector stream needs to contain all the information required to communicate with a NATS subject: the NATS broker URL and the NATS subject. We can fill out the `streamMetadata` struct with these two fields.

type streamMetadata struct {
  NatsSubject string
  NatsBroker  string
}

We can then transform the properties in the metadata map in the `mapToStreamMetadata` function, translating them into the corresponding `streamMetadata` fields.

// mapToStreamMetadata translates the stream metadata into the corresponding streamMetadata struct
func mapToStreamMetadata(metadata map[string]interface{}) *streamMetadata {
  subj := metadata["subject"].(string)
  broker := metadata["broker"].(string)
  return &streamMetadata{
     NatsBroker:  broker,
     NatsSubject: subj,
  }
}
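
The bare type assertions above will panic if either key is missing or holds a non-string value. As a sketch (not part of the template; it assumes `fmt` is imported), a more defensive variant could return an error instead:

// mapToStreamMetadataSafe is a hypothetical variant that returns an error
// instead of panicking when the metadata map is malformed.
func mapToStreamMetadataSafe(metadata map[string]interface{}) (*streamMetadata, error) {
  subj, ok := metadata["subject"].(string)
  if !ok {
    return nil, fmt.Errorf("stream metadata: missing or non-string subject")
  }
  broker, ok := metadata["broker"].(string)
  if !ok {
    return nil, fmt.Errorf("stream metadata: missing or non-string broker")
  }
  return &streamMetadata{NatsBroker: broker, NatsSubject: subj}, nil
}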

We can now focus on writing the code for the consumer. The consumer needs to provide an iterator for fetching NATS messages for a given subject. We can implement this by creating a channel of NATS messages.

type consumer struct {
  metadata  *streamMetadata
  natsSub   *nats.Subscription
  msgCh     chan *nats.Msg
}

func newConsumer() *consumer {
  return &consumer{
     msgCh: make(chan *nats.Msg),
  }
}
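
Note that `msgCh` is unbuffered, so the NATS delivery callback will block until the iterator consumes each message. If you want to decouple delivery from consumption, you could give the channel a small buffer, e.g. `make(chan *nats.Msg, 64)`.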

We need to add the logic for subscribing to a NATS subject and publishing the messages from the subject onto `msgCh`.

// topicToMsgCh forwards each message delivered by the NATS subscription
// onto the consumer's message channel.
func (c *consumer) topicToMsgCh(msg *nats.Msg) {
  c.msgCh <- msg
}

// subscribe connects to the NATS broker and subscribes to the subject
// described in the stream metadata.
func (c *consumer) subscribe(ctx context.Context, metadata *streamMetadata) error {
  conn, err := nats.Connect(metadata.NatsBroker)
  if err != nil {
     return err
  }

  sub, err := conn.Subscribe(metadata.NatsSubject, c.topicToMsgCh)
  if err != nil {
     return err
  }

  c.natsSub = sub
  return nil
}
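
The snippet above discards the `nats.Conn` after subscribing, so there is nothing to close when the stream is removed. If you additionally store the connection on the consumer (a hypothetical `natsConn` field), a teardown helper could look like this:

// close is a hypothetical teardown helper; it assumes the consumer also
// keeps the nats.Conn returned by nats.Connect in a natsConn field.
func (c *consumer) close() error {
  if c.natsSub != nil {
    // Stop delivering messages to topicToMsgCh.
    if err := c.natsSub.Unsubscribe(); err != nil {
      return err
    }
  }
  if c.natsConn != nil {
    // Drain flushes any buffered messages before closing the connection.
    return c.natsConn.Drain()
  }
  return nil
}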

We need to create an iterator that returns the next message from the consumer channel on each invocation.

// nextMsg blocks until the next message arrives on msgCh and returns its payload.
func (c *consumer) nextMsg() ([]byte, error) {
  msg := <-c.msgCh
  return msg.Data, nil
}
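
As written, `nextMsg` blocks indefinitely when no messages arrive. If the caller supplies a `context.Context` (the template's actual iterator signature may differ), a cancellable sketch looks like this:

// nextMsgCtx is a hypothetical cancellable variant of nextMsg.
func (c *consumer) nextMsgCtx(ctx context.Context) ([]byte, error) {
  select {
  case msg := <-c.msgCh:
    return msg.Data, nil
  case <-ctx.Done():
    return nil, ctx.Err()
  }
}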

With the consumer implemented, we can move on to the producer. The producer needs to publish each message it receives from the data pipeline to a NATS subject.

type producer struct {
  natsConn  *nats.Conn
  subject   string
}

func newProducer() *producer {
  return &producer{}
}

The producer needs to implement the `connect` method to connect to the NATS broker. 

// connect establishes the NATS connection used for publishing and records
// the target subject from the stream metadata.
func (p *producer) connect(ctx context.Context, metadata *streamMetadata) error {
  conn, err := nats.Connect(metadata.NatsBroker)
  if err != nil {
     return err
  }
  p.natsConn = conn
  p.subject = metadata.NatsSubject
  return nil
}

Each time there is a new message on the stream, the `subscribeMsgHandler` callback function gets called. This callback needs to publish each received message to the NATS subject.

// subscribeMsgHandler publishes each message received from the data pipeline
// to the configured NATS subject.
func (p *producer) subscribeMsgHandler(message *transport.Message) {
  if err := p.natsConn.Publish(p.subject, message.Payload); err != nil {
     log.Printf("unable to publish message to NATS: %s", err)
  }
}
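
Before wiring the connector into KPS, it can be handy to sanity-check the NATS plumbing with a small standalone program. This sketch assumes a local broker at `nats://localhost:4222` (for example, one started with `docker run -p 4222:4222 nats`):

package main

import (
  "fmt"
  "log"
  "time"

  "github.com/nats-io/nats.go"
)

func main() {
  // Assumes a local NATS broker; adjust the URL for your setup.
  conn, err := nats.Connect("nats://localhost:4222")
  if err != nil {
    log.Fatal(err)
  }
  defer conn.Close()

  // Subscribe synchronously so we can block on NextMsg below.
  sub, err := conn.SubscribeSync("nats-stream-ingress")
  if err != nil {
    log.Fatal(err)
  }

  if err := conn.Publish("nats-stream-ingress", []byte("hello")); err != nil {
    log.Fatal(err)
  }

  msg, err := sub.NextMsg(2 * time.Second)
  if err != nil {
    log.Fatal(err)
  }
  fmt.Printf("round-tripped: %s\n", msg.Data)
}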

After updating `template.go`, we need to fill out a few more TODOs. You can then build the image by running `make` and publish it to the container registry by running `make publish`.

Deploying a KPS Connector

Once the connector image has been built and the image published to the container registry, we can create the connector resource specification.

# kubernetes_resource.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: natsconnector
spec:
  replicas: 1
  selector:
    matchLabels:
      app: natsconnector
  template:
    metadata:
      name: natsconnector
      labels:
        app: natsconnector
    spec:
      containers:
        - name: natsconnector
          image: "registry.url/img/nats:{{ .Parameters.image_tag }}"
          imagePullPolicy: Always
          ports:
            - containerPort: 8000
---
kind: Service
apiVersion: v1
metadata:
  name: natsconnector-svc
spec:
  selector:
    app: natsconnector
  ports:
    - protocol: TCP
      name: natsconnector
      port: 9000
      targetPort: 8000

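The Deployment runs the connector image, and the Service exposes container port 8000 as port 9000 within the project; port 8000 is presumably where the connector serves the connector service gRPC contract mentioned earlier.
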
You can use the following jq command to string-encode the YAML file for embedding in the `yamlData` field of the class definition:

jq -Rs . < samples/connector.yaml
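
For reference, `jq -Rs .` reads the file as raw text (-R) and slurps it into a single value (-s), producing one JSON-quoted string with embedded `\n` escapes that you can paste into the `yamlData` field below.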

We can now create the connector class and embed the Kubernetes specification into the class definition. Other properties of note are:

  • staticParameterSchema: JSON schema of the parameters needed to render the template. In this case, just the image tag.
  • configParameterSchema: JSON schema of the dynamic configuration. In this case, just the `log_level` property.

  • streamParameterSchema: JSON schema of the stream. In this case, the NATS subject and broker.

# connector_class.json
{
  "name": "natsconnector",
  "description": "This is a class definition of NATS connector.",
  "connectorVersion": "1.0",
  "minSvcDomainVersion": "2.3.0",
  "type": "BIDIRECTIONAL",
  "staticParameterSchema": {
    "type": "object",
    "properties": {
      "image_tag": {
        "type": "string",
        "description": "test docker image tag to render in yaml"
      }
    }
  },
  "configParameterSchema": {
    "type": "object",
    "properties": {
      "log_level": {
        "type": "string",
        "description": "driver docker container log level"
      }
    }
  },
  "streamParameterSchema": {
    "type": "object",
    "description": "Stream schema",
    "properties": {
      "subject": {
        "type": "string",
        "description": "subject to fetch the messages from / emit the messages to"
      },
      "broker": {
        "type": "string",
        "description": "address of the NATS broker to read from / write to"
      }
    }
  },
  "yamlData": "..."
}

Once we have the connector class definition, an infrastructure admin can register it with the KPS control plane using the KPS CLI.

kps create connectorclass -f connector_class.json

After the infra admin registers the connector class, project users can create an instance of the connector class. Creating a connector instance deploys the Kubernetes resource in the project namespace on the service domain. We can find the connectorClassID and projectId by running the `kps get connectorclass` and `kps get project` commands.

# connector_instance.json
{
  "name": "natsconnectorinstance",
  "connectorClassID": "769aa9ed-191b-4aee-a21e-fd63a35d8c52",
  "projectId": "5a3ed810-0f0c-48fb-91a2-8c91b3726a09",
  "staticParameters": {
    "image_tag": "testing"
  }
}

kps create connectorinstance -f connector_instance.json

Once we have created a connector instance, we can create streams for that instance. Since this is a bidirectional connector, we will create two streams: one ingress stream and one egress stream.

# connector_stream_ingress.json
{
  "name": "nats-stream-ingress",
  "connectorInstanceID": "2ec0d49b-3c49-4d82-9184-72a9150c4945",
  "labels": [
    {
      "id": "c5aab644-8bbe-470e-bc48-d034a657a0dc",
      "value": "nats"
    }
  ],
  "direction": "INGRESS",
  "serviceDomainIds": [],
  "stream": {
    "broker": "nats://nats:4222",
    "subject": "nats-stream-ingress"
  }
}

The ingress stream has labels referencing the category UUID and category value. When you create a data pipeline, these labels are used to select the matching streams.

kps create connectorstream -i natsconnectorinstance -f connector_stream_ingress.json

Similarly, we can create an egress stream. The egress stream does not have labels, as a data pipeline currently supports only one stream as its output. In the future, we might remove this limitation.

# connector_stream_egress.json
{
  "name": "nats-stream-egress",
  "connectorInstanceID": "2ec0d49b-3c49-4d82-9184-72a9150c4945",
  "labels": [],
  "direction": "EGRESS",
  "serviceDomainIds": [],
  "stream": {
    "broker": "nats://nats:4222",
    "subject": "nats-stream-egress"
  }
}

After creating the connector streams, we can create a data pipeline that consumes data from and produces data to NATS via the NATS connector instance. Here is an example of a data pipeline that consumes messages from the ingress stream, transforms them through a function named `exampleFunc`, and publishes them to the egress stream.

# pipeline.yaml
kind: dataPipeline
name: example-pipeline
description: This data pipeline consumes and produces to NATS streams
project: Blog
functions:
  - name: exampleFunc
input:
  categorySelectors:
    connector:
      - nats
output:
  localEdge:
    type: Connector
    service: natsconnectorinstance
    endpointName: nats-stream-egress

Note how the ingress stream is never mentioned explicitly in the pipeline definition. This is because we select streams via category matching. If we create multiple streams with the same category value, KPS will send messages from all those streams to `exampleFunc`.

Similar to how we created a NATS Connector for KPS, you can build a connector to any data source or data sink that you want to bridge. Once you have built a connector, let us know and we will add it to our library of connectors.