46 lines
1.2 KiB
PHP
46 lines
1.2 KiB
PHP
|
<?php
|
||
|
|
||
|
namespace React\Stream;
|
||
|
|
||
|
// TODO: move to a trait
|
||
|
|
||
|
class Util
{
    /**
     * Connects a readable stream to a writable stream so that data emitted by
     * the source is written to the destination.
     *
     * What this method wires up:
     *  - emits a "pipe" event on $dest, passing $source as the only argument;
     *  - on every "data" event of $source, writes the chunk to $dest and pauses
     *    $source when write() returns exactly false;
     *  - on every "drain" event of $dest, resumes $source;
     *  - unless disabled through the "end" option, calls $dest->end() when
     *    $source emits "end".
     *
     * @param ReadableStreamInterface $source  Stream to read from.
     * @param WritableStreamInterface $dest    Stream to write to.
     * @param array                   $options Supported key: "end" (defaults to
     *                                         true). When truthy and $source is
     *                                         not the same object as $dest, the
     *                                         destination is ended once the
     *                                         source emits "end".
     *
     * @return WritableStreamInterface The destination stream, so calls can be chained.
     */
    public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
    {
        // TODO: use stream_copy_to_stream
        // it is 4x faster than this
        // but can lose data under load with no way to recover it

        $dest->emit('pipe', array($source));

        $source->on('data', function ($data) use ($source, $dest) {
            $feedMore = $dest->write($data);

            // Strict comparison on purpose: only an explicit false from
            // write() pauses the source; any other return value keeps it flowing.
            if (false === $feedMore) {
                $source->pause();
            }
        });

        // The destination signals with "drain" that the source may continue.
        $dest->on('drain', function () use ($source) {
            $source->resume();
        });

        $end = isset($options['end']) ? $options['end'] : true;

        // Never auto-end when a stream is piped into itself.
        if ($end && $source !== $dest) {
            $source->on('end', function () use ($dest) {
                $dest->end();
            });
        }

        // Returning the destination allows chaining further pipes onto it.
        return $dest;
    }

    /**
     * Re-emits the given events of $source on $target.
     *
     * For each name in $events a listener is registered on $source that emits
     * an event with the same name on $target, passing along every argument the
     * listener received.
     *
     * @param object   $source Object providing on($event, $listener).
     * @param object   $target Object providing emit($event, array $arguments).
     * @param string[] $events Names of the events to forward.
     *
     * @return void
     */
    public static function forwardEvents($source, $target, array $events)
    {
        foreach ($events as $event) {
            $source->on($event, function () use ($event, $target) {
                // func_get_args() captures whatever arguments the original
                // event carried, so they are forwarded unchanged.
                $target->emit($event, func_get_args());
            });
        }
    }
}
|