<?php namespace cURL; use Symfony\Component\EventDispatcher\EventDispatcher; use Countable; class RequestsQueue extends EventDispatcher implements RequestsQueueInterface, Countable { /** * @var Options Default options for new Requests attached to RequestsQueue */ protected $defaultOptions = null; /** * @var resource cURL multi handler */ protected $mh; /** * @var int Amount of requests running */ protected $runningCount = 0; /** * @var array Array of requests attached */ protected $queue = array(); /** * @var array Array of requests running */ protected $running = array(); /** * Constructor * * @return void */ public function __construct() { $this->mh = curl_multi_init(); } /** * Destructor, closes curl_multi handler * * @return void */ public function __destruct() { if (isset($this->mh)) { curl_multi_close($this->mh); } } /** * Returns cURL\Options object with default request's options * * @return Options */ public function getDefaultOptions() { if (!isset($this->defaultOptions)) { $this->defaultOptions = new Options; } return $this->defaultOptions; } /** * Overrides default options with given Options object * * @param Options $defaultOptions New options * * @return void */ public function setDefaultOptions(Options $defaultOptions) { $this->defaultOptions = $defaultOptions; } /** * Get cURL multi handle * * @return resource */ public function getHandle() { return $this->mh; } /** * Attach request to queue. * * @param Request $request Request to add * * @return self */ public function attach(Request $request) { $this->queue[$request->getUID()] = $request; return $this; } /** * Detach request from queue. * * @param Request $request Request to remove * * @return self */ public function detach(Request $request) { unset($this->queue[$request->getUID()]); return $this; } /** * Processes handles which are ready and removes them from pool. * * @return int Amount of requests completed */ protected function read() { $n = 0; while ($info = curl_multi_info_read($this->mh)) { $n++; $request = $this->queue[(int)$info['handle']]; $result = $info['result']; curl_multi_remove_handle($this->mh, $request->getHandle()); unset($this->running[$request->getUID()]); $this->detach($request); $event = new Event; $event->request = $request; $event->response = new Response($request, curl_multi_getcontent($request->getHandle())); if ($result !== CURLE_OK) { $event->response->setError(new Error(curl_error($request->getHandle()), $result)); } $event->queue = $this; $this->dispatch('complete', $event); } return $n; } /** * Returns count of handles in queue * * @return int Handles count */ public function count() { return count($this->queue); } /** * Executes requests in parallel * * @return void */ public function send() { while ($this->socketPerform()) { $this->socketSelect(); } } /** * Returns requests present in $queue but not in $running * * @return array Array of requests */ protected function getRequestsNotRunning() { return array_diff_key($this->queue, $this->running); } /** * Download available data on socket. * * @return bool TRUE when there are any requests on queue, FALSE when finished */ public function socketPerform() { if ($this->count() == 0) { throw new Exception('Cannot perform if there are no requests in queue.'); } $notRunning = $this->getRequestsNotRunning(); do { /** * Apply cURL options to new requests */ foreach ($notRunning as $request) { $this->getDefaultOptions()->applyTo($request); $request->getOptions()->applyTo($request); curl_multi_add_handle($this->mh, $request->getHandle()); $this->running[$request->getUID()] = $request; } $runningBefore = $this->runningCount; do { $mrc = curl_multi_exec($this->mh, $this->runningCount); } while ($mrc === CURLM_CALL_MULTI_PERFORM); $runningAfter = $this->runningCount; $completed = ($runningAfter < $runningBefore) ? $this->read() : 0; $notRunning = $this->getRequestsNotRunning(); } while (count($notRunning) > 0); return $this->count() > 0; } /** * Waits until activity on socket * On success, returns TRUE. On failure, this function will * return FALSE on a select failure or timeout (from the underlying * select system call) * * @param float $timeout Maximum time to wait * * @return bool */ public function socketSelect($timeout = 1) { if ($this->count() == 0) { throw new Exception('Cannot select if there are no requests in queue.'); } return curl_multi_select($this->mh, $timeout) !== -1; } }