Skip to content

ppqueue.Queue

A parallel processing job runner / data structure.

PARAMETER DESCRIPTION
max_concurrent

max number of concurrently running jobs.

TYPE: int DEFAULT: cpu_count()

max_size

max size of the queue (default=0, unlimited).

TYPE: int DEFAULT: 0

engine

the engine used to run jobs.

TYPE: str | type[Process] | type[Thread] DEFAULT: Process

name

an identifier for this queue.

TYPE: str | None DEFAULT: None

callback

a callable that is called immediately after each job is finished.

TYPE: Callable[[Job], Any] | None DEFAULT: None

show_progress

global setting for showing progress bars.

TYPE: bool DEFAULT: False

drop_finished

if True, the queue will not store finished jobs for retrieval.

TYPE: bool DEFAULT: False

stop_when_idle

if True, the queue will stop the pulse when all jobs are finished.

TYPE: bool DEFAULT: False

pulse_freq_ms

the interval at which jobs are transitioned between internal queues.

TYPE: int DEFAULT: 100

no_start

if True, do not start the queue pulse on instantiation.

TYPE: bool DEFAULT: False

Examples:

>>> from ppqueue import Queue
>>> from time import sleep
...
>>> with Queue() as queue:
...     jobs = queue.map(sleep, [1, 2, 3, 4, 5])
...
>>> len(jobs)
5

Functions

collect

collect(n: int = 0, wait: bool = False, **kwargs: Any) -> list[Job]

Removes and returns all finished jobs from the queue.

PARAMETER DESCRIPTION
n

collect this many jobs (default=0, all)

TYPE: int DEFAULT: 0

wait

If True, block until this many jobs are finished. Else, immediately return all finished.

TYPE: bool DEFAULT: False

**kwargs

kwargs given to Queue.wait.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Job]

a list of Job instances.

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...
...     jobs = queue.collect(wait=True)
...
>>> type(jobs)
<class 'list'>
>>> len(jobs)
5

dequeue

dequeue(*, wait: bool = False, _peek: bool = False, **kwargs: Any) -> Job | None

Removes and returns the finished job with the highest priority from the queue.

PARAMETER DESCRIPTION
wait

if no jobs are finished, wait for one.

TYPE: bool DEFAULT: False

**kwargs

passed to Queue.wait

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Job | None

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...
...     jobs = [queue.dequeue(wait=True) for _ in range(queue.size())]
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]

dispose

dispose() -> None

Stop running jobs, then clear the queue, then stop the queue pulse.

enqueue

enqueue(fun: Callable[..., Any], /, args: Sequence[Any] | None = None, kwargs: dict[str, Any] | None = None, name: str | None = None, priority: int = 100, group: int | None = None, timeout: float = 0, suppress_errors: bool = False, skip_on_group_error: bool = False) -> int

Adds a job to the queue.

PARAMETER DESCRIPTION
fun

...

TYPE: Callable[..., Any]

args

...

TYPE: Sequence[Any] | None DEFAULT: None

kwargs

...

TYPE: dict[str, Any] | None DEFAULT: None

name

...

TYPE: str | None DEFAULT: None

priority

...

TYPE: int DEFAULT: 100

group

...

TYPE: int | None DEFAULT: None

timeout

...

TYPE: float DEFAULT: 0

suppress_errors

...

TYPE: bool DEFAULT: False

skip_on_group_error

...

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
int

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...
...     jobs = queue.collect(wait=True)
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]

is_busy

is_busy() -> bool

True if max concurrent limit is reached or if there are waiting jobs.

RETURNS DESCRIPTION
bool

...

is_empty

is_empty() -> bool

True if there are no jobs in the queue system.

RETURNS DESCRIPTION
bool

...

is_full

is_full() -> bool

True if the number of jobs in the queue system is equal to max_size.

RETURNS DESCRIPTION
bool

...

map

map(fun: Callable[..., Any], iterable: Sequence[Any], /, *args: Any, timeout: float = 0, show_progress: bool | None = None, **kwargs: Any) -> list[Job]

Submits many jobs to the queue -- one for each item in the iterable. Waits for all to finish, then returns the results.

PARAMETER DESCRIPTION
fun

...

TYPE: Callable[..., Any]

iterable

...

TYPE: Sequence[Any]

*args

...

TYPE: Any DEFAULT: ()

timeout

...

TYPE: float DEFAULT: 0

show_progress

...

TYPE: bool | None DEFAULT: None

**kwargs

...

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Job]

...

Examples:

