Operation and task

Kser provide a way to create and launch operations. An operation is a list of task executed linearly.

Note

Each operation’s task may be executed on different consumer.

API focus

class kser.sequencing.task.Task(uuid=None, params=None, status="PENDING", result=None, metadata=None)

A task is a kser.entry.Entrypoint with additional attributes. To do this, use a database as shared backend.

Parameters:
  • uuid (str) – task unique identifier
  • params (dict) – task parameter
  • status (str) – task status
  • result (cdumay_result.Result) – forwarded result from a previous task
  • metadata (dict) – task context
log(message, level=logging.INFO, *args, **kwargs)

Send log entry, prefixing message using the following format:

{TaskName}.{TaskStatus}: {EntryPointPath}[{TaskUUID}]:
Parameters:
  • message (str) – log message
  • level (int) – Logging level
  • args (list) – log record arguments
  • kwargs (dict) – log record key arguments
class kser.sequencing.operation.Operation(uuid=None, params=None, status="PENDING", result=None, metadata=None)

In fact it’s a kser.sequencing.task.Task which has other task as child.

classmethod new(**kwargs)

Warning

Deprecated, do not use this method anymore.

Initialize the operation using a dict.

Parameters:kwargs (dict) – key arguments
Returns:A new operation
Return type:kser.sequencing.operation.Operation
classmethod parse_inputs(**kwargs)

Warning

Deprecated, do not use this method anymore.

Use by kser.sequencing.operation.Operation.new to check inputs

Parameters:kwargs (dict) – key arguments
Returns:parsed input
Return type:dict
set_status(status, result=None)

Update operation status

Parameters:
  • status (str) – New status
  • result (cdumay_result.Result) – Execution result
add_task(task)

Add task to operation

Parameters:task (kser.sequencing.task.Task) – task to add
prebuild(**kwargs)

To implement, perform check before the operation creation

Parameters:kwargs (dict) – key arguments
next(task)

Find the next task

Parameters:task (kser.sequencing.task.Task) – previous task
Returns:The next task
Return type:kser.sequencing.task.Task or None
launch_next(task=None, result=None)

Launch next task or finish operation

Parameters:
Returns:

Execution result

Return type:

cdumay_result.Result

launch()

Send the first task

Returns:Execution result
Return type:cdumay_result.Result
finalize()

To implement, post build actions (database mapping ect…)

Returns:Self return
Return type:kser.sequencing.operation.Operation
build_tasks(**kwargs)

Initialize tasks

Parameters:kwargs (dict) – tasks parameters (~=context)
Returns:list of tasks
Return type:list(kser.sequencing.task.Task)
compute_tasks(**kwargs)

Perfrom checks and build tasks

Returns:list of tasks
Return type:list(kser.sequencing.operation.Operation)
build(**kwargs)

Create the operation and associate tasks

Parameters:kwargs (dict) – operation data
Returns:The operation
Return type:kser.sequencing.operation.Operation
send()

To implement, send operation to Kafka

Returns:The operation
Return type:kser.sequencing.operation.Operation
class kser.sequencing.registry.OperationRegistry(app=None, controller_class=kser.controller.Controller)

A which route kser.schemas.Message from Kafka to the requested kser.sequencing.operation.Operation.

Parameters:
  • app (flask.Flask) – Flask application if any
  • controller_class (kser.controller.Controller) – Controller to use
subscribe(callback)

Register an kser.sequencing.operation.Operation into the controller. This method is a shortcut to kser.controller.Controller.register.

Parameters:callback (kser.sequencing.task.Task) – Any class which implement Task.
load_tasks()

To implement, load operation tasks

Example

The following example is based on a dice game, player roll tree time dices.

Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 import logging
 logging.basicConfig(
     level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s"
 )

 import random

 from cdumay_result import Result
 from kser.sequencing.operation import Operation
 from kser.sequencing.registry import OperationRegistry
 from kser.sequencing.task import Task

 oregistry = OperationRegistry()


 @oregistry.subscribe
 class DiceRoll(Task):
     def run(self):
         launch = random.randint(1, 6)
         return Result(uuid=self.uuid, stdout="You made a {}".format(launch))


 @oregistry.subscribe
 class DiceLaunch(Operation):
     def build_tasks(self, **kwargs):
         return [DiceRoll(), DiceRoll(), DiceRoll()]


 if __name__ == '__main__':
     from flask import Flask
     from kser.python_kafka.consumer import Consumer

     app = Flask(__name__)
     oregistry.init_app(app)
     cons = Consumer(...)
     cons.REGISTRY = oregistry.controller
     cons.run()

