kser.crypto
— Message encryption¶
This module allows you to encrypt and decrypt messages in Kafka
Install¶
pip install kser[crypto]
API focus¶
-
kser.crypto.CryptoMessage(context):
It’s a container which includes the original message as well as the nonce required by the consumer to decipher the content
Parameters: context (dict) – We use the marshmallow context to store the secretbox_key -
kser.crypto.CryptoMessage.
decode
(jdata)¶ Decode message using libsodium
Parameters: jdata (str) – jdata to load Returns: the decoded message
-
kser.crypto.CryptoMessage.
encode
(kmsg)¶ Encode message using libsodium
Parameters: kmsg (kser.schemas.Message) – Kafka message Returns: the encoded message
-
Example¶
For this example, we’ll use kafka-python as kafka backend.
Consumer example:
from kser.controller import Controller
from kser.crypto import CryptoMessage
from kser.python_kafka.consumer import Consumer
class CryptoController(Controller):
    """Controller that uses ``CryptoMessage`` as its transport class."""

    TRANSPORT = CryptoMessage
class CryptoConsumer(Consumer):
    """Consumer whose registry is the crypto-aware ``CryptoController``."""

    REGISTRY = CryptoController
if __name__ == '__main__':
    # The ``...`` arguments are placeholders: supply your own consumer
    # configuration and the list of topics to subscribe to.
    consumer = CryptoConsumer(config=dict(...), topics=list(...))
    consumer.run()
Producer example:
import os
from uuid import uuid4
from cdumay_result import Result
from kser.crypto import CryptoSchema
from kser.python_kafka.producer import Producer
from kser.schemas import Message
class CryptoProducer(Producer):
    """Producer that encodes every message with ``CryptoSchema`` before sending."""

    # noinspection PyUnusedLocal
    def send(self, topic, kmsg, timeout=60):
        """Encode ``kmsg`` with ``CryptoSchema`` and send it to ``topic``.

        The secretbox key is read from the ``KSER_SECRETBOX_KEY`` environment
        variable and passed to ``CryptoSchema`` through its context.

        :param str topic: topic the message is sent to
        :param kser.schemas.Message kmsg: message to send
        :param int timeout: not used by this implementation
        :return: the value returned by ``_onsuccess`` when
            ``result.retcode < 300``, otherwise the value returned by
            ``_onerror``
        """
        result = Result(uuid=kmsg.uuid)
        try:
            schema = CryptoSchema(context=dict(
                secretbox_key=os.getenv("KSER_SECRETBOX_KEY", None)
            ))
            payload = schema.encode(self._onmessage(kmsg)).encode("UTF-8")
            self.client.send(topic, payload)
            result.stdout = "Message {}[{}] sent in {}".format(
                kmsg.entrypoint, kmsg.uuid, topic
            )
            self.client.flush()
        except Exception as exc:
            # Any failure while encoding, sending or flushing is turned into
            # an error result instead of propagating to the caller.
            result = Result.from_exception(exc, kmsg.uuid)

        # Dispatch after the ``try`` statement rather than from a ``finally``
        # clause: a ``return`` inside ``finally`` would silently discard
        # in-flight ``KeyboardInterrupt`` / ``SystemExit`` exceptions.
        if result.retcode < 300:
            return self._onsuccess(kmsg=kmsg, result=result)
        return self._onerror(kmsg=kmsg, result=result)
if __name__ == '__main__':
    # ``dict(...)`` is a placeholder: supply your own producer configuration.
    producer = CryptoProducer(config=dict(...))
    producer.send("my.topic", Message(uuid=str(uuid4()), entrypoint="myTest"))