Welcome to Karya Python Client!

Karya is an open source distributed job scheduler platform written in Kotlin.

A user can submit a plan to Karya, defining how they want to run a job, and Karya will execute the job according to the plan. This documentation covers the Karya API client, a Python library for interacting with the Karya service.

To read more about Karya, visit the Karya GitHub page.

Overview

This documentation will help you get started with the Karya API client. It includes usage examples as well as detailed API docs to help you understand how to interact with the Karya platform and start scheduling jobs!

The client can be installed via pip:

pip install karya-client

The distribution files are also available on the GitHub Releases page.

Usage Examples

These are some usage examples to help you get started with the Karya API client.

Scheduling a Recurring API Call

async samples.make_recurring_api_call.main()[source]

In this sample, we shall submit a plan with a REST API request action to make a recurring API call.

This function demonstrates how to:

  1. Create a new user using the Karya API.

  2. Submit a recurring plan that makes a REST API call.

  3. Print the resulting user and plan details to the console.

The function uses the KaryaRestClient to interact with the API and includes:

  • Creating a user

  • Defining and submitting a recurring plan with a REST API request as the action

  • Handling the submission and displaying the results
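
The flow described above can be sketched roughly as follows. This is a minimal illustration rather than the actual sample source: the class names and import paths are the ones listed in the API reference on this page, while the user name, the target `example.com/ping`, and the `period_time` value `"PT10S"` (which assumes the server accepts ISO-8601 durations) are hypothetical placeholders.

```python
from typing import Any


async def main() -> Any:
    # Imported inside the function so this module still loads where
    # karya-client is not installed.
    from karya.clients import KaryaRestClient
    from karya.clients.config import ClientConfig
    from karya.clients.requests import CreateUserRequest, SubmitPlanRequest
    from karya.entities.actions import RestApiRequest
    from karya.entities.plan_types import Recurring

    # ClientConfig.dev() is documented as HTTP on localhost:8080.
    client = KaryaRestClient(ClientConfig.dev())
    try:
        # Step 1: create a user (the name is a placeholder).
        user = await client.create_user(CreateUserRequest(name="alice"))
        print(user)

        # Step 2: submit a recurring plan whose action is a REST API call.
        plan = await client.submit_plan(
            SubmitPlanRequest(
                user_id=user.id,
                description="Recurring API call",
                period_time="PT10S",  # assumption: ISO-8601 duration string
                plan_type=Recurring(end_at=None),  # None: no end time
                action=RestApiRequest(base_url="example.com/ping"),  # hypothetical target
            )
        )

        # Step 3: print the resulting plan details.
        print(plan)
        return plan
    finally:
        # Release the underlying HTTP connection.
        await client.close()
```

With a Karya server reachable at the development address, the coroutine could be run with `asyncio.run(main())`.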

Scheduling a Delayed Job with Failure Hook

async samples.make_delayed_api_call_with_failure_hook.main()[source]

In this sample, we shall submit a plan with a failure hook to make a one-time API call.

This function demonstrates how to use the Karya API client to:

  1. Create a new user.

  2. Submit a plan with a failure hook.

  3. Retrieve a plan’s summary.

The function uses the KaryaRestClient to interact with the API, sends requests, and prints the resulting user, plan, and summary data to the console.
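
As a rough sketch of these steps (not the sample's actual source), the coroutine below attaches an ON_FAILURE hook to a one-time plan and then fetches the plan's summary. The classes and import paths come from the API reference on this page; the user name, both `example.com` targets, and the `period_time` value `"PT5S"` (assumed to be an ISO-8601 duration) are hypothetical.

```python
from typing import Any


async def main() -> Any:
    # Lazy imports keep this module loadable without karya-client installed.
    from karya.clients import KaryaRestClient
    from karya.clients.config import ClientConfig
    from karya.clients.requests import CreateUserRequest, SubmitPlanRequest
    from karya.entities import Hook
    from karya.entities.actions import RestApiRequest
    from karya.entities.enums import Trigger
    from karya.entities.plan_types import OneTime

    client = KaryaRestClient(ClientConfig.dev())
    try:
        user = await client.create_user(CreateUserRequest(name="bob"))
        print(user)

        # The hook fires its own action when the plan fails.
        failure_hook = Hook(
            trigger=Trigger.ON_FAILURE,
            action=RestApiRequest(base_url="example.com/alert"),  # hypothetical target
            max_retry=3,
        )

        plan = await client.submit_plan(
            SubmitPlanRequest(
                user_id=user.id,
                description="One-time API call with a failure hook",
                period_time="PT5S",  # assumption: ISO-8601 duration string
                plan_type=OneTime(),
                action=RestApiRequest(base_url="example.com/job"),  # hypothetical target
                hooks=[failure_hook],
            )
        )
        print(plan)

        # The summary bundles the plan, its tasks, and any error logs.
        summary = await client.get_summary(plan.id)
        print(summary)
        return summary
    finally:
        await client.close()
```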

Scheduling a Delayed Job with a Chained Karya Call

async samples.make_delayed_chained_karya_call.main()[source]

In this sample, we shall submit a plan with a chained action to make a recurring API call.

This function demonstrates how to:

  1. Create a new user using the Karya API.

  2. Submit a recurring plan with a chained API request.

  3. Submit a one-time plan that includes a chained API action.

The function uses the KaryaRestClient to interact with the Karya API, sends requests, and prints the resulting user and plan details to the console.

Scheduling a Recurring Job to Push a Message to Kafka

async samples.make_recurring_push_to_kafka.main()[source]

In this sample, we shall submit a plan with a recurring Kafka message action.

  1. Creating a user through the Karya API.

  2. Creating a recurring plan to send a Kafka message.

  3. Submitting the plan and printing the plan details.

This function leverages the KaryaRestClient to:

  • Create a user.

  • Define and submit a recurring plan that sends a message to a Kafka topic.

Scheduling a Delayed Job to Send an Email

async samples.make_delayed_email_request.main()[source]

In this sample, we shall submit a plan with a one-time email request.

This function demonstrates how to:

  1. Create a new user using the Karya API.

  2. Submit a one-time plan that sends an email request.

  3. Print the resulting user and plan details to the console.

The function uses the KaryaRestClient to interact with the API and handles both user creation and email action submission.

Scheduling a Recurring Job to Send a Message to Slack

async samples.make_recurring_slack_request.main()[source]

In this sample, we shall submit a plan with a recurring Slack message action.

  1. Creating a user.

  2. Creating a recurring plan that sends a Slack message every 7 seconds.

  3. Submitting the plan to the API and printing the plan details.

This function demonstrates how to:

  • Create a user.

  • Define and submit a recurring plan that sends a message to a Slack channel.

Client Module

The client module contains the clients for interacting with the Karya Server. It also contains the necessary config entities and the request and response classes.

Listed below are the Karya Python clients, each implementing a different communication protocol.

class karya.clients.KaryaRestClient(config: ClientConfig)[source]

Bases: object

A client for interacting with the Karya API over REST.

This client provides methods to manage users and plans, such as creating a user, submitting a plan, fetching plans, updating plans, and getting summaries of plans.

api_version = 'v1'
async cancel_plan(plan_id: str) Plan[source]

Cancels a specified plan by ID.

Parameters:

plan_id (str) – The ID of the plan to cancel.

Returns:

A Plan object reflecting the canceled plan.

Return type:

Plan

Raises:

httpx.HTTPStatusError – If the request fails with a non-2xx status code.

async close() None[source]

Closes the HTTP client connection.

This should be called when the client is no longer needed to ensure that resources are cleaned up properly.
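
One way to make sure close() always runs, even when a request raises, is to wrap the client in a small async context manager. The helper `closing_client` and the `FakeClient` class below are hypothetical and not part of the library; the helper only assumes an object with an awaitable close() method, which matches the signature documented above.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def closing_client(client: Any) -> AsyncIterator[Any]:
    """Yield the client and always await its close() afterwards."""
    try:
        yield client
    finally:
        await client.close()


class FakeClient:
    """Hypothetical stand-in that exposes only an async close()."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def demo() -> FakeClient:
    fake = FakeClient()
    try:
        async with closing_client(fake):
            # Simulate a request that fails part-way through.
            raise RuntimeError("simulated request failure")
    except RuntimeError:
        pass
    return fake


fake = asyncio.run(demo())
print(fake.closed)
```

The stand-in is closed even though the body raised, which is the behavior a try/finally around a real client would give.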

async create_user(request: CreateUserRequest) User[source]

Creates a new user by sending a POST request to the Karya API.

Parameters:

request (CreateUserRequest) – The request object containing user data.

Returns:

A User object containing the details of the created user.

Return type:

User

Raises:

httpx.HTTPStatusError – If the request fails with a non-2xx status code.

async get_plan(plan_id: str) GetPlanResponse[source]

Retrieves the details of a specific plan by ID.

Parameters:

plan_id (str) – The ID of the plan to retrieve.

Returns:

A response object containing the plan details.

Return type:

GetPlanResponse

Raises:

httpx.HTTPStatusError – If the request fails with a non-2xx status code.

async get_summary(plan_id: str) GetSummaryResponse[source]

Retrieves the summary for a specific plan by ID.

Parameters:

plan_id (str) – The ID of the plan for which to retrieve the summary.

Returns:

A response object containing the summary data.

Return type:

GetSummaryResponse

Raises:

httpx.HTTPStatusError – If the request fails with a non-2xx status code.

async get_user(username: str) User[source]

Retrieves the details of a specific user by username.

Parameters:

username (str) – The username of the user to retrieve.

Returns:

A User object containing the details of the requested user.

Return type:

User

Raises:

httpx.HTTPStatusError – If the request fails with a non-2xx status code.

async list_plans(user_id: str, page: int) ListPlanResponse[source]

Retrieves a list of plans for a specific user by user ID.

Parameters:
  • user_id (str) – The ID of the user for whom to retrieve plans.

  • page (int) – The page number to retrieve (default is 0).

Returns:

A response object containing the list of plans.

Return type:

ListPlanResponse

Raises:

httpx.HTTPStatusError – If the request fails with a non-2xx status code.
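
Because list_plans returns one page at a time, collecting every plan needs a loop. The sketch below is one possible approach under stated assumptions: pages are numbered from 0, and the loop stops once `total` plans have been gathered or a page comes back empty. `collect_all_plans`, `FakeClient`, and `FakeListPlanResponse` are hypothetical names used only to exercise the loop without a server.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List


async def collect_all_plans(client: Any, user_id: str) -> List[Any]:
    """Fetch pages until `total` plans are collected or a page is empty."""
    plans: List[Any] = []
    page = 0  # assumption: pages are zero-indexed
    while True:
        response = await client.list_plans(user_id, page)
        plans.extend(response.plans)
        if not response.plans or len(plans) >= response.total:
            return plans
        page += 1


@dataclass
class FakeListPlanResponse:
    """Hypothetical stand-in with the same fields as ListPlanResponse."""
    plans: List[str]
    total: int
    offset: int


class FakeClient:
    """Hypothetical in-memory stand-in that serves two plans per page."""

    def __init__(self, plans: List[str], page_size: int = 2) -> None:
        self._plans = plans
        self._page_size = page_size

    async def list_plans(self, user_id: str, page: int) -> FakeListPlanResponse:
        start = page * self._page_size
        chunk = self._plans[start:start + self._page_size]
        return FakeListPlanResponse(plans=chunk, total=len(self._plans), offset=start)


all_plans = asyncio.run(collect_all_plans(FakeClient(["p1", "p2", "p3"]), "user-1"))
print(all_plans)
```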

async submit_plan(request: SubmitPlanRequest) Plan[source]

Submits a new plan to the Karya API.

Parameters:

request (SubmitPlanRequest) – The request object containing plan data.

Returns:

A Plan object containing the details of the submitted plan.

Return type:

Plan

Raises:

httpx.HTTPStatusError – If the request fails with a non-2xx status code.

async update_plan(request: UpdatePlanRequest) Plan[source]

Updates an existing plan with new details.

Parameters:

request (UpdatePlanRequest) – The request object containing updated plan data.

Returns:

A Plan object containing the updated plan details.

Return type:

Plan

Raises:

httpx.HTTPStatusError – If the request fails with a non-2xx status code.

Configs

The configs module contains the configuration entities used to initialize the Karya Client.

class karya.clients.config.ClientConfig(protocol: Protocol, host: str, port: int)[source]

Bases: object

A configuration class for a Karya API client.

This class holds the necessary configuration parameters to connect to the Karya API, including the communication protocol, host, and port. It also provides methods for generating the base URL and for returning a default development configuration.

protocol

The communication protocol (e.g., HTTP or HTTPS).

Type:

Protocol

host

The host or domain name of the API server.

Type:

str

port

The port number used to access the API.

Type:

int

static dev() ClientConfig[source]

Returns a default configuration for development purposes.

This method returns a configuration that uses HTTP as the protocol, “localhost” as the host, and port 8080. It is intended for use in local development environments where the API is running locally.

Returns:

A ClientConfig instance with predefined values for development.

Return type:

ClientConfig

Example

ClientConfig.dev() returns: ClientConfig(protocol=Protocol.HTTP, host="localhost", port=8080)

get_base_url() str[source]

Generates and returns the base URL for the API.

The base URL is constructed by combining the protocol, host, and port attributes of the ClientConfig. This is useful for constructing full URLs for API requests.

Returns:

The base URL of the Karya API in the format “<protocol>://<host>:<port>”.

Return type:

str

Example

If the configuration is:

protocol = Protocol.HTTP
host = "localhost"
port = 8080

The returned base URL will be “http://localhost:8080”.
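
To make the composition concrete, here is a self-contained stand-in that mirrors the documented attributes. It is not the library's implementation: `ClientConfigSketch` is a hypothetical name, and lower-casing the enum value is an assumption inferred from the example (the enum value is 'HTTP' while the documented URL starts with "http").

```python
from dataclasses import dataclass
from enum import Enum


class Protocol(str, Enum):
    """Local copy of the documented enum values."""
    HTTP = "HTTP"
    HTTPS = "HTTPS"


@dataclass
class ClientConfigSketch:
    """Hypothetical stand-in for ClientConfig with the same three fields."""
    protocol: Protocol
    host: str
    port: int

    def get_base_url(self) -> str:
        # Assumption: the URL scheme is the lower-cased enum value.
        return f"{self.protocol.value.lower()}://{self.host}:{self.port}"

    @staticmethod
    def dev() -> "ClientConfigSketch":
        # Mirrors the documented development defaults.
        return ClientConfigSketch(Protocol.HTTP, "localhost", 8080)


base_url = ClientConfigSketch.dev().get_base_url()
print(base_url)
```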

host: str
port: int
protocol: Protocol

Request Entities

The requests module contains the request entities used to make requests to the Karya Server.

class karya.clients.requests.CreateUserRequest(name: str)[source]

Bases: object

Represents a request to create a new user.

name

The name of the user to be created.

Type:

str

name: str
class karya.clients.requests.SubmitPlanRequest(user_id: str, description: str, period_time: str, plan_type: ~karya.entities.abstracts.AbstractPlanType, action: ~karya.entities.abstracts.AbstractAction, hooks: ~typing.List[~karya.entities.Hook] = <factory>, max_failure_retry: int = 3)[source]

Bases: object

Represents a request to submit a plan for a user.

user_id

The ID of the user submitting the plan.

Type:

str

description

A description of the plan.

Type:

str

period_time

The time period associated with the plan.

Type:

str

plan_type

The type of plan being submitted.

Type:

AbstractPlanType

action

The action to be performed with the plan.

Type:

AbstractAction

hooks

A list of hooks to be executed when the plan is submitted (optional).

Type:

List[Hook]

max_failure_retry

The maximum number of retries in case of failure (default is 3).

Type:

int

action: AbstractAction
description: str
hooks: List[Hook]
max_failure_retry: int = 3
period_time: str
plan_type: AbstractPlanType
user_id: str
class karya.clients.requests.UpdatePlanRequest(plan_id: str, period_time: str | None, max_failure_retry: int | None, hooks: List[Hook] | None)[source]

Bases: object

Represents a request to update an existing plan.

plan_id

The ID of the plan to be updated.

Type:

str

period_time

The new time period for the plan (optional).

Type:

Optional[str]

max_failure_retry

The new maximum number of retries for the plan (optional).

Type:

Optional[int]

hooks

A new list of hooks to be executed with the plan (optional).

Type:

Optional[List[Hook]]

hooks: List[Hook] | None
max_failure_retry: int | None
period_time: str | None
plan_id: str
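
The signature above declares no defaults for the optional fields, so all four arguments have to be passed explicitly. The sketch below changes only the period of an existing plan; `change_period` is a hypothetical helper, and treating None as "leave unchanged" is an assumption about server behavior that this page does not confirm.

```python
from typing import Any


async def change_period(client: Any, plan_id: str, new_period: str) -> Any:
    """Build an UpdatePlanRequest that only sets a new period_time."""
    # Imported lazily so this module loads even where karya-client is absent.
    from karya.clients.requests import UpdatePlanRequest

    request = UpdatePlanRequest(
        plan_id=plan_id,
        period_time=new_period,
        max_failure_retry=None,  # assumption: None leaves the value unchanged
        hooks=None,  # assumption: None leaves the hooks unchanged
    )
    # client is expected to be a KaryaRestClient (or anything with update_plan).
    return await client.update_plan(request)
```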

Response Entities

The responses module contains the response entities returned by the Karya Server.

class karya.clients.responses.GetPlanResponse(plan: Plan, latest_task: Task)[source]

Bases: object

Represents the response for retrieving a plan and its latest task.

plan

The plan associated with the response.

Type:

Plan

latest_task

The most recent task associated with the plan.

Type:

Task

latest_task: Task
plan: Plan
class karya.clients.responses.GetSummaryResponse(plan: Plan, tasks: List[Task], error_logs: List[ErrorLog])[source]

Bases: object

Represents the response for retrieving a summary of a plan, its tasks, and error logs.

plan

The plan associated with the response.

Type:

Plan

tasks

A list of tasks associated with the plan.

Type:

List[Task]

error_logs

A list of error logs related to the plan or tasks.

Type:

List[ErrorLog]

error_logs: List[ErrorLog]
plan: Plan
tasks: List[Task]
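
A summary is often easiest to read once it is reduced to a few counts. The helpers below are a hypothetical sketch that relies only on the documented fields (`tasks`, each with a `status`, and `error_logs`, each with an `error`); the sample data is built from `SimpleNamespace` stand-ins rather than real response objects.

```python
from collections import Counter
from types import SimpleNamespace
from typing import Any, Dict, List


def count_tasks_by_status(summary: Any) -> Dict[str, int]:
    """Count tasks per status, using the enum member name when available."""
    return dict(Counter(getattr(task.status, "name", task.status) for task in summary.tasks))


def error_messages(summary: Any) -> List[str]:
    """Collect the error text of every error log in the summary."""
    return [log.error for log in summary.error_logs]


# Stand-in data shaped like a GetSummaryResponse.
summary = SimpleNamespace(
    tasks=[
        SimpleNamespace(status="SUCCESS"),
        SimpleNamespace(status="FAILURE"),
        SimpleNamespace(status="SUCCESS"),
    ],
    error_logs=[SimpleNamespace(error="connection refused")],
)

counts = count_tasks_by_status(summary)
errors = error_messages(summary)
print(counts)
print(errors)
```
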
class karya.clients.responses.ListPlanResponse(plans: List[Plan], total: int, offset: int)[source]

Bases: object

Represents the response for listing plans.

plans

A list of plans.

Type:

List[Plan]

total

The total number of plans.

Type:

int

offset

The offset used for pagination.

Type:

int

offset: int
plans: List[Plan]
total: int

Entities Module

The entities module defines the core entities that are used by the system.

class karya.entities.ErrorLog(plan_id: str, error: str, type: AbstractErrorLogType, timestamp: int)[source]

Bases: object

Represents an error log associated with a plan or task.

An error log records the details of an error, including the type of error, the error message, and the associated timestamp.

plan_id

The ID of the plan that generated the error.

Type:

str

error

A description of the error.

Type:

str

type

The type of error log (either HookErrorLog or ExecutorErrorLog).

Type:

AbstractErrorLogType

timestamp

The timestamp when the error occurred.

Type:

int

class AbstractErrorLogType[source]

Bases: ABC

An abstract base class for error log types.

class ExecutorErrorLog(task_id: str)[source]

Bases: AbstractErrorLogType

Represents an error log for an executor.

This type of error log is used when there is an error related to task execution.

task_id

The ID of the task that encountered the error.

Type:

str

task_id: str
class HookErrorLog[source]

Bases: AbstractErrorLogType

Represents an error log for a hook.

This type of error log is used when there is an error in executing the hook action.

error: str
plan_id: str
timestamp: int
type: AbstractErrorLogType
class karya.entities.Hook(trigger: Trigger, action: AbstractAction, max_retry: int = 3)[source]

Bases: object

Represents a hook that triggers an action.

A hook defines a trigger and an action to be performed when the trigger condition is met.

trigger

The event or condition that triggers the hook.

Type:

Trigger

action

The action to be performed when the hook is triggered.

Type:

AbstractAction

max_retry

The maximum number of retries for the hook action in case of failure (default is 3).

Type:

int

action: AbstractAction
max_retry: int = 3
trigger: Trigger
class karya.entities.Plan(id: str, user_id: str, description: str, period_time: str, type: AbstractPlanType, status: PlanStatus, max_failure_retry: int, action: AbstractAction, hook: List[Hook], parent_plan_id: str | None, created_at: int, updated_at: int)[source]

Bases: object

Represents a plan for a user.

A plan contains metadata, actions, hooks, and other details related to a user’s scheduled activities.

id

The unique identifier for the plan.

Type:

str

user_id

The ID of the user who owns the plan.

Type:

str

description

A description of the plan.

Type:

str

period_time

The time period for which the plan is valid.

Type:

str

type

The type of the plan (e.g., Recurring, OneTime).

Type:

AbstractPlanType

status

The status of the plan (e.g., Active, Completed, Failed).

Type:

PlanStatus

max_failure_retry

The maximum number of retries allowed in case of failure.

Type:

int

action

The action associated with the plan.

Type:

AbstractAction

hook

A list of hooks associated with the plan.

Type:

List[Hook]

parent_plan_id

The ID of the parent plan if this is a sub-plan, or None if not applicable.

Type:

Optional[str]

created_at

The timestamp when the plan was created.

Type:

int

updated_at

The timestamp when the plan was last updated.

Type:

int

action: AbstractAction
created_at: int
description: str
hook: List[Hook]
id: str
max_failure_retry: int
parent_plan_id: str | None
period_time: str
status: PlanStatus
type: AbstractPlanType
updated_at: int
user_id: str
class karya.entities.Task(id: str, plan_id: str, partition_key: int, status: TaskStatus, created_at: int, executed_at: int | None, next_execution_at: int | None)[source]

Bases: object

Represents a task associated with a plan.

id

The unique identifier for the task.

Type:

str

plan_id

The ID of the plan to which the task belongs.

Type:

str

partition_key

The partition key for the task.

Type:

int

status

The status of the task (e.g., Pending, Completed, Failed).

Type:

TaskStatus

created_at

The timestamp when the task was created.

Type:

int

executed_at

The timestamp when the task was executed, or None if not executed.

Type:

Optional[int]

next_execution_at

The timestamp for the next task execution, or None if not applicable.

Type:

Optional[int]

created_at: int
executed_at: int | None
id: str
next_execution_at: int | None
partition_key: int
plan_id: str
status: TaskStatus
class karya.entities.User(id: str, name: str, created_at: int)[source]

Bases: object

Represents a user in the system.

id

The unique identifier for the user.

Type:

str

name

The name of the user.

Type:

str

created_at

The timestamp when the user was created.

Type:

int

created_at: int
id: str
name: str

Abstract Module

The abstract module contains the abstract classes whose implementations are provided in the other modules. This makes it easier to mock the classes for testing and to add new implementations.

class karya.entities.abstracts.AbstractAction[source]

Bases: ABC

An abstract base class representing an action.

This class serves as a base for defining different types of actions. It does not contain any specific functionality but is meant to be subclassed by more specific action types.

class karya.entities.abstracts.AbstractPlanType[source]

Bases: ABC

An abstract base class representing a plan type.

This class serves as a base for defining different types of plans. It does not contain any specific functionality but is meant to be subclassed by more specific plan types.

Enums Module

The enums module contains the enums used by the system.

class karya.entities.enums.Method(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Enum that defines the HTTP methods used for making requests.

  • GET: Retrieve data from the server.

  • POST: Send data to the server.

  • PATCH: Partially update data on the server.

  • DELETE: Remove data from the server.

DELETE = 'DELETE'
GET = 'GET'
PATCH = 'PATCH'
POST = 'POST'
class karya.entities.enums.PlanStatus(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Enum that defines the possible states a plan can be in during its lifecycle.

  • CREATED: The plan has been created but not yet started.

  • RUNNING: The plan is currently in execution.

  • COMPLETED: The plan has finished execution.

  • CANCELLED: The plan was cancelled before completion.

CANCELLED = 'CANCELLED'
COMPLETED = 'COMPLETED'
CREATED = 'CREATED'
RUNNING = 'RUNNING'
class karya.entities.enums.Protocol(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Enum that defines the possible protocols for HTTP communication.

  • HTTP: The standard HTTP protocol.

  • HTTPS: The secure version of HTTP, encrypted using SSL/TLS.

HTTP = 'HTTP'
HTTPS = 'HTTPS'
class karya.entities.enums.TaskStatus(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Enum that defines the possible states of a task during its execution within a plan.

  • CREATED: The task has been created but not yet processed.

  • PROCESSING: The task is currently being processed.

  • SUCCESS: The task was processed successfully.

  • FAILURE: The task encountered an error and failed.

  • CANCELLED: The task was cancelled before completion.

CANCELLED = 'CANCELLED'
CREATED = 'CREATED'
FAILURE = 'FAILURE'
PROCESSING = 'PROCESSING'
SUCCESS = 'SUCCESS'
class karya.entities.enums.Trigger(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Enum that defines the possible triggers for an action to occur within a plan or workflow.

  • ON_FAILURE: The action is triggered when the plan fails.

  • ON_COMPLETION: The action is triggered when the plan successfully completes.

ON_COMPLETION = 'ON_COMPLETION'
ON_FAILURE = 'ON_FAILURE'

Actions Module

The actions module contains the classes that define the actions that can be configured while forming the Plan or while configuring a Hook.

class karya.entities.actions.ChainedRequest(request: SubmitPlanRequest, type: str = 'karya.core.entities.Action.ChainedRequest')[source]

Bases: AbstractAction

Represents a chained request action.

This class represents a request that chains another request (e.g., SubmitPlanRequest).

request

The request to be chained.

Type:

SubmitPlanRequest

request: SubmitPlanRequest
type: str = 'karya.core.entities.Action.ChainedRequest'
class karya.entities.actions.EmailRequest(recipient: str, subject: str, message: str, type: str = 'karya.core.entities.Action.EmailRequest')[source]

Bases: AbstractAction

Represents an email request action.

This class defines the properties required to send an email, including the recipient, subject, and message body.

recipient

The recipient’s email address.

Type:

str

subject

The subject of the email.

Type:

str

message

The body content of the email.

Type:

str

message: str
recipient: str
subject: str
type: str = 'karya.core.entities.Action.EmailRequest'
class karya.entities.actions.KafkaProducerRequest(topic: str, message: str, key: str | None = None, type: str = 'karya.core.entities.Action.KafkaProducerRequest')[source]

Bases: AbstractAction

Represents a Kafka producer request action.

This class defines a Kafka producer request, including the topic, message, and optional key.

topic

The Kafka topic to send the message to.

Type:

str

message

The message to be sent to the Kafka topic.

Type:

str

key

An optional key to associate with the message.

Type:

Optional[str]

key: str | None = None
message: str
topic: str
type: str = 'karya.core.entities.Action.KafkaProducerRequest'
class karya.entities.actions.RestApiRequest(base_url: str, body: ~karya.entities.actions.RestApiRequest.AbstractBody = <factory>, protocol: ~karya.entities.enums.Protocol = Protocol.HTTP, method: ~karya.entities.enums.Method = Method.GET, headers: dict = <factory>, timeout: int = 2000, type: str = 'karya.core.entities.Action.RestApiRequest')[source]

Bases: AbstractAction

Represents a REST API request action.

This class allows for defining a REST API request action, including the HTTP method, protocol, headers, and request body. It includes nested classes for handling different types of request bodies, such as JSON and empty bodies.

base_url

The base URL for the REST API request.

Type:

str

body

The body of the request, which can be JSON or empty.

Type:

AbstractBody

protocol

The protocol used for the request, default is Protocol.HTTP.

Type:

Protocol

method

The HTTP method for the request, default is Method.GET.

Type:

Method

headers

The headers for the request, default is {“content-type”: “application/json”}.

Type:

dict

timeout

The timeout duration for the request in milliseconds, default is 2000.

Type:

int

class AbstractBody[source]

Bases: ABC

An abstract base class for request bodies.

class EmptyBody(type: str = 'karya.core.entities.http.Body.EmptyBody')[source]

Bases: AbstractAction

Represents an empty body for a REST API request.

type: str = 'karya.core.entities.http.Body.EmptyBody'
class JsonBody(json_string: str, type: str = 'karya.core.entities.http.Body.JsonBody')[source]

Bases: AbstractBody

Represents a JSON body for a REST API request.

json_string

The JSON-encoded string to be sent in the request body.

Type:

str

classmethod from_dict(data: Dict[str, Any]) JsonBody[source]

Creates a JsonBody instance from a dictionary by converting it to a JSON string.

Parameters:

data (Dict[str, Any]) – A dictionary representing the data to be converted.

Returns:

A new instance of JsonBody containing the JSON string.

Return type:

RestApiRequest.JsonBody
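
The description of from_dict suggests a thin wrapper around JSON serialization. The stand-in below shows that idea with the standard library only; `JsonBodySketch` is a hypothetical class, and using `json.dumps` is an assumption about how the real method encodes the dictionary.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class JsonBodySketch:
    """Hypothetical stand-in with the same fields as RestApiRequest.JsonBody."""
    json_string: str
    type: str = "karya.core.entities.http.Body.JsonBody"

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "JsonBodySketch":
        # Assumption: the dictionary is serialized with json.dumps.
        return cls(json_string=json.dumps(data))


body = JsonBodySketch.from_dict({"message": "hello", "count": 2})
print(body.json_string)
```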

json_string: str
type: str = 'karya.core.entities.http.Body.JsonBody'
base_url: str
body: AbstractBody
headers: dict
method: Method = 'GET'
protocol: Protocol = 'HTTP'
timeout: int = 2000
type: str = 'karya.core.entities.Action.RestApiRequest'
class karya.entities.actions.SlackMessageRequest(channel: str, message: str, type: str = 'karya.core.entities.Action.SlackMessageRequest')[source]

Bases: AbstractAction

Represents a Slack message request action.

This class defines the properties required to send a message to a Slack channel.

channel

The Slack channel to send the message to.

Type:

str

message

The content of the message to be sent to the Slack channel.

Type:

str

channel: str
message: str
type: str = 'karya.core.entities.Action.SlackMessageRequest'

Plan Types Module

The plan_types module contains the classes that define the different types of plans that can be submitted to the Karya Server.

class karya.entities.plan_types.OneTime(type: str = 'karya.core.entities.PlanType.OneTime')[source]

Bases: AbstractPlanType

Represents a one-time plan type.

This class extends the AbstractPlanType and represents a plan that occurs only once.

type: str = 'karya.core.entities.PlanType.OneTime'
class karya.entities.plan_types.Recurring(end_at: int | None, type: str = 'karya.core.entities.PlanType.Recurring')[source]

Bases: AbstractPlanType

Represents a recurring plan type.

This class extends the AbstractPlanType and represents a plan that repeats periodically. It includes an optional end_at field to specify when the recurring plan should end.

end_at

The optional end time for the recurring plan, given as a Unix timestamp in epoch milliseconds. Use None to run the plan indefinitely.

Type:

Optional[int]

end_at: int | None
type: str = 'karya.core.entities.PlanType.Recurring'
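
Since end_at is expressed in epoch milliseconds, a small conversion helper avoids off-by-1000 mistakes. The functions `to_epoch_millis` and `end_at_after` below are hypothetical helpers, not part of the library; they use only the standard library and integer arithmetic.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole epoch milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime")
    # Dividing two timedeltas with // yields an exact integer.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def end_at_after(duration: timedelta, now: Optional[datetime] = None) -> int:
    """Return an end_at value that lies `duration` after `now` (default: current UTC time)."""
    start = now or datetime.now(timezone.utc)
    return to_epoch_millis(start + duration)


one_day = to_epoch_millis(datetime(1970, 1, 2, tzinfo=timezone.utc))
print(one_day)
```

A value such as `end_at_after(timedelta(hours=1))` could then be passed as the `end_at` argument of Recurring.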