46 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			46 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
| <?php
 | |
| 
 | |
| namespace React\Stream;
 | |
| 
 | |
| // TODO: move to a trait
 | |
| 
 | |
| class Util
 | |
| {
 | |
|     public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
 | |
|     {
 | |
|         // TODO: use stream_copy_to_stream
 | |
|         // it is 4x faster than this
 | |
|         // but can lose data under load with no way to recover it
 | |
| 
 | |
|         $dest->emit('pipe', array($source));
 | |
| 
 | |
|         $source->on('data', function ($data) use ($source, $dest) {
 | |
|             $feedMore = $dest->write($data);
 | |
| 
 | |
|             if (false === $feedMore) {
 | |
|                 $source->pause();
 | |
|             }
 | |
|         });
 | |
| 
 | |
|         $dest->on('drain', function () use ($source) {
 | |
|             $source->resume();
 | |
|         });
 | |
| 
 | |
|         $end = isset($options['end']) ? $options['end'] : true;
 | |
|         if ($end && $source !== $dest) {
 | |
|             $source->on('end', function () use ($dest) {
 | |
|                 $dest->end();
 | |
|             });
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     public static function forwardEvents($source, $target, array $events)
 | |
|     {
 | |
|         foreach ($events as $event) {
 | |
|             $source->on($event, function () use ($event, $target) {
 | |
|                 $target->emit($event, func_get_args());
 | |
|             });
 | |
|         }
 | |
|     }
 | |
| }
 |