ppqueue.Queue¶
A parallel processing job runner / data structure.
PARAMETER | DESCRIPTION |
---|---|
max_concurrent
|
max number of concurrently running jobs.
TYPE:
|
max_size
|
max size of the queue (default=0, unlimited).
TYPE:
|
engine
|
the engine used to run jobs.
TYPE:
|
name
|
an identifier for this queue.
TYPE:
|
callback
|
a callable that is called immediately after each job is finished.
TYPE:
|
show_progress
|
global setting for showing progress bars.
TYPE:
|
drop_finished
|
if True, the queue will not store finished jobs for retrieval.
TYPE:
|
stop_when_idle
|
if True, the queue will stop the pulse when all jobs are finished.
TYPE:
|
pulse_freq_ms
|
the interval at which jobs are transitioned between internal queues.
TYPE:
|
no_start
|
if True, do not start the queue pulse on instantiation.
TYPE:
|
Examples:
>>> from ppqueue import Queue
>>> from time import sleep
...
>>> with Queue() as queue:
... jobs = queue.map(sleep, [1, 2, 3, 4, 5])
...
>>> len(jobs)
5
Functions¶
collect ¶
collect(n: int = 0, wait: bool = False, **kwargs: Any) -> list[Job]
Removes and returns all finished jobs from the queue.
PARAMETER | DESCRIPTION |
---|---|
n
|
collect this many jobs (default=0, all)
TYPE:
|
wait
|
If True, block until this many jobs are finished. Else, immediately return all finished.
TYPE:
|
**kwargs
|
kwargs given to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Job]
|
a list of |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
... return x + y
...
>>> with Queue() as queue:
... for i in range(5):
... _ = queue.enqueue(add_nums, args=[i, 100])
...
... jobs = queue.collect(wait=True)
...
>>> type(jobs)
<class 'list'>
>>> len(jobs)
5
dequeue ¶
dequeue(*, wait: bool = False, _peek: bool = False, **kwargs: Any) -> Job | None
Removes and returns the finished job with the highest priority from the queue.
PARAMETER | DESCRIPTION |
---|---|
wait
|
if no jobs are finished, wait for one.
TYPE:
|
**kwargs
|
passed to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Job | None
|
... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
... return x + y
...
>>> with Queue() as queue:
... for i in range(5):
... _ = queue.enqueue(add_nums, args=[i, 100])
...
... jobs = [queue.dequeue(wait=True) for _ in range(queue.size())]
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]
enqueue ¶
enqueue(fun: Callable[..., Any], /, args: Sequence[Any] | None = None, kwargs: dict[str, Any] | None = None, name: str | None = None, priority: int = 100, group: int | None = None, timeout: float = 0, suppress_errors: bool = False, skip_on_group_error: bool = False) -> int
Adds a job to the queue.
PARAMETER | DESCRIPTION |
---|---|
fun
|
...
TYPE:
|
args
|
...
TYPE:
|
kwargs
|
...
TYPE:
|
name
|
...
TYPE:
|
priority
|
...
TYPE:
|
group
|
...
TYPE:
|
timeout
|
...
TYPE:
|
suppress_errors
|
...
TYPE:
|
skip_on_group_error
|
...
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
... return x + y
...
>>> with Queue() as queue:
... for i in range(5):
... _ = queue.enqueue(add_nums, args=[i, 100])
...
... jobs = queue.collect(wait=True)
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]
is_busy ¶
is_busy() -> bool
True if max concurrent limit is reached or if there are waiting jobs.
RETURNS | DESCRIPTION |
---|---|
bool
|
... |
is_empty ¶
is_empty() -> bool
True if there are no jobs in the queue system.
RETURNS | DESCRIPTION |
---|---|
bool
|
... |
is_full ¶
is_full() -> bool
True if the number of jobs in the queue system is equal to max_size.
RETURNS | DESCRIPTION |
---|---|
bool
|
... |
map ¶
map(fun: Callable[..., Any], iterable: Sequence[Any], /, *args: Any, timeout: float = 0, show_progress: bool | None = None, **kwargs: Any) -> list[Job]
Submits many jobs to the queue -- one for each item in the iterable. Waits for all to finish, then returns the results.
PARAMETER | DESCRIPTION |
---|---|
fun
|
...
TYPE:
|
iterable
|
...
TYPE:
|
*args
|
...
TYPE:
|
timeout
|
...
TYPE:
|
show_progress
|
...
TYPE:
|
**kwargs
|
...
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Job]
|
... |
Examples:
>>> from ppqueue import Queue
>>> from time import sleep
...
>>> with Queue() as queue:
... jobs = queue.map(sleep, [1, 2, 3, 4, 5])
...
>>> len(jobs)
5
peek ¶
peek(*args: Any, **kwargs: Any) -> Job | None
Returns the job with the highest priority from the queue.
Similar to dequeue
/ pop
, but the job remains in the queue.
PARAMETER | DESCRIPTION |
---|---|
*args
|
...
TYPE:
|
**kwargs
|
...
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Job | None
|
... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
... return x + y
...
>>> with Queue() as queue:
... for i in range(5):
... _ = queue.put(add_nums, args=[i, 100])
...
... print('Before:', queue.size())
...
... job = queue.peek(wait=True)
...
... print('After:', queue.size())
...
Before: 5
After: 5
>>> job.result
100
pop ¶
pop(*args: Any, **kwargs: Any) -> Job | None
Alias for dequeue
.
PARAMETER | DESCRIPTION |
---|---|
*args
|
...
TYPE:
|
**kwargs
|
...
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Job | None
|
... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
... return x + y
...
>>> with Queue() as queue:
... for i in range(5):
... _ = queue.put(add_nums, args=[i, 100])
...
... jobs = [queue.pop(wait=True) for _ in range(queue.size())]
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]
put ¶
put(*args, **kwargs) -> int
Alias for enqueue
.
Adds a job to the queue.
PARAMETER | DESCRIPTION |
---|---|
*args
|
...
DEFAULT:
|
**kwargs
|
...
DEFAULT:
|
RETURNS | DESCRIPTION |
---|---|
int
|
... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
... return x + y
...
>>> with Queue() as queue:
... for i in range(5):
... _ = queue.put(add_nums, args=[i, 100])
... jobs = queue.collect(wait=True)
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]
size ¶
size(*, waiting: bool = False, working: bool = False, finished: bool = False) -> int
Get the number of jobs in the queue in state: waiting, working, and/or finished.
If no options are given, the total number of jobs in the queue is returned.
PARAMETER | DESCRIPTION |
---|---|
waiting
|
include waiting jobs.
TYPE:
|
working
|
include working jobs.
TYPE:
|
finished
|
include finished jobs.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
... return x + y
...
>>> with Queue() as queue:
... for i in range(5):
... _ = queue.enqueue(add_nums, args=[i, 100])
... print(queue.size())
1
2
3
4
5
starmap ¶
starmap(fun: Callable[..., Any], iterable: Sequence[Sequence[Any]], /, *args: Any, timeout: float = 0, show_progress: bool | None = None, **kwargs: Any) -> list[Job]
Submits many jobs to the queue -- one for each sequence in the iterable. Waits for all to finish, then returns the results.
PARAMETER | DESCRIPTION |
---|---|
fun
|
...
TYPE:
|
iterable
|
...
TYPE:
|
*args
|
static arguments passed to the function.
TYPE:
|
timeout
|
...
TYPE:
|
show_progress
|
...
TYPE:
|
**kwargs
|
static keyword-arguments passed to the function.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Job]
|
... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
... return x + y
...
>>> with Queue() as queue:
... jobs = queue.starmap(
... add_nums, [(1, 2), (3, 4)]
... )
...
>>> [job.result for job in jobs]
[3, 7]
starmapkw ¶
starmapkw(fun: Callable[..., Any], iterable: Sequence[dict[str, Any]], /, *args: Any, timeout: float = 0, show_progress: bool | None = None, **kwargs: Any) -> list[Job]
Submits many jobs to the queue -- one for each dictionary in the iterable. Waits for all to finish, then returns the results.
PARAMETER | DESCRIPTION |
---|---|
fun
|
...
TYPE:
|
iterable
|
...
TYPE:
|
*args
|
static arguments passed to the function.
TYPE:
|
timeout
|
...
TYPE:
|
show_progress
|
...
TYPE:
|
**kwargs
|
static keyword-arguments passed to the function.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Job]
|
... |
Examples:
>>> from ppqueue import Queue
>>> from time import sleep
...
>>> def add_nums(x: int, y: int) -> int:
... return x + y
...
>>> with Queue() as queue:
... jobs = queue.starmapkw(
... add_nums, [{'x': 1, 'y': 2}, {'x': 3, 'y': 4}]
... )
...
>>> [job.result for job in jobs]
[3, 7]
wait ¶
wait(*, n: int = 0, timeout: float = 0, poll_ms: int = 0, show_progress: bool | None = None) -> int
Wait for jobs to finish.
PARAMETER | DESCRIPTION |
---|---|
n
|
the number of jobs to wait for (default=0, all).
TYPE:
|
timeout
|
seconds to wait before raising
TYPE:
|
poll_ms
|
milliseconds to pause between checks (default=100).
TYPE:
|
show_progress
|
if True, present a progress bar.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
If |
int
|
Else, returns the count of finished jobs. |