>>> from ppqueue import Queue
>>> from time import sleep
...
>>> with Queue() as queue:
...     jobs = queue.map(sleep, [1, 2, 3, 4, 5])
...
>>> len(jobs)
5

peek

peek(*args: Any, **kwargs: Any) -> Job | None

Returns the job with the highest priority from the queue.

Similar to dequeue / pop, but the job remains in the queue.

PARAMETER DESCRIPTION
*args

...

TYPE: Any DEFAULT: ()

**kwargs

...

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Job | None

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.put(add_nums, args=[i, 100])
...
...     print('Before:', queue.size())
...
...     job = queue.peek(wait=True)
...
...     print('After:', queue.size())
...
Before: 5
After: 5
>>> job.result
100

pop

pop(*args: Any, **kwargs: Any) -> Job | None

Alias for dequeue.

PARAMETER DESCRIPTION
*args

...

TYPE: Any DEFAULT: ()

**kwargs

...

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Job | None

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.put(add_nums, args=[i, 100])
...
...     jobs = [queue.pop(wait=True) for _ in range(queue.size())]
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]

put

put(*args, **kwargs) -> int

Alias for enqueue.

Adds a job to the queue.

PARAMETER DESCRIPTION
*args

...

DEFAULT: ()

**kwargs

...

DEFAULT: {}

RETURNS DESCRIPTION
int

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.put(add_nums, args=[i, 100])
...     jobs = queue.collect(wait=True)
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]

size

size(*, waiting: bool = False, working: bool = False, finished: bool = False) -> int

Get the number of jobs in the queue in state: waiting, working, and/or finished.

If no options are given, the total number of jobs in the queue is returned.

PARAMETER DESCRIPTION
waiting

include waiting jobs.

TYPE: bool DEFAULT: False

working

include working jobs.

TYPE: bool DEFAULT: False

finished

include finished jobs.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
int

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...         print(queue.size())
1
2
3
4
5

starmap

starmap(fun: Callable[..., Any], iterable: Sequence[Sequence[Any]], /, *args: Any, timeout: float = 0, show_progress: bool | None = None, **kwargs: Any) -> list[Job]

Submits many jobs to the queue -- one for each sequence in the iterable. Waits for all to finish, then returns the results.

PARAMETER DESCRIPTION
fun

...

TYPE: Callable[..., Any]

iterable

...

TYPE: Sequence[Sequence[Any]]

*args

static arguments passed to the function.

TYPE: Any DEFAULT: ()

timeout

...

TYPE: float DEFAULT: 0

show_progress

...

TYPE: bool | None DEFAULT: None

**kwargs

static keyword-arguments passed to the function.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Job]

...

Examples:

>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     jobs = queue.starmap(
...         add_nums, [(1, 2), (3, 4)]
...     )
...
>>> [job.result for job in jobs]
[3, 7]

starmapkw

starmapkw(fun: Callable[..., Any], iterable: Sequence[dict[str, Any]], /, *args: Any, timeout: float = 0, show_progress: bool | None = None, **kwargs: Any) -> list[Job]

Submits many jobs to the queue -- one for each dictionary in the iterable. Waits for all to finish, then returns the results.

PARAMETER DESCRIPTION
fun

...

TYPE: Callable[..., Any]

iterable

...

TYPE: Sequence[dict[str, Any]]

*args

static arguments passed to the function.

TYPE: Any DEFAULT: ()

timeout

...

TYPE: float DEFAULT: 0

show_progress

...

TYPE: bool | None DEFAULT: None

**kwargs

static keyword-arguments passed to the function.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Job]

...

Examples:

>>> from ppqueue import Queue
>>> from time import sleep
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     jobs = queue.starmapkw(
...         add_nums, [{'x': 1, 'y': 2}, {'x': 3, 'y': 4}]
...     )
...
>>> [job.result for job in jobs]
[3, 7]

start

start() -> None

Start the queue pulse.

stop

stop() -> None

Stop the queue pulse.

wait

wait(*, n: int = 0, timeout: float = 0, poll_ms: int = 0, show_progress: bool | None = None) -> int

Wait for jobs to finish.

PARAMETER DESCRIPTION
n

the number of jobs to wait for (default=0, all).

TYPE: int DEFAULT: 0

timeout

seconds to wait before raising TimeoutError (default=0, indefinitely).

TYPE: float DEFAULT: 0

poll_ms

milliseconds to pause between checks (default=100).

TYPE: int DEFAULT: 0

show_progress

if True, present a progress bar.

TYPE: bool | None DEFAULT: None

RETURNS DESCRIPTION
int

If n <= 0, returns the count of unfinished jobs.

int

Else, returns the count of finished jobs.