ppqueue.Queue
A parallel processing job runner / data structure.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
max_concurrent | int | max number of concurrently running jobs. | multiprocessing.cpu_count() |
max_size | int | max size of the queue (default=0, unlimited). | 0 |
engine | str \| type[multiprocessing.Process] \| type[threading.Thread] | the engine used to run jobs. | Process |
name | str \| None | an identifier for this queue. | None |
callback | Callable[[Job], Any] \| None | a callable that is called immediately after each job is finished. | None |
show_progress | bool | global setting for showing progress bars. | False |
drop_finished | bool | if True, the queue will not store finished jobs for retrieval. | False |
stop_when_idle | bool | if True, the queue will stop the pulse when all jobs are finished. | False |
pulse_freq_ms | int | the interval at which jobs are transitioned between internal queues. | 100 |
no_start | bool | if True, do not start the queue pulse on instantiation. | False |
Examples:
>>> from ppqueue import Queue
>>> from time import sleep
...
>>> with Queue() as queue:
...     jobs = queue.map(sleep, [1, 2, 3, 4, 5])
...
>>> len(jobs)
5
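For readers coming from the standard library, the max_concurrent and callback parameters are loosely comparable to the worker limit and done-callbacks of concurrent.futures. The sketch below uses only the standard library and is an analogy, not ppqueue's implementation; the names square and on_done are hypothetical, and unlike a ppqueue callback (which receives a Job) the callback here receives a Future.

```python
from concurrent.futures import Future, ThreadPoolExecutor

finished: list[int] = []


def square(x: int) -> int:
    return x * x


def on_done(future: Future) -> None:
    # Called once per task, right after that task has finished.
    finished.append(future.result())


# max_workers caps how many tasks run at the same time.
with ThreadPoolExecutor(max_workers=2) as pool:
    for i in range(5):
        pool.submit(square, i).add_done_callback(on_done)

# Completion order is not guaranteed, so sort before comparing.
print(sorted(finished))
```

Leaving the with block waits for all submitted tasks, so every callback has run by the time the list is printed.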
Functions
collect
collect(
    n: int = 0, wait: bool = False, **kwargs: Any
) -> list[Job]
Removes and returns all finished jobs from the queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n | int | collect this many jobs (default=0, all) | 0 |
wait | bool | If True, block until this many jobs are finished. Else, immediately return all finished. | False |
**kwargs | Any | kwargs given to | {} |
Returns:
Type | Description |
---|---|
list[Job] | a list of |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...
...     jobs = queue.collect(wait=True)
...
>>> type(jobs)
<class 'list'>
>>> len(jobs)
5
dequeue
dequeue(
    *,
    wait: bool = False,
    _peek: bool = False,
    **kwargs: Any
) -> Job | None
Removes and returns the finished job with the highest priority from the queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
wait | bool | if no jobs are finished, wait for one. | False |
**kwargs | Any | passed to | {} |
Returns:
Type | Description |
---|---|
Job \| None | ... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...
...     jobs = [queue.dequeue(wait=True) for _ in range(queue.size())]
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]
enqueue
enqueue(
    fun: Callable[..., Any],
    /,
    args: Sequence[Any] | None = None,
    kwargs: dict[str, Any] | None = None,
    name: str | None = None,
    priority: int = 100,
    group: int | None = None,
    timeout: float = 0,
    suppress_errors: bool = False,
    skip_on_group_error: bool = False,
) -> int
Adds a job to the queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fun | Callable[..., Any] | ... | required |
args | Sequence[Any] \| None | ... | None |
kwargs | dict[str, Any] \| None | ... | None |
name | str \| None | ... | None |
priority | int | ... | 100 |
group | int \| None | ... | None |
timeout | float | ... | 0 |
suppress_errors | bool | ... | False |
skip_on_group_error | bool | ... | False |
Returns:
Type | Description |
---|---|
int | ... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...
...     jobs = queue.collect(wait=True)
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]
is_busy
is_busy() -> bool
True if max concurrent limit is reached or if there are waiting jobs.
Returns:
Type | Description |
---|---|
bool | ... |
is_empty
is_empty() -> bool
True if there are no jobs in the queue system.
Returns:
Type | Description |
---|---|
bool | ... |
is_full
is_full() -> bool
True if the number of jobs in the queue system is equal to max_size.
Returns:
Type | Description |
---|---|
bool | ... |
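The three predicates is_busy, is_empty and is_full can be read as simple checks over the counts of waiting, working and finished jobs. The following is a minimal sketch of that reading, not ppqueue's code: ToyQueue is a hypothetical stand-in, and it assumes that finished jobs still count toward the queue system and that max_size=0 (unlimited) is never full.

```python
from dataclasses import dataclass, field


@dataclass
class ToyQueue:
    """Hypothetical model that only tracks which jobs are in which state."""

    max_concurrent: int = 2
    max_size: int = 0  # 0 means unlimited, as in the constructor table
    waiting: list = field(default_factory=list)
    working: list = field(default_factory=list)
    finished: list = field(default_factory=list)

    def size(self) -> int:
        # Assumption: finished jobs still count until they are collected.
        return len(self.waiting) + len(self.working) + len(self.finished)

    def is_busy(self) -> bool:
        # Concurrency limit reached, or jobs are still waiting to start.
        return len(self.working) >= self.max_concurrent or len(self.waiting) > 0

    def is_empty(self) -> bool:
        return self.size() == 0

    def is_full(self) -> bool:
        # Assumption: an unlimited queue (max_size=0) is never full.
        return self.max_size > 0 and self.size() == self.max_size


q = ToyQueue(max_concurrent=2, max_size=3)
fresh = (q.is_busy(), q.is_empty(), q.is_full())

q.working.extend(["a", "b"])  # both worker slots are taken
q.waiting.append("c")         # a third job has to wait
loaded = (q.is_busy(), q.is_empty(), q.is_full())

q.finished, q.working, q.waiting = q.working + q.waiting, [], []
drained = (q.is_busy(), q.is_empty(), q.is_full())

print(fresh, loaded, drained)
```

In this model a queue whose jobs have all finished is neither busy nor empty until the finished jobs are removed, for example with collect or dequeue.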
map
map(
    fun: Callable[..., Any],
    iterable: Sequence[Any],
    /,
    *args: Any,
    timeout: float = 0,
    show_progress: bool | None = None,
    **kwargs: Any,
) -> list[Job]
Submits many jobs to the queue, one for each item in the iterable. Waits for all to finish, then returns the finished jobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fun | Callable[..., Any] | ... | required |
iterable | Sequence[Any] | ... | required |
*args | Any | ... | () |
timeout | float | ... | 0 |
show_progress | bool \| None | ... | None |
**kwargs | Any | ... | {} |
Returns:
Type | Description |
---|---|
list[Job] | ... |
Examples:
>>> from ppqueue import Queue
>>> from time import sleep
...
>>> with Queue() as queue:
...     jobs = queue.map(sleep, [1, 2, 3, 4, 5])
...
>>> len(jobs)
5
peek
peek(*args: Any, **kwargs: Any) -> Job | None
Returns the job with the highest priority from the queue.
Similar to dequeue / pop, but the job remains in the queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args | Any | ... | () |
**kwargs | Any | ... | {} |
Returns:
Type | Description |
---|---|
Job \| None | ... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.put(add_nums, args=[i, 100])
...
...     print('Before:', queue.size())
...
...     job = queue.peek(wait=True)
...
...     print('After:', queue.size())
...
Before: 5
After: 5
>>> job.result
100
pop
pop(*args: Any, **kwargs: Any) -> Job | None
Alias for dequeue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args | Any | ... | () |
**kwargs | Any | ... | {} |
Returns:
Type | Description |
---|---|
Job \| None | ... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.put(add_nums, args=[i, 100])
...
...     jobs = [queue.pop(wait=True) for _ in range(queue.size())]
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]
put
put(*args, **kwargs) -> int
Alias for enqueue.
Adds a job to the queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args | | ... | () |
**kwargs | | ... | {} |
Returns:
Type | Description |
---|---|
int | ... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.put(add_nums, args=[i, 100])
...     jobs = queue.collect(wait=True)
...
>>> [job.result for job in jobs]
[100, 101, 102, 103, 104]
size
size(
    *,
    waiting: bool = False,
    working: bool = False,
    finished: bool = False
) -> int
Get the number of jobs in the queue in state: waiting, working, and/or finished.
If no options are given, the total number of jobs in the queue is returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
waiting | bool | include waiting jobs. | False |
working | bool | include working jobs. | False |
finished | bool | include finished jobs. | False |
Returns:
Type | Description |
---|---|
int | ... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     for i in range(5):
...         _ = queue.enqueue(add_nums, args=[i, 100])
...         print(queue.size())
1
2
3
4
5
starmap
starmap(
    fun: Callable[..., Any],
    iterable: Sequence[Sequence[Any]],
    /,
    *args: Any,
    timeout: float = 0,
    show_progress: bool | None = None,
    **kwargs: Any,
) -> list[Job]
Submits many jobs to the queue, one for each sequence in the iterable. Waits for all to finish, then returns the finished jobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fun | Callable[..., Any] | ... | required |
iterable | Sequence[Sequence[Any]] | ... | required |
*args | Any | static arguments passed to the function. | () |
timeout | float | ... | 0 |
show_progress | bool \| None | ... | None |
**kwargs | Any | static keyword-arguments passed to the function. | {} |
Returns:
Type | Description |
---|---|
list[Job] | ... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     jobs = queue.starmap(
...         add_nums, [(1, 2), (3, 4)]
...     )
...
>>> [job.result for job in jobs]
[3, 7]
starmapkw
starmapkw(
    fun: Callable[..., Any],
    iterable: Sequence[dict[str, Any]],
    /,
    *args: Any,
    timeout: float = 0,
    show_progress: bool | None = None,
    **kwargs: Any,
) -> list[Job]
Submits many jobs to the queue, one for each dictionary in the iterable. Waits for all to finish, then returns the finished jobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fun | Callable[..., Any] | ... | required |
iterable | Sequence[dict[str, Any]] | ... | required |
*args | Any | static arguments passed to the function. | () |
timeout | float | ... | 0 |
show_progress | bool \| None | ... | None |
**kwargs | Any | static keyword-arguments passed to the function. | {} |
Returns:
Type | Description |
---|---|
list[Job] | ... |
Examples:
>>> from ppqueue import Queue
...
>>> def add_nums(x: int, y: int) -> int:
...     return x + y
...
>>> with Queue() as queue:
...     jobs = queue.starmapkw(
...         add_nums, [{'x': 1, 'y': 2}, {'x': 3, 'y': 4}]
...     )
...
>>> [job.result for job in jobs]
[3, 7]
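The difference between map, starmap and starmapkw comes down to how each item of the iterable is handed to the function. The sketch below shows those three call shapes sequentially in plain Python, with no queue involved; it is an illustration of the argument unpacking only, and it deliberately leaves out parallelism, timeouts and the static *args / **kwargs, whose exact placement the tables above do not specify. The function double is a hypothetical helper.

```python
def double(x: int) -> int:
    return x * 2


def add_nums(x: int, y: int) -> int:
    return x + y


# map-style: each item is passed as a single positional argument.
map_style = [double(item) for item in [1, 2, 3]]

# starmap-style: each item is a sequence unpacked into positional arguments.
starmap_style = [add_nums(*item) for item in [(1, 2), (3, 4)]]

# starmapkw-style: each item is a dict unpacked into keyword arguments.
starmapkw_style = [add_nums(**item) for item in [{"x": 1, "y": 2}, {"x": 3, "y": 4}]]

print(map_style)        # [2, 4, 6]
print(starmap_style)    # [3, 7]
print(starmapkw_style)  # [3, 7]
```

With the queue methods, the same values would be read from the result attribute of each returned job rather than returned directly.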
wait
wait(
    *,
    n: int = 0,
    timeout: float = 0,
    poll_ms: int = 0,
    show_progress: bool | None = None
) -> int
Wait for jobs to finish.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n | int | the number of jobs to wait for (default=0, all). | 0 |
timeout | float | seconds to wait before raising | 0 |
poll_ms | int | milliseconds to pause between checks (default=100). | 0 |
show_progress | bool \| None | if True, present a progress bar. | None |
Returns:
Type | Description |
---|---|
int | If |
int | Else, returns the count of finished jobs. |
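The n, timeout and poll_ms parameters describe a polling wait: check how many jobs are finished, pause, and give up after a deadline. A minimal sketch of such a loop follows. It is not ppqueue's implementation: wait_until and fake_count are hypothetical names, TimeoutError is a stand-in because the table above does not say which exception is raised, and treating timeout=0 as "no deadline" is an assumption.

```python
import time
from typing import Callable


def wait_until(
    count_finished: Callable[[], int],
    n: int,
    timeout: float = 0,
    poll_ms: int = 100,
) -> int:
    """Poll until at least n jobs are finished, or the timeout expires."""
    # Assumption: timeout=0 means wait without a deadline.
    deadline = time.monotonic() + timeout if timeout > 0 else None
    while True:
        finished = count_finished()
        if finished >= n:
            return finished
        if deadline is not None and time.monotonic() >= deadline:
            # Stand-in exception; the real one is not named in the source.
            raise TimeoutError(f"only {finished} of {n} jobs finished")
        time.sleep(poll_ms / 1000)


# Fake progress source: one more job is "finished" on every poll.
polls = {"count": 0}


def fake_count() -> int:
    polls["count"] += 1
    return polls["count"]


done = wait_until(fake_count, n=3, poll_ms=1)

# A source that never makes progress hits the deadline instead.
try:
    wait_until(lambda: 0, n=1, timeout=0.05, poll_ms=5)
    timed_out = False
except TimeoutError:
    timed_out = True

print(done, timed_out)
```

A shorter poll interval reacts faster to finished jobs at the cost of more frequent wake-ups.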