85 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			85 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
| <?php
 | |
| 
 | |
| namespace React\Stream;
 | |
| 
 | |
| use Evenement\EventEmitter;
 | |
| 
 | |
| class CompositeStream extends EventEmitter implements DuplexStreamInterface
 | |
| {
 | |
|     protected $readable;
 | |
|     protected $writable;
 | |
|     protected $pipeSource;
 | |
| 
 | |
|     public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
 | |
|     {
 | |
|         $this->readable = $readable;
 | |
|         $this->writable = $writable;
 | |
| 
 | |
|         Util::forwardEvents($this->readable, $this, array('data', 'end', 'error', 'close'));
 | |
|         Util::forwardEvents($this->writable, $this, array('drain', 'error', 'close', 'pipe'));
 | |
| 
 | |
|         $this->readable->on('close', array($this, 'close'));
 | |
|         $this->writable->on('close', array($this, 'close'));
 | |
| 
 | |
|         $this->on('pipe', array($this, 'handlePipeEvent'));
 | |
|     }
 | |
| 
 | |
|     public function handlePipeEvent($source)
 | |
|     {
 | |
|         $this->pipeSource = $source;
 | |
|     }
 | |
| 
 | |
|     public function isReadable()
 | |
|     {
 | |
|         return $this->readable->isReadable();
 | |
|     }
 | |
| 
 | |
|     public function pause()
 | |
|     {
 | |
|         if ($this->pipeSource) {
 | |
|             $this->pipeSource->pause();
 | |
|         }
 | |
| 
 | |
|         $this->readable->pause();
 | |
|     }
 | |
| 
 | |
|     public function resume()
 | |
|     {
 | |
|         if ($this->pipeSource) {
 | |
|             $this->pipeSource->resume();
 | |
|         }
 | |
| 
 | |
|         $this->readable->resume();
 | |
|     }
 | |
| 
 | |
|     public function pipe(WritableStreamInterface $dest, array $options = array())
 | |
|     {
 | |
|         Util::pipe($this, $dest, $options);
 | |
| 
 | |
|         return $dest;
 | |
|     }
 | |
| 
 | |
|     public function isWritable()
 | |
|     {
 | |
|         return $this->writable->isWritable();
 | |
|     }
 | |
| 
 | |
|     public function write($data)
 | |
|     {
 | |
|         return $this->writable->write($data);
 | |
|     }
 | |
| 
 | |
|     public function end($data = null)
 | |
|     {
 | |
|         $this->writable->end($data);
 | |
|     }
 | |
| 
 | |
|     public function close()
 | |
|     {
 | |
|         $this->pipeSource = null;
 | |
| 
 | |
|         $this->readable->close();
 | |
|         $this->writable->close();
 | |
|     }
 | |
| }
 |