kser.crypto — Message encryption

This module allows you to encrypt and decrypt messages in Kafka.

Install

pip install kser[crypto]

API focus

kser.crypto.CryptoMessage(context):

A container that holds the original message together with the nonce the consumer needs to decipher the content.

Parameters:context (dict) – marshmallow context used to store the secretbox_key
kser.crypto.CryptoMessage.encode(kmsg)

Encode message using libsodium

Parameters:kmsg (kser.schemas.Message) – Kafka message
Returns:the encoded message
kser.crypto.CryptoMessage.decode(jdata)

Decode message using libsodium

Parameters:jdata (str) – jdata to load
Returns:the decoded message
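
To illustrate why the container carries the nonce alongside the message, here is a minimal sketch of such an envelope using only the standard library. The function names, the field names nonce and data, the JSON layout, and the base64 encoding are all assumptions made for illustration, not kser's actual wire format. The ciphertext is treated as opaque bytes and no encryption is performed here.

```python
import base64
import json

# libsodium's secretbox uses a 24-byte nonce.
NONCE_SIZE = 24


def pack_envelope(nonce, ciphertext):
    """Serialize a nonce and an opaque ciphertext into a JSON string.

    Hypothetical helper: the field names and encoding are illustrative only.
    """
    if len(nonce) != NONCE_SIZE:
        raise ValueError("nonce must be {} bytes long".format(NONCE_SIZE))
    return json.dumps({
        "nonce": base64.b64encode(nonce).decode("ascii"),
        "data": base64.b64encode(ciphertext).decode("ascii"),
    })


def unpack_envelope(jdata):
    """Return the (nonce, ciphertext) pair stored in a JSON envelope."""
    fields = json.loads(jdata)
    return base64.b64decode(fields["nonce"]), base64.b64decode(fields["data"])
```

The consumer cannot decipher the payload without the exact nonce used by the producer, which is why both values travel together in one message. The nonce is not secret; only the key must stay private.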

Example

For this example, we’ll use kafka-python as the Kafka backend.

Consumer example:

from kser.controller import Controller
from kser.crypto import CryptoMessage
from kser.python_kafka.consumer import Consumer


class CryptoController(Controller):
    # Use the encrypted transport for incoming messages.
    TRANSPORT = CryptoMessage


class CryptoConsumer(Consumer):
    # Route consumed messages through the crypto-aware controller.
    REGISTRY = CryptoController


if __name__ == '__main__':
    consumer = CryptoConsumer(config=dict(...), topics=list(...))
    consumer.run()

Producer example:

import os
from uuid import uuid4

from cdumay_result import Result
from kser.crypto import CryptoSchema
from kser.python_kafka.producer import Producer
from kser.schemas import Message


class CryptoProducer(Producer):
    # noinspection PyUnusedLocal
    def send(self, topic, kmsg, timeout=60):
        result = Result(uuid=kmsg.uuid)
        try:
            # The secretbox key is read from the environment and passed
            # through the marshmallow context.
            schema = CryptoSchema(context=dict(
                secretbox_key=os.getenv("KSER_SECRETBOX_KEY")
            ))
            # Encrypt the message, then send it as UTF-8 bytes.
            self.client.send(
                topic, schema.encode(self._onmessage(kmsg)).encode("UTF-8")
            )

            result.stdout = "Message {}[{}] sent in {}".format(
                kmsg.entrypoint, kmsg.uuid, topic
            )
            self.client.flush()

        except Exception as exc:
            result = Result.from_exception(exc, kmsg.uuid)

        # Returning after the try block (not inside a finally clause) lets
        # non-Exception errors such as KeyboardInterrupt propagate.
        if result.retcode < 300:
            return self._onsuccess(kmsg=kmsg, result=result)
        return self._onerror(kmsg=kmsg, result=result)


if __name__ == '__main__':
    producer = CryptoProducer(config=dict(...))
    producer.send("my.topic", Message(uuid=str(uuid4()), entrypoint="myTest"))
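
The producer example reads the key with os.getenv, which yields None when KSER_SECRETBOX_KEY is unset. A minimal sketch of a fail-fast loader follows. The helper name load_secretbox_key and its environ parameter are hypothetical and not part of kser, and the sketch makes no assumption about how the key is encoded.

```python
import os


def load_secretbox_key(env_var="KSER_SECRETBOX_KEY", environ=None):
    """Return the configured key, or raise if it is missing or blank.

    Hypothetical helper, not part of kser. `environ` defaults to
    os.environ and exists mainly to make the function easy to test.
    """
    environ = os.environ if environ is None else environ
    key = environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(
            "{} is not set; refusing to run without an encryption key".format(
                env_var
            )
        )
    return key
```

Calling such a helper once at startup and passing the result as secretbox_key in the context would surface a missing key immediately, instead of on the first send.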