Operation and task¶
Kser provide a way to create and launch operations. An operation is a list of task executed linearly.
Note
Each operation’s task may be executed on different consumer.
API focus¶
-
class
kser.sequencing.task.
Task
(uuid=None, params=None, status="PENDING", result=None, metadata=None)¶ A task is a
kser.entry.Entrypoint
with additional attributes. To do this, use a database as shared backend.Parameters: - uuid (str) – task unique identifier
- params (dict) – task parameter
- status (str) – task status
- result (cdumay_result.Result) – forwarded result from a previous task
- metadata (dict) – task context
-
log
(message, level=logging.INFO, *args, **kwargs)¶ Send log entry, prefixing message using the following format:
{TaskName}.{TaskStatus}: {EntryPointPath}[{TaskUUID}]:
Parameters: - message (str) – log message
- level (int) – Logging level
- args (list) – log record arguments
- kwargs (dict) – log record key arguments
-
class
kser.sequencing.operation.
Operation
(uuid=None, params=None, status="PENDING", result=None, metadata=None)¶ In fact it’s a
kser.sequencing.task.Task
which has other task as child.-
classmethod
new
(**kwargs)¶ Warning
Deprecated, do not use this method anymore.
Initialize the operation using a
dict
.Parameters: kwargs (dict) – key arguments Returns: A new operation Return type: kser.sequencing.operation.Operation
-
classmethod
parse_inputs
(**kwargs)¶ Warning
Deprecated, do not use this method anymore.
Use by
kser.sequencing.operation.Operation.new
to check inputsParameters: kwargs (dict) – key arguments Returns: parsed input Return type: dict
-
set_status
(status, result=None)¶ Update operation status
Parameters: - status (str) – New status
- result (cdumay_result.Result) – Execution result
-
add_task
(task)¶ Add task to operation
Parameters: task (kser.sequencing.task.Task) – task to add
-
prebuild
(**kwargs)¶ To implement, perform check before the operation creation
Parameters: kwargs (dict) – key arguments
-
next
(task)¶ Find the next task
Parameters: task (kser.sequencing.task.Task) – previous task Returns: The next task Return type: kser.sequencing.task.Task or None
-
launch_next
(task=None, result=None)¶ Launch next task or finish operation
Parameters: - task (kser.sequencing.task.Task) – previous task
- result (cdumay_result.Result) – previous task result
Returns: Execution result
Return type: cdumay_result.Result
-
launch
()¶ Send the first task
Returns: Execution result Return type: cdumay_result.Result
-
finalize
()¶ To implement, post build actions (database mapping ect…)
Returns: Self return Return type: kser.sequencing.operation.Operation
-
build_tasks
(**kwargs)¶ Initialize tasks
Parameters: kwargs (dict) – tasks parameters (~=context) Returns: list of tasks Return type: list(kser.sequencing.task.Task)
-
compute_tasks
(**kwargs)¶ Perfrom checks and build tasks
Returns: list of tasks Return type: list(kser.sequencing.operation.Operation)
-
build
(**kwargs)¶ Create the operation and associate tasks
Parameters: kwargs (dict) – operation data Returns: The operation Return type: kser.sequencing.operation.Operation
-
send
()¶ To implement, send operation to Kafka
Returns: The operation Return type: kser.sequencing.operation.Operation
-
classmethod
-
class
kser.sequencing.registry.
OperationRegistry
(app=None, controller_class=kser.controller.Controller)¶ A which route
kser.schemas.Message
from Kafka to the requestedkser.sequencing.operation.Operation
.Parameters: - app (flask.Flask) – Flask application if any
- controller_class (kser.controller.Controller) – Controller to use
-
subscribe
(callback)¶ Register an
kser.sequencing.operation.Operation
into the controller. This method is a shortcut tokser.controller.Controller.register
.Parameters: callback (kser.sequencing.task.Task) – Any class which implement Task.
-
load_tasks
()¶ To implement, load operation tasks
Example¶
The following example is based on a dice game, player roll tree time dices.
Consumer¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | import logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s"
)
import random
from cdumay_result import Result
from kser.sequencing.operation import Operation
from kser.sequencing.registry import OperationRegistry
from kser.sequencing.task import Task
oregistry = OperationRegistry()
@oregistry.subscribe
class DiceRoll(Task):
def run(self):
launch = random.randint(1, 6)
return Result(uuid=self.uuid, stdout="You made a {}".format(launch))
@oregistry.subscribe
class DiceLaunch(Operation):
def build_tasks(self, **kwargs):
return [DiceRoll(), DiceRoll(), DiceRoll()]
if __name__ == '__main__':
from flask import Flask
from kser.python_kafka.consumer import Consumer
app = Flask(__name__)
oregistry.init_app(app)
cons = Consumer(...)
cons.REGISTRY = oregistry.controller
cons.run()
|
Explanations:
- line 13: We initialize the registry
- line 16/23: We subscribe the task/operation into the registry
- line 35-37: We start the consumer
Producer¶
Producer has nothing special for this feature.
1 2 3 4 5 6 7 8 9 10 11 | import logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s"
)
import uuid
from kser.python_kafka.producer import Producer
from kser.schemas import Message
pro = Producer(...)
pro.send('...', Message(uuid=str(uuid.uuid4()), entrypoint="__main__.DiceRoll"))
|
Explanations:
- line 10: We initialize the producer
- line 11: We send a Message with the entrypoint __main__.DiceRoll that matches our Task registred
Execution¶
Producer console output:
2018-08-10 18:46:20,082 INFO <BrokerConnection host=************/************ port=9093>: Authenticated as admin
2018-08-10 18:46:20,549 INFO Broker version identifed as 0.10
2018-08-10 18:46:20,550 INFO Set configuration api_version=(0, 10) to skip auto check_version requests on startup
2018-08-10 18:46:20,711 INFO <BrokerConnection host=************/************ port=9093>: Authenticated as admin
2018-08-10 18:46:20,731 INFO Producer.Success: __main__.DiceRoll[872c3be0-51ac-457f-a4d9-12c2f7667b80]: Message __main__.DiceRoll[872c3be0-51ac-457f-a4d9-12c2f7667b80] sent in *****
2018-08-10 18:46:20,731 INFO Closing the Kafka producer with 999999999 secs timeout.
2018-08-10 18:46:20,741 INFO Kafka producer closed
Consumer console output:
2018-08-10 18:44:42,355 INFO Operation registry: loaded __main__.DiceRoll
2018-08-10 18:44:42,355 INFO Operation registry: loaded __main__.DiceLaunch
2018-08-10 18:44:42,696 INFO <BrokerConnection host=************/************ port=9093>: Authenticated as admin
2018-08-10 18:44:43,163 INFO Broker version identifed as 0.10
2018-08-10 18:44:43,163 INFO Set configuration api_version=(0, 10) to skip auto check_version requests on startup
2018-08-10 18:44:43,164 INFO Updating subscribed topics to: ['*****']
2018-08-10 18:44:43,165 INFO Consumer.Starting...
2018-08-10 18:44:43,182 INFO Group coordinator for ***** is BrokerMetadata(nodeId=114251126, host='************', port=9093, rack=None)
2018-08-10 18:44:43,182 INFO Discovered coordinator 114251126 for group *****
2018-08-10 18:44:43,182 INFO Revoking previously assigned partitions set() for group *****
2018-08-10 18:44:43,182 INFO (Re-)joining group *****
2018-08-10 18:44:46,230 INFO Skipping heartbeat: no auto-assignment or waiting on rebalance
2018-08-10 18:44:46,249 INFO Joined group '*****' (generation 2) with member_id kafka-python-1.3.1-db5cdcc7-3be9-4cf6-a4e7-06a97cc69120
2018-08-10 18:44:46,249 INFO Elected group leader -- performing partition assignments using range
2018-08-10 18:44:46,268 INFO Successfully joined group ***** with generation 2
2018-08-10 18:44:46,268 INFO Updated partition assignment: [TopicPartition(topic='*****', partition=0), TopicPartition(topic='*****', partition=1), TopicPartition(topic='*****', partition=2)]
2018-08-10 18:44:46,269 INFO Setting newly assigned partitions {TopicPartition(topic='*****', partition=0), TopicPartition(topic='*****', partition=1), TopicPartition(topic='*****', partition=2)} for group *****
2018-08-10 18:46:20,750 INFO DiceRoll.Success: __main__.DiceRoll[872c3be0-51ac-457f-a4d9-12c2f7667b80]: You made a 1
2018-08-10 18:46:20,751 INFO Controller.Success: __main__.DiceRoll[872c3be0-51ac-457f-a4d9-12c2f7667b80]: You made a 1
As we can see, the task __main__.DiceRoll is sent by the producer and executed by the consumer with the stdout “You made a 1”