Explanations:

  • line 13: We initialize the registry
  • line 16/23: We subscribe the task/operation into the registry
  • line 35-37: We start the consumer

Producer

Producer has nothing special for this feature.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 import logging
 logging.basicConfig(
     level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s"
 )

 import uuid
 from kser.python_kafka.producer import Producer
 from kser.schemas import Message

 pro = Producer(...)
 pro.send('...', Message(uuid=str(uuid.uuid4()), entrypoint="__main__.DiceRoll"))

Explanations:

  • line 10: We initialize the producer
  • line 11: We send a Message with the entrypoint __main__.DiceRoll that matches our Task registred

Execution

Producer console output:

2018-08-10 18:46:20,082 INFO     <BrokerConnection host=************/************ port=9093>: Authenticated as admin
2018-08-10 18:46:20,549 INFO     Broker version identifed as 0.10
2018-08-10 18:46:20,550 INFO     Set configuration api_version=(0, 10) to skip auto check_version requests on startup
2018-08-10 18:46:20,711 INFO     <BrokerConnection host=************/************ port=9093>: Authenticated as admin
2018-08-10 18:46:20,731 INFO     Producer.Success: __main__.DiceRoll[872c3be0-51ac-457f-a4d9-12c2f7667b80]: Message __main__.DiceRoll[872c3be0-51ac-457f-a4d9-12c2f7667b80] sent in *****
2018-08-10 18:46:20,731 INFO     Closing the Kafka producer with 999999999 secs timeout.
2018-08-10 18:46:20,741 INFO     Kafka producer closed

Consumer console output:

2018-08-10 18:44:42,355 INFO     Operation registry: loaded __main__.DiceRoll
2018-08-10 18:44:42,355 INFO     Operation registry: loaded __main__.DiceLaunch
2018-08-10 18:44:42,696 INFO     <BrokerConnection host=************/************ port=9093>: Authenticated as admin
2018-08-10 18:44:43,163 INFO     Broker version identifed as 0.10
2018-08-10 18:44:43,163 INFO     Set configuration api_version=(0, 10) to skip auto check_version requests on startup
2018-08-10 18:44:43,164 INFO     Updating subscribed topics to: ['*****']
2018-08-10 18:44:43,165 INFO     Consumer.Starting...
2018-08-10 18:44:43,182 INFO     Group coordinator for ***** is BrokerMetadata(nodeId=114251126, host='************', port=9093, rack=None)
2018-08-10 18:44:43,182 INFO     Discovered coordinator 114251126 for group *****
2018-08-10 18:44:43,182 INFO     Revoking previously assigned partitions set() for group *****
2018-08-10 18:44:43,182 INFO     (Re-)joining group *****
2018-08-10 18:44:46,230 INFO     Skipping heartbeat: no auto-assignment or waiting on rebalance
2018-08-10 18:44:46,249 INFO     Joined group '*****' (generation 2) with member_id kafka-python-1.3.1-db5cdcc7-3be9-4cf6-a4e7-06a97cc69120
2018-08-10 18:44:46,249 INFO     Elected group leader -- performing partition assignments using range
2018-08-10 18:44:46,268 INFO     Successfully joined group ***** with generation 2
2018-08-10 18:44:46,268 INFO     Updated partition assignment: [TopicPartition(topic='*****', partition=0), TopicPartition(topic='*****', partition=1), TopicPartition(topic='*****', partition=2)]
2018-08-10 18:44:46,269 INFO     Setting newly assigned partitions {TopicPartition(topic='*****', partition=0), TopicPartition(topic='*****', partition=1), TopicPartition(topic='*****', partition=2)} for group *****
2018-08-10 18:46:20,750 INFO     DiceRoll.Success: __main__.DiceRoll[872c3be0-51ac-457f-a4d9-12c2f7667b80]: You made a 1
2018-08-10 18:46:20,751 INFO     Controller.Success: __main__.DiceRoll[872c3be0-51ac-457f-a4d9-12c2f7667b80]: You made a 1

As we can see, the task __main__.DiceRoll is sent by the producer and executed by the consumer with the stdout “You made a 1”