parallel\run

(1.0.0)

parallel\run - Execution

Description

parallel\run ( Closure $task ) : Future|null

Shall schedule task for execution in parallel.

parallel\run ( Closure $task , array $argv ) : Future|null

Shall schedule task for execution in parallel, passing argv at execution time.
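The two signatures above can be sketched as follows. This is a minimal illustration, assuming a thread-safe (ZTS) PHP build with the parallel extension loaded; the closure bodies and variable names are made up for the example.

```php
<?php
use function parallel\run;

// First signature: a task that takes no arguments.
$hello = run(function () {
    return "hello from a parallel task";
});

// Second signature: the elements of argv are passed positionally
// to the task's parameters at execution time.
$sum = run(function (int $a, int $b) {
    return $a + $b;
}, [2, 3]);

// Future::value() blocks until the task has finished,
// then gives back whatever the task returned.
$helloValue = $hello->value();
$sumValue   = $sum->value();

echo $helloValue, "\n";
echo $sumValue, "\n";
```

Both tasks return a value, so both Futures are kept in variables and read with value().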

Automatic Scheduling

If a \parallel\Runtime that was internally created and cached by a previous call to parallel\run() is idle, it will be used to execute the task. If no \parallel\Runtime is idle, parallel will create and cache a new \parallel\Runtime.

Note:

\parallel\Runtime objects created by the programmer are not used for automatic scheduling.
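As a rough sketch of automatic scheduling, the snippet below submits several tasks in a row without creating any \parallel\Runtime by hand and leaves the choice of runtime to parallel. The task body, the sleep duration, and the variable names are assumptions made for illustration.

```php
<?php
use function parallel\run;

// A small task that pretends to do some work and returns a result.
$task = function (int $id) {
    usleep(100000); // simulate 0.1 s of work
    return $id * $id;
};

// Schedule four tasks. No Runtime is created explicitly here;
// parallel reuses an idle cached runtime or creates a new one.
$futures = [];
for ($i = 1; $i <= 4; $i++) {
    $futures[$i] = run($task, [$i]);
}

// Collect the results; value() waits for each task in turn.
$results = [];
foreach ($futures as $i => $future) {
    $results[$i] = $future->value();
}

print_r($results);
```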

Parameters

task

A Closure with specific characteristics.

argv

An array of arguments with specific characteristics to be passed to task at execution time.

Task Characteristics

Closures scheduled for parallel execution must not:

  • accept or return by reference
  • accept or return internal objects (see notes)
  • execute any of the prohibited instructions listed below

Instructions prohibited in Closures intended for parallel execution are:

  • yield
  • use by-reference
  • declare class
  • declare named function

Note:

Nested closures may yield or use by-reference, but must not contain class or named function declarations.

Note:

No instructions are prohibited in the files which the task may include.
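To illustrate the instruction rules, the sketch below tries to schedule a task that uses yield directly in its body and catches the resulting error. It assumes the parallel extension is loaded; the flag variable is hypothetical and exists only to make the outcome visible.

```php
<?php
use function parallel\run;
use parallel\Runtime\Error\IllegalInstruction;

$rejected = false;

try {
    // yield is on the list of prohibited instructions, so this
    // closure is not acceptable as a task.
    run(function () {
        yield 1;
    });
} catch (IllegalInstruction $e) {
    // The task is refused when it is scheduled.
    $rejected = true;
    echo "rejected: ", $e->getMessage(), "\n";
}
```

Code that needs class or named function declarations can be moved into a separate file and included from the task, since included files are not subject to these restrictions.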

Arguments Characteristics

Arguments must not:

  • contain references
  • contain resources
  • contain internal objects (see notes)

Note:

In the case of file stream resources, the resource will be cast to the file descriptor and passed as int where possible. This is unsupported on Windows.

Internal Objects Notes

Internal objects generally use a custom structure which cannot be copied by value safely. PHP currently lacks the mechanics to do this (without serialization), so only objects that do not use a custom structure may be shared.

Some internal objects do not use a custom structure, for example parallel\Events\Event, and so may be shared.

Closures are a special kind of internal object and support being copied by value, and so may be shared.

Channels are central to writing parallel code and support concurrent access and execution by necessity, and so may be shared.

Warning

A user class that extends an internal class may use a custom structure as defined by the internal class, in which case its instances cannot be copied by value safely, and so may not be shared.
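Since channels may be shared, one way to get data out of a running task is to pass a parallel\Channel in argv. The following is a minimal sketch, assuming the parallel extension is loaded; the message values and variable names are illustrative.

```php
<?php
use parallel\Channel;
use function parallel\run;

// An anonymous, unbuffered channel: each send() waits for a recv().
$channel = new Channel();

// The channel is passed to the task like any other argument.
$future = run(function (Channel $out, int $count) {
    for ($i = 1; $i <= $count; $i++) {
        $out->send($i * 10);
    }
}, [$channel, 3]);

// Receive exactly as many messages as the task sends.
$received = [];
for ($i = 0; $i < 3; $i++) {
    $received[] = $channel->recv();
}

$channel->close();
print_r($received);
```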

Return Values

Warning

The returned parallel\Future must not be ignored when the task contains a return or throw statement.
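The difference between the two cases can be sketched like this, assuming the parallel extension is loaded. The task bodies are invented for the example.

```php
<?php
use function parallel\run;

// This task has no return or throw statement, so the Future
// does not need to be kept.
run(function () {
    echo "fire and forget\n";
});

// This task returns a value, so the Future is retained
// and read with value().
$future = run(function (array $numbers) {
    return array_sum($numbers);
}, [[1, 2, 3]]);

$total = $future->value();
echo "total: ", $total, "\n";
```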

Exceptions

Warning

Shall throw parallel\Runtime\Error\Closed if parallel\Runtime was closed.

Warning

Shall throw parallel\Runtime\Error\IllegalFunction if task is a closure created from an internal function.

Warning

Shall throw parallel\Runtime\Error\IllegalInstruction if task contains illegal instructions.

Warning

Shall throw parallel\Runtime\Error\IllegalParameter if task accepts or argv contains illegal variables.

Warning

Shall throw parallel\Runtime\Error\IllegalReturn if task returns illegally.
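The sketch below provokes two of the errors listed above and records which ones were caught. It assumes the parallel extension is loaded; the offending closures and the $caught array are hypothetical and chosen only to match the documented conditions.

```php
<?php
use function parallel\run;
use parallel\Runtime\Error\{IllegalFunction, IllegalParameter};

$caught = [];

// A closure created from an internal function is not a valid task.
try {
    run(\Closure::fromCallable('strlen'), ['abc']);
} catch (IllegalFunction $e) {
    $caught[] = 'IllegalFunction';
}

// A task must not accept parameters by reference.
try {
    run(function (array &$data) {
        $data[] = 4;
    }, [[1, 2, 3]]);
} catch (IllegalParameter $e) {
    $caught[] = 'IllegalParameter';
}

print_r($caught);
```

Catching the specific error classes keeps unrelated failures visible instead of hiding them behind a broad catch.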

User Contributed Notes

anonymous user 07-Feb-2021 02:49
Although function declarations are not allowed inside code executed in a thread, include is allowed. So if we want to declare a function, we can write another file that contains the function and include it.
# main.php
<?php
$runtime = new parallel\Runtime();
$future = $runtime->run(function () {
    include "included.php";
    return add(1, 3);
}, []);
echo $future->value();
# output: 4

# included.php
<?php
function add($a, $b) {
    return $a + $b;
}

john_2885 at yahoo dot com 20-Feb-2020 05:35
Here's a more substantial example of how to use the run functional API.

<?php
/*********************************************
 * Sample parallel functional API
 *
 * Scenario
 * -------------------------------------------
 * Given a large number of rows of
 * data to process, divide the work amongst
 * a set of workers.  Each worker is responsible
 * for finishing their assigned task.
 *
 * In the code below, assume we have arbitrary
 * start and end IDs (rows) - we will try to
 * divide the number of IDs (rows) evenly
 * across 8 workers.  The workers will get the
 * following batches to process to completion:
 *
 * Total number of IDs (rows): 1371129
 * Each worker will get 171392 IDs to process
 *
 * Worker 1: IDs from 11001 to 182393
 * Worker 2: IDs from 182393 to 353785
 * Worker 3: IDs from 353785 to 525177
 * Worker 4: IDs from 525177 to 696569
 * Worker 5: IDs from 696569 to 867961
 * Worker 6: IDs from 867961 to 1039353
 * Worker 7: IDs from 1039353 to 1210745
 * Worker 8: IDs from 1210745 to 1382130
 *
 * Each worker then processes 5000 rows at a time
 * until they are done with their assigned work
 *
 *********************************************/

use parallel\{Runtime, Future, Channel, Events};

$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// Try to divide IDs evenly across the number of workers
// (ceil() returns a float, so cast it back to int)
$batchSize = (int) ceil($totalIds / $workers);
// The last batch gets whatever is left over
$lastBatch = $totalIds % $batchSize;
// The number of IDs (rows) each worker handles per sub-batch
$rowsToFetch = 5000;

print "Total IDs: " . $totalIds . "\n";
print "Batch Size: " . $batchSize . "\n";
print "Last Batch: " . $lastBatch . "\n";

$producer = function (int $worker, int $startId, int $endId, int $fetchSize) {
    $tempMinId = $startId;
    $tempMaxId = $tempMinId + $fetchSize;
    $fetchCount = 1;

    print "Worker " . $worker . " working on IDs from " . $startId . " to " . $endId . "\n";

    while ($tempMinId < $endId) {
        // Simulate processing one sub-batch (0.5 to 1 second of work)
        usleep(rand(500000, 1000000));
        print "Worker " . $worker . " finished batch " . $fetchCount . " from ID " . $tempMinId . " to " . $tempMaxId . "\n";

        // Now we move on to the next sub-batch for this worker
        $tempMinId = $tempMaxId;
        $tempMaxId = $tempMinId + $fetchSize;
        if ($tempMaxId > $endId) {
            $tempMaxId = $endId;
        }

        // Introduce some timing randomness
        sleep(rand(1, 5));
        $fetchCount++;
    }

    // This worker has completed their entire batch
    print "Worker " . $worker . " finished\n";
};

// Create our workers and have them start working on their task
// In this case, it's a set of 171392 IDs to process
for ($i = 0; $i < $workers; $i++) {
    $startId = $minId + ($i * $batchSize);
    $endId = $startId + $batchSize;
    // The last worker stops at the overall maximum ID
    if ($i == ($workers - 1)) {
        $endId = $maxId;
    }
    \parallel\run($producer, array(($i + 1), $startId, $endId, $rowsToFetch));
}

?>