TrackManiaControl/libs/curl-easy/cURL/RequestsQueue.php

236 lines
5.8 KiB
PHP

<?php
namespace cURL;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Countable;
class RequestsQueue extends EventDispatcher implements RequestsQueueInterface, Countable
{
/**
* @var Options Default options for new Requests attached to RequestsQueue
*/
protected $defaultOptions = null;
/**
* @var resource cURL multi handler
*/
protected $mh;
/**
* @var int Amount of requests running
*/
protected $runningCount = 0;
/**
* @var array Array of requests attached
*/
protected $queue = array();
/**
* @var array Array of requests running
*/
protected $running = array();
/**
* Constructor
*
* @return void
*/
public function __construct()
{
$this->mh = curl_multi_init();
}
/**
* Destructor, closes curl_multi handler
*
* @return void
*/
public function __destruct()
{
if (isset($this->mh)) {
curl_multi_close($this->mh);
}
}
/**
* Returns cURL\Options object with default request's options
*
* @return Options
*/
public function getDefaultOptions()
{
if (!isset($this->defaultOptions)) {
$this->defaultOptions = new Options;
}
return $this->defaultOptions;
}
/**
* Overrides default options with given Options object
*
* @param Options $defaultOptions New options
*
* @return void
*/
public function setDefaultOptions(Options $defaultOptions)
{
$this->defaultOptions = $defaultOptions;
}
/**
* Get cURL multi handle
*
* @return resource
*/
public function getHandle()
{
return $this->mh;
}
/**
* Attach request to queue.
*
* @param Request $request Request to add
*
* @return self
*/
public function attach(Request $request)
{
$this->queue[$request->getUID()] = $request;
return $this;
}
/**
* Detach request from queue.
*
* @param Request $request Request to remove
*
* @return self
*/
public function detach(Request $request)
{
unset($this->queue[$request->getUID()]);
return $this;
}
/**
* Processes handles which are ready and removes them from pool.
*
* @return int Amount of requests completed
*/
protected function read()
{
$n = 0;
while ($info = curl_multi_info_read($this->mh)) {
$n++;
$request = $this->queue[(int)$info['handle']];
$result = $info['result'];
curl_multi_remove_handle($this->mh, $request->getHandle());
unset($this->running[$request->getUID()]);
$this->detach($request);
$event = new Event;
$event->request = $request;
$event->response = new Response($request, curl_multi_getcontent($request->getHandle()));
if ($result !== CURLE_OK) {
$event->response->setError(new Error(curl_error($request->getHandle()), $result));
}
$event->queue = $this;
$this->dispatch('complete', $event);
}
return $n;
}
/**
* Returns count of handles in queue
*
* @return int Handles count
*/
public function count()
{
return count($this->queue);
}
/**
* Executes requests in parallel
*
* @return void
*/
public function send()
{
while ($this->socketPerform()) {
$this->socketSelect();
}
}
/**
* Returns requests present in $queue but not in $running
*
* @return array Array of requests
*/
protected function getRequestsNotRunning()
{
return array_diff_key($this->queue, $this->running);
}
/**
* Download available data on socket.
*
* @return bool TRUE when there are any requests on queue, FALSE when finished
*/
public function socketPerform()
{
if ($this->count() == 0) {
throw new Exception('Cannot perform if there are no requests in queue.');
}
$notRunning = $this->getRequestsNotRunning();
do {
/**
* Apply cURL options to new requests
*/
foreach ($notRunning as $request) {
$this->getDefaultOptions()->applyTo($request);
$request->getOptions()->applyTo($request);
curl_multi_add_handle($this->mh, $request->getHandle());
$this->running[$request->getUID()] = $request;
}
$runningBefore = $this->runningCount;
do {
$mrc = curl_multi_exec($this->mh, $this->runningCount);
} while ($mrc === CURLM_CALL_MULTI_PERFORM);
$runningAfter = $this->runningCount;
$completed = ($runningAfter < $runningBefore) ? $this->read() : 0;
$notRunning = $this->getRequestsNotRunning();
} while (count($notRunning) > 0);
return $this->count() > 0;
}
/**
* Waits until activity on socket
* On success, returns TRUE. On failure, this function will
* return FALSE on a select failure or timeout (from the underlying
* select system call)
*
* @param float $timeout Maximum time to wait
*
* @return bool
*/
public function socketSelect($timeout = 1)
{
if ($this->count() == 0) {
throw new Exception('Cannot select if there are no requests in queue.');
}
return curl_multi_select($this->mh, $timeout) !== -1;
}
}