ppqueue.PPQueue¶
A parallel processing job runner / data structure.
PARAMETER | DESCRIPTION |
---|---|
max_concurrent
|
max number of concurrently running jobs.
TYPE:
|
max_size
|
max size of the queue (default=0, unlimited).
TYPE:
|
engine
|
the engine used to run jobs.
TYPE:
|
name
|
an identifier for this queue.
TYPE:
|
callback
|
a callable that is called immediately after each job is finished.
TYPE:
|
show_progress
|
global setting for showing progress bars.
TYPE:
|
drop_finished
|
if True, the queue will not store finished jobs for retrieval.
TYPE:
|
stop_when_idle
|
if True, the queue will stop the pulse when all jobs are finished.
TYPE:
|
pulse_freq_ms
|
the interval at which jobs are transitioned between internal queues.
TYPE:
|
no_start
|
if True, do not start the queue pulse on instantiation.
TYPE:
|
Examples:
from ppqueue import PPQueue
from time import sleep
with PPQueue() as queue:
jobs = queue.map(sleep, [1, 2, 3, 4, 5])
assert len(jobs) == 5
Functions¶
collect ¶
collect(
n: int = 0, wait: bool = False, **kwargs: Any
) -> list[Job]
Removes and returns all finished jobs from the queue.
PARAMETER | DESCRIPTION |
---|---|
n
|
collect this many jobs (default=0, all)
TYPE:
|
wait
|
If True, block until this many jobs are finished. Else, immediately return all finished.
TYPE:
|
**kwargs
|
kwargs given to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Job]
|
a list of |
Examples:
from ppqueue import PPQueue
def add_nums(x: int, y: int) -> int:
return x + y
with PPQueue() as queue:
for i in range(5):
_ = queue.enqueue(add_nums, args=[i, 100])
jobs = queue.collect(wait=True)
assert isinstance(jobs, list)
assert len(jobs) == 5
dequeue ¶
dequeue(
*,
wait: bool = False,
_peek: bool = False,
**kwargs: Any
) -> Job | None
Removes and returns the finished job with the highest priority from the queue.
PARAMETER | DESCRIPTION |
---|---|
wait
|
if no jobs are finished, wait for one.
TYPE:
|
**kwargs
|
passed to
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Job | None
|
... |
Examples:
from ppqueue import PPQueue
def add_nums(x: int, y: int) -> int:
return x + y
with PPQueue() as queue:
for i in range(5):
_ = queue.enqueue(add_nums, args=[i, 100])
jobs = [queue.dequeue(wait=True) for _ in range(queue.size())]
results = [job.result for job in jobs]
assert [job.result for job in jobs] == [100, 101, 102, 103, 104]
enqueue ¶
enqueue(
fun: Callable[..., Any],
/,
args: Any = None,
kwargs: dict[str, Any] | None = None,
name: str | None = None,
priority: int = 100,
group: int | None = None,
timeout: float = 0,
suppress_errors: bool = False,
skip_on_group_error: bool = False,
) -> int
Adds a job to the queue.
PARAMETER | DESCRIPTION |
---|---|
fun
|
...
TYPE:
|
args
|
...
TYPE:
|
kwargs
|
...
TYPE:
|
name
|
...
TYPE:
|
priority
|
...
TYPE:
|
group
|
...
TYPE:
|
timeout
|
...
TYPE:
|
suppress_errors
|
...
TYPE:
|
skip_on_group_error
|
...
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
... |
Examples:
from ppqueue import PPQueue
def add_nums(x: int, y: int) -> int:
return x + y
with PPQueue() as queue:
for i in range(5):
_ = queue.enqueue(add_nums, args=[i, 100])
jobs = queue.collect(wait=True)
results = [job.result for job in jobs]
assert results == [100, 101, 102, 103, 104]
is_busy ¶
is_busy() -> bool
True if max concurrent limit is reached or if there are waiting jobs.
RETURNS | DESCRIPTION |
---|---|
bool
|
... |
is_empty ¶
is_empty() -> bool
True if there are no jobs in the queue system.
RETURNS | DESCRIPTION |
---|---|
bool
|
... |
is_full ¶
is_full() -> bool
True if the number of jobs in the queue system is equal to max_size.
RETURNS | DESCRIPTION |
---|---|
bool
|
... |
map ¶
map(
fun: Callable[..., Any],
iterable: Sequence[Any],
/,
*args: Any,
timeout: float = 0,
show_progress: bool | None = None,
**kwargs: Any,
) -> list[Job]
Submits many jobs to the queue -- one for each item in the iterable. Waits for all to finish, then returns the results.
PARAMETER | DESCRIPTION |
---|---|
fun
|
...
TYPE:
|
iterable
|
...
TYPE:
|
*args
|
...
TYPE:
|
timeout
|
...
TYPE:
|
show_progress
|
...
TYPE:
|
**kwargs
|
...
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Job]
|
... |
Examples:
from ppqueue import PPQueue
from time import sleep
with PPQueue() as queue:
jobs = queue.map(sleep, [1, 2, 3, 4, 5])
assert len(jobs) == 5
peek ¶
peek(*args: Any, **kwargs: Any) -> Job | None
Returns the job with the highest priority from the queue.
Similar to dequeue
/ pop
, but the job remains in the queue.
PARAMETER | DESCRIPTION |
---|---|
*args
|
...
TYPE:
|
**kwargs
|
...
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Job | None
|
... |
Examples:
from ppqueue import PPQueue
def add_nums(x: int, y: int) -> int:
return x + y
with PPQueue() as queue:
for i in range(5):
_ = queue.put(add_nums, args=[i, 100])
print("Before:", queue.size())
job = queue.peek(wait=True)
print("After:", queue.size())
# Before: 5
# After: 5
assert job.result == 100
pop ¶
pop(*args: Any, **kwargs: Any) -> Job | None
Alias for dequeue
.
PARAMETER | DESCRIPTION |
---|---|
*args
|
...
TYPE:
|
**kwargs
|
...
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Job | None
|
... |
Examples:
from ppqueue import PPQueue
def add_nums(x: int, y: int) -> int:
return x + y
with PPQueue() as queue:
for i in range(5):
_ = queue.put(add_nums, args=[i, 100])
jobs = [queue.pop(wait=True) for _ in range(queue.size())]
results = [job.result for job in jobs]
assert results == [100, 101, 102, 103, 104]
put ¶
put(*args, **kwargs) -> int
Alias for enqueue
.
Adds a job to the queue.
PARAMETER | DESCRIPTION |
---|---|
*args
|
...
DEFAULT:
|
**kwargs
|
...
DEFAULT:
|
RETURNS | DESCRIPTION |
---|---|
int
|
... |
Examples:
from ppqueue import PPQueue
def add_nums(x: int, y: int) -> int:
return x + y
with PPQueue() as queue:
for i in range(5):
_ = queue.put(add_nums, args=[i, 100])
jobs = queue.collect(wait=True)
results = [job.result for job in jobs]
assert results == [100, 101, 102, 103, 104]
size ¶
size(
*,
waiting: bool = False,
working: bool = False,
finished: bool = False
) -> int
Get the number of jobs in the queue in state: waiting, working, and/or finished.
If no options are given, the total number of jobs in the queue is returned.
PARAMETER | DESCRIPTION |
---|---|
waiting
|
include waiting jobs.
TYPE:
|
working
|
include working jobs.
TYPE:
|
finished
|
include finished jobs.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
... |
Examples:
from ppqueue import PPQueue
def add_nums(x: int, y: int) -> int:
return x + y
with PPQueue() as queue:
for i in range(5):
_ = queue.enqueue(add_nums, args=[i, 100])
print(queue.size())
# 1
# 2
# 3
# 4
# 5
starmap ¶
starmap(
fun: Callable[..., Any],
iterable: Sequence[Sequence[Any]],
/,
*args: Any,
timeout: float = 0,
show_progress: bool | None = None,
**kwargs: Any,
) -> list[Job]
Submits many jobs to the queue -- one for each sequence in the iterable. Waits for all to finish, then returns the results.
PARAMETER | DESCRIPTION |
---|---|
fun
|
...
TYPE:
|
iterable
|
...
TYPE:
|
*args
|
static arguments passed to the function.
TYPE:
|
timeout
|
...
TYPE:
|
show_progress
|
...
TYPE:
|
**kwargs
|
static keyword-arguments passed to the function.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Job]
|
... |
Examples:
from ppqueue import PPQueue
def add_nums(x: int, y: int) -> int:
return x + y
with PPQueue() as queue:
jobs = queue.starmap(add_nums, [(1, 2), (3, 4)])
results = [job.result for job in jobs]
assert results == [3, 7]
starmapkw ¶
starmapkw(
fun: Callable[..., Any],
iterable: Sequence[dict[str, Any]],
/,
*args: Any,
timeout: float = 0,
show_progress: bool | None = None,
**kwargs: Any,
) -> list[Job]
Submits many jobs to the queue -- one for each dictionary in the iterable. Waits for all to finish, then returns the results.
PARAMETER | DESCRIPTION |
---|---|
fun
|
...
TYPE:
|
iterable
|
...
TYPE:
|
*args
|
static arguments passed to the function.
TYPE:
|
timeout
|
...
TYPE:
|
show_progress
|
...
TYPE:
|
**kwargs
|
static keyword-arguments passed to the function.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
list[Job]
|
... |
Examples:
from ppqueue import PPQueue
from time import sleep
def add_nums(x: int, y: int) -> int:
return x + y
with PPQueue() as queue:
jobs = queue.starmapkw(add_nums, [{"x": 1, "y": 2}, {"x": 3, "y": 4}])
results = [job.result for job in jobs]
assert results == [3, 7]
wait ¶
wait(
*,
n: int = 0,
timeout: float = 0,
poll_ms: int = 0,
show_progress: bool | None = None
) -> int
Wait for jobs to finish.
PARAMETER | DESCRIPTION |
---|---|
n
|
the number of jobs to wait for (default=0, all).
TYPE:
|
timeout
|
seconds to wait before raising
TYPE:
|
poll_ms
|
milliseconds to pause between checks (default=100).
TYPE:
|
show_progress
|
if True, present a progress bar.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
If |
int
|
Else, returns the count of finished jobs. |