Welcome to Karya Python Client!
Karya is an open source distributed job scheduler platform written in Kotlin.
A user can submit a plan to Karya, defining how they want to run a job, and Karya will execute the job according to the plan. This documentation covers the Karya API client, a Python library for interacting with the Karya service.
To read more about Karya, visit the Karya GitHub page.
Overview
This documentation will help you get started with the Karya API client. It includes usage examples as well as detailed API docs to help you understand how to interact with the Karya platform and start scheduling jobs!
The client can be installed via pip:
pip install karya-client
The distribution files are also available on the GitHub Releases page.
Usage Examples
These are some usage examples to help you get started with the Karya API client.
Scheduling a Recurring API Call
- async samples.make_recurring_api_call.main()[source]
In this sample, we shall submit a plan with a REST API request action to make a recurring API call.
This function demonstrates how to:
Create a new user using the Karya API.
Submit a recurring plan that makes a REST API call.
Print the resulting user and plan details to the console.
The function uses the KaryaRestClient to interact with the API and includes:
- Creating a user
- Defining and submitting a recurring plan with a REST API request as the action
- Handling the submission and displaying the results
Scheduling a Delayed Job with Failure Hook
- async samples.make_delayed_api_call_with_failure_hook.main()[source]
In this sample, we shall submit a plan with a failure hook to make a one-time API call.
This function demonstrates how to use the Karya API client to:
Create a new user.
Submit a plan with a failure hook.
Retrieve a plan’s summary.
The function uses the KaryaRestClient to interact with the API, sends requests, and prints the resulting user, plan, and summary data to the console.
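The steps in this sample depend on each other: the plan needs the ID of the created user, and the summary needs the ID of the submitted plan. The following is a minimal sketch of that ordering, written against any object that exposes the documented `create_user`, `submit_plan`, and `get_summary` coroutines. The helper `submit_and_summarize`, the `build_plan_request` callback, and `StubClient` are hypothetical names used only to keep the example runnable without a Karya server; they are not part of the library, and the plain dictionaries stand in for the real request classes.

```python
import asyncio
from types import SimpleNamespace


async def submit_and_summarize(client, user_request, build_plan_request):
    """Create a user, submit a plan for that user, then fetch the plan summary."""
    user = await client.create_user(user_request)
    # The plan request needs the new user's ID, so it is built after the user exists.
    plan = await client.submit_plan(build_plan_request(user.id))
    summary = await client.get_summary(plan.id)
    return user, plan, summary


class StubClient:
    """Hypothetical in-memory stand-in for KaryaRestClient (illustration only)."""

    async def create_user(self, request):
        return SimpleNamespace(id="user-1", name=request["name"])

    async def submit_plan(self, request):
        return SimpleNamespace(id="plan-1", user_id=request["user_id"])

    async def get_summary(self, plan_id):
        return SimpleNamespace(plan_id=plan_id, tasks=[], error_logs=[])


user, plan, summary = asyncio.run(
    submit_and_summarize(
        StubClient(),
        {"name": "alice"},
        lambda user_id: {"user_id": user_id, "description": "demo"},
    )
)
print(user.id, plan.id, summary.plan_id)
```

With the real client, the same helper would presumably receive a KaryaRestClient instance, a CreateUserRequest, and a callback that returns a SubmitPlanRequest.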
Scheduling a Delayed Job with a Chained Karya Call
- async samples.make_delayed_chained_karya_call.main()[source]
In this sample, we shall submit a plan with a chained action to make a recurring API call.
This function demonstrates how to:
Create a new user using the Karya API.
Submit a recurring plan with a chained API request.
Submit a one-time plan that includes a chained API action.
The function uses the KaryaRestClient to interact with the Karya API, sends requests, and prints the resulting user and plan details to the console.
Scheduling a Recurring Job to Push a Message to Kafka
- async samples.make_recurring_push_to_kafka.main()[source]
In this sample, we shall submit a plan with a recurring Kafka message action.
This function demonstrates how to:
Create a user through the Karya API.
Create a recurring plan to send a Kafka message.
Submit the plan and print the plan details.
The function leverages the KaryaRestClient to:
- Create a user.
- Define and submit a recurring plan that sends a message to a Kafka topic.
Scheduling a Delayed Job to Send an Email
- async samples.make_delayed_email_request.main()[source]
In this sample, we shall submit a plan with a one-time email request.
This function demonstrates how to:
Create a new user using the Karya API.
Submit a one-time plan that sends an email request.
Print the resulting user and plan details to the console.
The function uses the KaryaRestClient to interact with the API and handles both user creation and email action submission.
Scheduling a Recurring Job to Send a Message to Slack
- async samples.make_recurring_slack_request.main()[source]
In this sample, we shall submit a plan with a recurring Slack message action.
This function demonstrates how to:
Create a user.
Create a recurring plan that sends a Slack message every 7 seconds.
Submit the plan to the API and print the plan details.
The function uses the KaryaRestClient to:
- Create a user.
- Define and submit a recurring plan that sends a message to a Slack channel.
Client Module
The client module contains the clients for interacting with the Karya Server. It also has the necessary config entities and the request and response classes.
Given below is the list of Karya Python clients implementing different communication protocols.
- class karya.clients.KaryaRestClient(config: ClientConfig)[source]
Bases:
object
A client for interacting with the Karya API over REST.
This client provides methods to manage users and plans, such as creating a user, submitting a plan, fetching plans, updating plans, and getting summaries of plans.
- api_version = 'v1'
- async cancel_plan(plan_id: str) Plan [source]
Cancels a specified plan by ID.
- Parameters:
plan_id (str) – The ID of the plan to cancel.
- Returns:
A Plan object reflecting the canceled plan.
- Return type:
Plan
- Raises:
httpx.HTTPStatusError – If the request fails with a non-2xx status code.
- async close() None [source]
Closes the HTTP client connection.
This should be called when the client is no longer needed to ensure that resources are cleaned up properly.
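One way to make sure `close()` is always awaited, even when an exception is raised while the client is in use, is a small async context manager. This is a sketch under stated assumptions: `closing_client` and `StubClient` are hypothetical names, and the stub only mimics the documented `async close()` method so the example runs without the library.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def closing_client(client):
    """Yield the client and always await its close() coroutine afterwards."""
    try:
        yield client
    finally:
        await client.close()


class StubClient:
    """Hypothetical stand-in exposing only the documented async close()."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def demo():
    stub = StubClient()
    try:
        async with closing_client(stub):
            raise RuntimeError("simulated failure while using the client")
    except RuntimeError:
        pass
    return stub.closed


closed_after_error = asyncio.run(demo())
print(closed_after_error)
```

A plain `try`/`finally` around the calls achieves the same thing; the context manager only packages it for reuse.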
- async create_user(request: CreateUserRequest) User [source]
Creates a new user by sending a POST request to the Karya API.
- Parameters:
request (CreateUserRequest) – The request object containing user data.
- Returns:
A User object containing the details of the created user.
- Return type:
User
- Raises:
httpx.HTTPStatusError – If the request fails with a non-2xx status code.
- async get_plan(plan_id: str) GetPlanResponse [source]
Retrieves the details of a specific plan by ID.
- Parameters:
plan_id (str) – The ID of the plan to retrieve.
- Returns:
A response object containing the plan details.
- Return type:
GetPlanResponse
- Raises:
httpx.HTTPStatusError – If the request fails with a non-2xx status code.
- async get_summary(plan_id: str) GetSummaryResponse [source]
Retrieves the summary for a specific plan by ID.
- Parameters:
plan_id (str) – The ID of the plan for which to retrieve the summary.
- Returns:
A response object containing the summary data.
- Return type:
GetSummaryResponse
- Raises:
httpx.HTTPStatusError – If the request fails with a non-2xx status code.
- async get_user(username: str) User [source]
Retrieves the details of a specific user by username.
- Parameters:
username (str) – The username of the user to retrieve.
- Returns:
A User object containing the details of the requested user.
- Return type:
User
- Raises:
httpx.HTTPStatusError – If the request fails with a non-2xx status code.
- async list_plans(user_id: str, page: int) ListPlanResponse [source]
Retrieves a list of plans for a specific user by user ID.
- Parameters:
user_id (str) – The ID of the user for which to retrieve plans.
page (int) – The page number to retrieve.
- Returns:
A response object containing the list of plans.
- Return type:
ListPlanResponse
- Raises:
httpx.HTTPStatusError – If the request fails with a non-2xx status code.
- async submit_plan(request: SubmitPlanRequest) Plan [source]
Submits a new plan to the Karya API.
- Parameters:
request (SubmitPlanRequest) – The request object containing plan data.
- Returns:
A Plan object containing the details of the submitted plan.
- Return type:
Plan
- Raises:
httpx.HTTPStatusError – If the request fails with a non-2xx status code.
- async update_plan(request: UpdatePlanRequest) Plan [source]
Updates an existing plan with new details.
- Parameters:
request (UpdatePlanRequest) – The request object containing updated plan data.
- Returns:
A Plan object containing the updated plan details.
- Return type:
Plan
- Raises:
httpx.HTTPStatusError – If the request fails with a non-2xx status code.
Configs
The configs module contains the configuration entities used to initialize the Karya Client.
- class karya.clients.config.ClientConfig(protocol: Protocol, host: str, port: int)[source]
Bases:
object
A configuration class for a Karya API client.
This class holds the necessary configuration parameters to connect to the Karya API, including the communication protocol, host, and port. It also provides methods for generating the base URL and for returning a default development configuration.
- host
The host or domain name of the API server.
- Type:
str
- port
The port number used to access the API.
- Type:
int
- static dev() ClientConfig [source]
Returns a default configuration for development purposes.
This method returns a configuration that uses HTTP as the protocol, “localhost” as the host, and port 8080. It is intended for use in local development environments where the API is running locally.
- Returns:
A ClientConfig instance with predefined values for development.
- Return type:
ClientConfig
Example
ClientConfig.dev() returns: ClientConfig(protocol=Protocol.HTTP, host="localhost", port=8080)
- get_base_url() str [source]
Generates and returns the base URL for the API.
The base URL is constructed by combining the protocol, host, and port attributes of the ClientConfig. This is useful for constructing full URLs for API requests.
- Returns:
The base URL of the Karya API in the format “<protocol>://<host>:<port>”.
- Return type:
str
Example
- If the configuration is:
protocol = Protocol.HTTP
host = "localhost"
port = 8080
The returned base URL will be "http://localhost:8080".
- host: str
- port: int
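As a rough illustration of how the documented base URL format could be produced, the sketch below re-implements a stand-in for ClientConfig using only the standard library. It is not the library's source. The stand-in classes are hypothetical, and lower-casing the protocol value is an assumption made to reconcile the documented enum value 'HTTP' with the documented result "http://localhost:8080".

```python
from dataclasses import dataclass
from enum import Enum


class Protocol(str, Enum):
    """Stand-in mirroring the documented Protocol enum values."""
    HTTP = "HTTP"
    HTTPS = "HTTPS"


@dataclass
class ClientConfigSketch:
    """Hypothetical stand-in for ClientConfig (illustration only)."""
    protocol: Protocol
    host: str
    port: int

    def get_base_url(self) -> str:
        # Assumption: the scheme is the lower-cased enum value.
        return f"{self.protocol.value.lower()}://{self.host}:{self.port}"

    @staticmethod
    def dev() -> "ClientConfigSketch":
        # Mirrors the documented development defaults.
        return ClientConfigSketch(Protocol.HTTP, "localhost", 8080)


base_url = ClientConfigSketch.dev().get_base_url()
secure_url = ClientConfigSketch(Protocol.HTTPS, "karya.example.com", 443).get_base_url()
print(base_url)
print(secure_url)
```

The host `karya.example.com` is a placeholder, not a real Karya endpoint.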
Request Entities
The requests module contains the request entities used to make requests to the Karya Server.
- class karya.clients.requests.CreateUserRequest(name: str)[source]
Bases:
object
Represents a request to create a new user.
- name
The name of the user to be created.
- Type:
str
- name: str
- class karya.clients.requests.SubmitPlanRequest(user_id: str, description: str, period_time: str, plan_type: ~karya.entities.abstracts.AbstractPlanType, action: ~karya.entities.abstracts.AbstractAction, hooks: ~typing.List[~karya.entities.Hook] = <factory>, max_failure_retry: int = 3)[source]
Bases:
object
Represents a request to submit a plan for a user.
- user_id
The ID of the user submitting the plan.
- Type:
str
- description
A description of the plan.
- Type:
str
- period_time
The time period associated with the plan.
- Type:
str
- plan_type
The type of plan being submitted.
- Type:
AbstractPlanType
- action
The action to be performed with the plan.
- Type:
AbstractAction
- max_failure_retry
The maximum number of retries in case of failure (default is 3).
- Type:
int
- action: AbstractAction
- description: str
- max_failure_retry: int = 3
- period_time: str
- plan_type: AbstractPlanType
- user_id: str
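The `<factory>` default shown for `hooks` in the signature is how dataclasses display a `default_factory`. The sketch below shows what that means in practice: each request gets its own empty list rather than sharing one mutable default. `PlanRequestSketch` is a hypothetical, trimmed stand-in and not the library class; that the real class uses `list` as its factory is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class PlanRequestSketch:
    """Hypothetical, trimmed stand-in for SubmitPlanRequest (illustration only)."""
    user_id: str
    description: str
    # default_factory builds a fresh list for every instance.
    hooks: List[Any] = field(default_factory=list)
    max_failure_retry: int = 3


first = PlanRequestSketch(user_id="u1", description="first")
second = PlanRequestSketch(user_id="u2", description="second")

# Appending to one request's hooks does not affect the other.
first.hooks.append("on-failure-hook")
print(first.hooks, second.hooks)
```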
- class karya.clients.requests.UpdatePlanRequest(plan_id: str, period_time: str | None, max_failure_retry: int | None, hooks: List[Hook] | None)[source]
Bases:
object
Represents a request to update an existing plan.
- plan_id
The ID of the plan to be updated.
- Type:
str
- period_time
The new time period for the plan (optional).
- Type:
Optional[str]
- max_failure_retry
The new maximum number of retries for the plan (optional).
- Type:
Optional[int]
- max_failure_retry: int | None
- period_time: str | None
- plan_id: str
Response Entities
The responses module contains the response entities returned by the Karya Server.
- class karya.clients.responses.GetPlanResponse(plan: Plan, latest_task: Task)[source]
Bases:
object
Represents the response for retrieving a plan and its latest task.
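A common use of `get_plan` is to poll until a plan stops running. The sketch below assumes that the response exposes a `plan` attribute matching its constructor argument, and that COMPLETED and CANCELLED (from the PlanStatus enum) are the terminal states. `wait_for_plan`, `status_name`, and `StubClient` are hypothetical helpers, not library functions.

```python
import asyncio
from types import SimpleNamespace

# Assumed terminal states, named after the documented PlanStatus members.
TERMINAL_PLAN_STATUSES = {"COMPLETED", "CANCELLED"}


def status_name(status):
    """Return the member name for an Enum, or the value itself for a plain string."""
    return getattr(status, "name", status)


async def wait_for_plan(client, plan_id, interval_seconds=1.0, max_polls=30):
    """Poll get_plan until the plan reaches a terminal status or polls run out."""
    for _ in range(max_polls):
        response = await client.get_plan(plan_id)
        if status_name(response.plan.status) in TERMINAL_PLAN_STATUSES:
            return response
        await asyncio.sleep(interval_seconds)
    raise TimeoutError(f"plan {plan_id} still active after {max_polls} polls")


class StubClient:
    """Hypothetical stand-in that replays a fixed sequence of plan statuses."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    async def get_plan(self, plan_id):
        plan = SimpleNamespace(id=plan_id, status=next(self._statuses))
        return SimpleNamespace(plan=plan, latest_task=None)


final = asyncio.run(
    wait_for_plan(
        StubClient(["CREATED", "RUNNING", "COMPLETED"]),
        "plan-1",
        interval_seconds=0,
    )
)
print(final.plan.status)
```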
- class karya.clients.responses.GetSummaryResponse(plan: Plan, tasks: List[Task], error_logs: List[ErrorLog])[source]
Bases:
object
Represents the response for retrieving a summary of a plan, its tasks, and error logs.
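A summary can be condensed into a quick health report by counting tasks per status. This sketch assumes the response exposes `tasks` and `error_logs` attributes named after its constructor arguments; `summarize` is a hypothetical helper, and the sample object is built from `SimpleNamespace` rather than real library entities.

```python
from collections import Counter
from types import SimpleNamespace


def status_name(status):
    """Return the member name for an Enum, or the value itself for a plain string."""
    return getattr(status, "name", status)


def summarize(summary):
    """Count tasks per status and report how many error logs were recorded."""
    task_counts = Counter(status_name(task.status) for task in summary.tasks)
    return {"tasks": dict(task_counts), "errors": len(summary.error_logs)}


# Stand-in data shaped like the documented GetSummaryResponse fields.
sample = SimpleNamespace(
    plan=SimpleNamespace(id="plan-1"),
    tasks=[
        SimpleNamespace(status="SUCCESS"),
        SimpleNamespace(status="SUCCESS"),
        SimpleNamespace(status="FAILURE"),
    ],
    error_logs=[SimpleNamespace(error="timeout")],
)
report = summarize(sample)
print(report)
```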
Entities Module
The entities module defines the core entities that are used by the system.
- class karya.entities.ErrorLog(plan_id: str, error: str, type: AbstractErrorLogType, timestamp: int)[source]
Bases:
object
Represents an error log associated with a plan or task.
An error log records the details of an error, including the type of error, the error message, and the associated timestamp.
- plan_id
The ID of the plan that generated the error.
- Type:
str
- error
A description of the error.
- Type:
str
- type
The type of error log (either HookErrorLog or ExecutorErrorLog).
- Type:
AbstractErrorLogType
- timestamp
The timestamp when the error occurred.
- Type:
int
- class ExecutorErrorLog(task_id: str)[source]
Bases:
AbstractErrorLogType
Represents an error log for an executor.
This type of error log is used when there is an error related to task execution.
- task_id
The ID of the task that encountered the error.
- Type:
str
- task_id: str
- class HookErrorLog[source]
Bases:
AbstractErrorLogType
Represents an error log for a hook.
This type of error log is used when there is an error in executing the hook action.
- error: str
- plan_id: str
- timestamp: int
- type: AbstractErrorLogType
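Because only ExecutorErrorLog carries a `task_id`, the presence of that attribute is one way to tell the two error log types apart when printing them. The helper below is a hypothetical sketch that uses duck typing on stand-in objects; with the real entities an `isinstance` check against the nested classes would likely be the more direct choice.

```python
from types import SimpleNamespace


def describe_error(log):
    """Label an error log as task-level or hook-level based on its type object."""
    # Per the documented classes, only ExecutorErrorLog has a task_id field.
    task_id = getattr(log.type, "task_id", None)
    origin = f"task {task_id}" if task_id is not None else "hook"
    return f"[{origin}] plan {log.plan_id}: {log.error}"


# Stand-in objects shaped like the documented ErrorLog fields.
executor_log = SimpleNamespace(
    plan_id="plan-1",
    error="connection refused",
    type=SimpleNamespace(task_id="task-9"),
    timestamp=0,
)
hook_log = SimpleNamespace(
    plan_id="plan-1",
    error="hook action failed",
    type=SimpleNamespace(),
    timestamp=0,
)
messages = [describe_error(executor_log), describe_error(hook_log)]
print("\n".join(messages))
```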
- class karya.entities.Hook(trigger: Trigger, action: AbstractAction, max_retry: int = 3)[source]
Bases:
object
Represents a hook that triggers an action.
A hook defines a trigger and an action to be performed when the trigger condition is met.
- action
The action to be performed when the hook is triggered.
- Type:
AbstractAction
- max_retry
The maximum number of retries for the hook action in case of failure (default is 3).
- Type:
int
- action: AbstractAction
- max_retry: int = 3
- class karya.entities.Plan(id: str, user_id: str, description: str, period_time: str, type: AbstractPlanType, status: PlanStatus, max_failure_retry: int, action: AbstractAction, hook: List[Hook], parent_plan_id: str | None, created_at: int, updated_at: int)[source]
Bases:
object
Represents a plan for a user.
A plan contains metadata, actions, hooks, and other details related to a user’s scheduled activities.
- id
The unique identifier for the plan.
- Type:
str
- user_id
The ID of the user who owns the plan.
- Type:
str
- description
A description of the plan.
- Type:
str
- period_time
The time period for which the plan is valid.
- Type:
str
- type
The type of the plan (e.g., Recurring, OneTime).
- Type:
AbstractPlanType
- status
The status of the plan (e.g., CREATED, RUNNING, COMPLETED, CANCELLED).
- Type:
PlanStatus
- max_failure_retry
The maximum number of retries allowed in case of failure.
- Type:
int
- action
The action associated with the plan.
- Type:
AbstractAction
- parent_plan_id
The ID of the parent plan if this is a sub-plan, or None if not applicable.
- Type:
Optional[str]
- created_at
The timestamp when the plan was created.
- Type:
int
- updated_at
The timestamp when the plan was last updated.
- Type:
int
- action: AbstractAction
- created_at: int
- description: str
- id: str
- max_failure_retry: int
- parent_plan_id: str | None
- period_time: str
- status: PlanStatus
- type: AbstractPlanType
- updated_at: int
- user_id: str
- class karya.entities.Task(id: str, plan_id: str, partition_key: int, status: TaskStatus, created_at: int, executed_at: int | None, next_execution_at: int | None)[source]
Bases:
object
Represents a task associated with a plan.
- id
The unique identifier for the task.
- Type:
str
- plan_id
The ID of the plan to which the task belongs.
- Type:
str
- partition_key
The partition key for the task.
- Type:
int
- status
The status of the task (e.g., CREATED, PROCESSING, SUCCESS, FAILURE, CANCELLED).
- Type:
TaskStatus
- created_at
The timestamp when the task was created.
- Type:
int
- executed_at
The timestamp when the task was executed, or None if not executed.
- Type:
Optional[int]
- next_execution_at
The timestamp for the next task execution, or None if not applicable.
- Type:
Optional[int]
- created_at: int
- executed_at: int | None
- id: str
- next_execution_at: int | None
- partition_key: int
- plan_id: str
- status: TaskStatus
- class karya.entities.User(id: str, name: str, created_at: int)[source]
Bases:
object
Represents a user in the system.
- id
The unique identifier for the user.
- Type:
str
- name
The name of the user.
- Type:
str
- created_at
The timestamp when the user was created.
- Type:
int
- created_at: int
- id: str
- name: str
Abstract Module
The abstract module contains the abstract classes whose implementations are provided in the other classes. This makes the classes easier to mock for testing and makes it easy to add new implementations.
Enums Module
The enums module contains the enums used by the system.
- class karya.entities.enums.Method(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
str, Enum
Enum that defines the HTTP methods used for making requests.
GET: Retrieve data from the server.
POST: Send data to the server.
PATCH: Partially update data on the server.
DELETE: Remove data from the server.
- DELETE = 'DELETE'
- GET = 'GET'
- PATCH = 'PATCH'
- POST = 'POST'
- class karya.entities.enums.PlanStatus(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
Enum
Enum that defines the possible states a plan can be in during its lifecycle.
CREATED: The plan has been created but not yet started.
RUNNING: The plan is currently in execution.
COMPLETED: The plan has finished execution.
CANCELLED: The plan was cancelled before completion.
- CANCELLED = 'CANCELLED'
- COMPLETED = 'COMPLETED'
- CREATED = 'CREATED'
- RUNNING = 'RUNNING'
- class karya.entities.enums.Protocol(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
str, Enum
Enum that defines the possible protocols for HTTP communication.
HTTP: The standard HTTP protocol.
HTTPS: The secure version of HTTP, encrypted using SSL/TLS.
- HTTP = 'HTTP'
- HTTPS = 'HTTPS'
- class karya.entities.enums.TaskStatus(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
Enum
Enum that defines the possible states of a task during its execution within a plan.
CREATED: The task has been created but not yet processed.
PROCESSING: The task is currently being processed.
SUCCESS: The task was processed successfully.
FAILURE: The task encountered an error and failed.
CANCELLED: The task was cancelled before completion.
- CANCELLED = 'CANCELLED'
- CREATED = 'CREATED'
- FAILURE = 'FAILURE'
- PROCESSING = 'PROCESSING'
- SUCCESS = 'SUCCESS'
- class karya.entities.enums.Trigger(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
str, Enum
Enum that defines the possible triggers for an action to occur within a plan or workflow.
ON_FAILURE: The action is triggered when the plan fails.
ON_COMPLETION: The action is triggered when the plan successfully completes.
- ON_COMPLETION = 'ON_COMPLETION'
- ON_FAILURE = 'ON_FAILURE'
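The enums above have two different sets of bases: Method, Protocol, and Trigger derive from `str` and `Enum`, while PlanStatus and TaskStatus derive from `Enum` only. The sketch below uses trimmed stand-in enums (not imported from the library) to show the practical difference in standard Python: members with the `str` mixin compare equal to plain strings and serialize directly with `json`, whereas plain members do not compare equal to strings and are looked up by value.

```python
import json
from enum import Enum


class Method(str, Enum):
    """Stand-in with the same bases as the documented Method enum."""
    GET = "GET"
    POST = "POST"


class PlanStatus(Enum):
    """Stand-in with the same bases as the documented PlanStatus enum."""
    CREATED = "CREATED"
    RUNNING = "RUNNING"


# str-mixin members behave like strings.
equals_plain_string = Method.GET == "GET"
encoded = json.dumps({"method": Method.GET})

# Plain Enum members do not compare equal to their string values,
# but can be recovered from a raw value with a lookup by value.
plain_equals_string = PlanStatus.RUNNING == "RUNNING"
from_wire = PlanStatus("RUNNING")

print(equals_plain_string, encoded, plain_equals_string, from_wire)
```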
Actions Module
The actions module contains the classes that define the actions that can be configured while forming the Plan or while configuring a Hook.
- class karya.entities.actions.ChainedRequest(request: SubmitPlanRequest, type: str = 'karya.core.entities.Action.ChainedRequest')[source]
Bases:
AbstractAction
Represents a chained request action.
This class represents a request that chains another request (e.g., SubmitPlanRequest).
- request
The request to be chained.
- Type:
SubmitPlanRequest
- request: SubmitPlanRequest
- type: str = 'karya.core.entities.Action.ChainedRequest'
- class karya.entities.actions.EmailRequest(recipient: str, subject: str, message: str, type: str = 'karya.core.entities.Action.EmailRequest')[source]
Bases:
AbstractAction
Represents an email request action.
This class defines the properties required to send an email, including the recipient, subject, and message body.
- recipient
The recipient’s email address.
- Type:
str
- subject
The subject of the email.
- Type:
str
- message
The body content of the email.
- Type:
str
- message: str
- recipient: str
- subject: str
- type: str = 'karya.core.entities.Action.EmailRequest'
- class karya.entities.actions.KafkaProducerRequest(topic: str, message: str, key: str | None = None, type: str = 'karya.core.entities.Action.KafkaProducerRequest')[source]
Bases:
AbstractAction
Represents a Kafka producer request action.
This class defines a Kafka producer request, including the topic, message, and optional key.
- topic
The Kafka topic to send the message to.
- Type:
str
- message
The message to be sent to the Kafka topic.
- Type:
str
- key
An optional key to associate with the message.
- Type:
Optional[str]
- key: str | None = None
- message: str
- topic: str
- type: str = 'karya.core.entities.Action.KafkaProducerRequest'
- class karya.entities.actions.RestApiRequest(base_url: str, body: ~karya.entities.actions.RestApiRequest.AbstractBody = <factory>, protocol: ~karya.entities.enums.Protocol = Protocol.HTTP, method: ~karya.entities.enums.Method = Method.GET, headers: dict = <factory>, timeout: int = 2000, type: str = 'karya.core.entities.Action.RestApiRequest')[source]
Bases:
AbstractAction
Represents a REST API request action.
This class allows for defining a REST API request action, including the HTTP method, protocol, headers, and request body. It includes nested classes for handling different types of request bodies, such as JSON and empty bodies.
- base_url
The base URL for the REST API request.
- Type:
str
- body
The body of the request, which can be JSON or empty.
- Type:
AbstractBody
- headers
The headers for the request, default is {"content-type": "application/json"}.
- Type:
dict
- timeout
The timeout duration for the request in milliseconds, default is 2000.
- Type:
int
- class EmptyBody(type: str = 'karya.core.entities.http.Body.EmptyBody')[source]
Bases:
AbstractAction
Represents an empty body for a REST API request.
- type: str = 'karya.core.entities.http.Body.EmptyBody'
- class JsonBody(json_string: str, type: str = 'karya.core.entities.http.Body.JsonBody')[source]
Bases:
AbstractBody
Represents a JSON body for a REST API request.
- json_string
The JSON-encoded string to be sent in the request body.
- Type:
str
- classmethod from_dict(data: Dict[str, Any]) JsonBody [source]
Creates a JsonBody instance from a dictionary by converting it to a JSON string.
- Parameters:
data (Dict[str, Any]) – A dictionary representing the data to be converted.
- Returns:
A new instance of JsonBody containing the JSON string.
- Return type:
JsonBody
- json_string: str
- type: str = 'karya.core.entities.http.Body.JsonBody'
- base_url: str
- body: AbstractBody
- headers: dict
- timeout: int = 2000
- type: str = 'karya.core.entities.Action.RestApiRequest'
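The documented behavior of `JsonBody.from_dict`, converting a dictionary to a JSON string, can be pictured with the stand-in below. `JsonBodySketch` is a hypothetical class written for illustration, and the use of `json.dumps` is an assumption about how the conversion is done rather than a statement about the library's source.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class JsonBodySketch:
    """Hypothetical stand-in for RestApiRequest.JsonBody (illustration only)."""
    json_string: str
    type: str = "karya.core.entities.http.Body.JsonBody"

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "JsonBodySketch":
        # Assumption: the dictionary is serialized with the standard json module.
        return cls(json_string=json.dumps(data))


payload = {"event": "reminder", "attempt": 1}
body = JsonBodySketch.from_dict(payload)

# The body carries a string, so it round-trips back to the original dictionary.
round_tripped = json.loads(body.json_string)
print(body.json_string)
```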
- class karya.entities.actions.SlackMessageRequest(channel: str, message: str, type: str = 'karya.core.entities.Action.SlackMessageRequest')[source]
Bases:
AbstractAction
Represents a Slack message request action.
This class defines the properties required to send a message to a Slack channel.
- channel
The Slack channel to send the message to.
- Type:
str
- message
The content of the message to be sent to the Slack channel.
- Type:
str
- channel: str
- message: str
- type: str = 'karya.core.entities.Action.SlackMessageRequest'
Plan Types Module
The plan_types module contains the classes that define the different types of plans that can be submitted to the Karya Server.
- class karya.entities.plan_types.OneTime(type: str = 'karya.core.entities.PlanType.OneTime')[source]
Bases:
AbstractPlanType
Represents a one-time plan type.
This class extends the AbstractPlanType and represents a plan that occurs only once.
- type: str = 'karya.core.entities.PlanType.OneTime'
- class karya.entities.plan_types.Recurring(end_at: int | None, type: str = 'karya.core.entities.PlanType.Recurring')[source]
Bases:
AbstractPlanType
Represents a recurring plan type.
This class extends the AbstractPlanType and represents a plan that repeats periodically. It includes an optional end_at field to specify when the recurring plan should end.
- end_at
The optional end time for the recurring plan, given as a Unix timestamp in epoch milliseconds. Use None to run the plan indefinitely.
- Type:
Optional[int]
- end_at: int | None
- type: str = 'karya.core.entities.PlanType.Recurring'
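Since `end_at` expects epoch milliseconds, a small conversion helper avoids the common mistake of passing seconds. The function `epoch_millis` below is a hypothetical helper built on the standard library; it is not part of the Karya client.

```python
from datetime import datetime, timedelta, timezone


def epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid local-time ambiguity")
    return int(moment.timestamp() * 1000)


# One second after the Unix epoch is 1000 milliseconds.
one_second_after_epoch = epoch_millis(datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc))

# A value suitable for end_at: one hour from now.
end_at = epoch_millis(datetime.now(timezone.utc) + timedelta(hours=1))
print(one_second_after_epoch, end_at)
```

The resulting integer could then be passed as `Recurring(end_at=end_at)`, or `None` could be passed to let the plan run indefinitely.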