curl-easy / event dispatcher lib

This commit is contained in:
kremsy
2014-02-18 18:15:12 +01:00
committed by Steffen Schröder
parent 15f0164287
commit 244428a2fe
29 changed files with 2684 additions and 0 deletions

View File

@ -0,0 +1,81 @@
<?php
namespace cURL;
class Collection
{
/**
* @var array Collection
*/
protected $data = array();
/**
* Converts current object to array
*
* @return array
*/
public function toArray()
{
return $this->data;
}
/**
* Sets value
*
* @param mixed $key Key
* @param mixed $value Value
*
* @return self
*/
public function set($key, $value = null)
{
if (is_array($key)) {
foreach ($key as $k => $v) {
$this->data[$k] = $v;
}
} else {
$this->data[$key] = $value;
}
return $this;
}
/**
* Checks if key does exist
*
* @param mixed $key Key
*
* @return bool TRUE if exists, FALSE otherwise
*/
public function has($key)
{
return isset($this->data[$key]);
}
/**
* Returns value of $key, throws Exception if does not exist
*
* @param mixed $key Key
*
* @return mixed Value of key
*/
public function get($key)
{
if ($this->has($key)) {
return $this->data[$key];
} else {
throw new Exception('Key does not exist.');
}
}
/**
* Removes key
*
* @param mixed $key Key to remove
*
* @return self
*/
public function remove($key)
{
unset($this->data[$key]);
return $this;
}
}

View File

@ -0,0 +1,6 @@
<?php
namespace cURL;
class Error extends \Exception
{
}

View File

@ -0,0 +1,8 @@
<?php
namespace cURL;
use Symfony\Component\EventDispatcher\Event as SymfonyEvent;
class Event extends SymfonyEvent
{
}

View File

@ -0,0 +1,6 @@
<?php
namespace cURL;
class Exception extends \Exception
{
}

View File

@ -0,0 +1,68 @@
<?php
namespace cURL;
class Options extends Collection
{
/**
* @var array Array of cURL constants required for intelligent setters
*/
protected static $curlConstantsTable = array();
/**
* Applies options to Request object
*
* @param Request $request
*
* @return self
*/
public function applyTo(Request $request)
{
if (!empty($this->data)) {
curl_setopt_array($request->getHandle(), $this->data);
}
return $this;
}
/**
* Prepares array for intelligent setters
*
* @return void
*/
public static function loadCurlConstantsTable()
{
$constants = get_defined_constants(true);
$table = array();
foreach ($constants['curl'] as $key => $value) {
if (strpos($key, 'CURLOPT_') === 0) {
$key = str_ireplace(array('CURLOPT', '_'), '', $key);
$table[$key] = $value;
}
}
self::$curlConstantsTable = $table;
}
/**
* Intelligent setters
*
* @param string $name Function name
* @param array $args Arguments
*
* @return self
*/
public function __call($name, $args)
{
if (substr($name, 0, 3) == 'set' && isset($args[0])) {
if (empty(self::$curlConstantsTable)) {
self::loadCurlConstantsTable();
}
$const = strtoupper(substr($name, 3));
if (isset(self::$curlConstantsTable[$const])) {
$this->data[self::$curlConstantsTable[$const]] = $args[0];
return $this;
} else {
throw new Exception('Constant CURLOPT_'.$const.' does not exist.');
}
}
}
}

View File

@ -0,0 +1,150 @@
<?php
namespace cURL;
use Symfony\Component\EventDispatcher\EventDispatcher;
class Request extends EventDispatcher implements RequestInterface
{
/**
* @var resource cURL handler
*/
protected $ch;
/**
* @var RequestsQueue Queue instance when requesting async
*/
protected $queue;
/**
* @var Options Object containing options for current request
*/
protected $options = null;
/**
* Create new cURL handle
*
* @param string $url The URL to fetch.
*
* @return void
*/
public function __construct($url = null)
{
if ($url !== null) {
$this->getOptions()->set(CURLOPT_URL, $url);
}
$this->ch = curl_init();
}
/**
* Closes cURL resource and frees the memory.
* It is neccessary when you make a lot of requests
* and you want to avoid fill up the memory.
*
* @return void
*/
public function __destruct()
{
if (isset($this->ch)) {
curl_close($this->ch);
}
}
/**
* Get the cURL\Options instance
* Creates empty one if does not exist
*
* @return Options
*/
public function getOptions()
{
if (!isset($this->options)) {
$this->options = new Options;
}
return $this->options;
}
/**
* Sets Options
*
* @param Options $options Options
*
* @return void
*/
public function setOptions(Options $options)
{
$this->options = $options;
}
/**
* Returns cURL raw resource
*
* @return resource cURL handle
*/
public function getHandle()
{
return $this->ch;
}
/**
* Get unique id of cURL handle
* Useful for debugging or logging.
*
* @return int
*/
public function getUID()
{
return (int)$this->ch;
}
/**
* Perform a cURL session.
* Equivalent to curl_exec().
* This function should be called after initializing a cURL
* session and all the options for the session are set.
*
* @return Response
*/
public function send()
{
if ($this->options instanceof Options) {
$this->options->applyTo($this);
}
$content = curl_exec($this->ch);
$response = new Response($this, $content);
$errorCode = curl_errno($this->ch);
if ($errorCode !== CURLE_OK) {
$response->setError(new Error(curl_error($this->ch), $errorCode));
}
return $response;
}
protected function prepareQueue()
{
if (!isset($this->queue)) {
$request = $this;
$this->queue = new RequestsQueue;
$this->queue->addListener(
'complete',
function ($event) use ($request) {
$request->dispatch('complete', $event);
}
);
$this->queue->attach($this);
}
}
public function socketPerform()
{
$this->prepareQueue();
return $this->queue->socketPerform();
}
public function socketSelect($timeout = 1)
{
if (!isset($this->queue)) {
throw new Exception('Cannot select without perform before.');
}
return $this->queue->socketSelect($timeout);
}
}

View File

@ -0,0 +1,12 @@
<?php
namespace cURL;
interface RequestInterface
{
public function getOptions();
public function setOptions(Options $options);
public function getUID();
public function socketPerform();
public function socketSelect($timeout);
public function send();
}

View File

@ -0,0 +1,235 @@
<?php
namespace cURL;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Countable;
class RequestsQueue extends EventDispatcher implements RequestsQueueInterface, Countable
{
/**
* @var Options Default options for new Requests attached to RequestsQueue
*/
protected $defaultOptions = null;
/**
* @var resource cURL multi handler
*/
protected $mh;
/**
* @var int Amount of requests running
*/
protected $runningCount = 0;
/**
* @var array Array of requests attached
*/
protected $queue = array();
/**
* @var array Array of requests running
*/
protected $running = array();
/**
* Constructor
*
* @return void
*/
public function __construct()
{
$this->mh = curl_multi_init();
}
/**
* Destructor, closes curl_multi handler
*
* @return void
*/
public function __destruct()
{
if (isset($this->mh)) {
curl_multi_close($this->mh);
}
}
/**
* Returns cURL\Options object with default request's options
*
* @return Options
*/
public function getDefaultOptions()
{
if (!isset($this->defaultOptions)) {
$this->defaultOptions = new Options;
}
return $this->defaultOptions;
}
/**
* Overrides default options with given Options object
*
* @param Options $defaultOptions New options
*
* @return void
*/
public function setDefaultOptions(Options $defaultOptions)
{
$this->defaultOptions = $defaultOptions;
}
/**
* Get cURL multi handle
*
* @return resource
*/
public function getHandle()
{
return $this->mh;
}
/**
* Attach request to queue.
*
* @param Request $request Request to add
*
* @return self
*/
public function attach(Request $request)
{
$this->queue[$request->getUID()] = $request;
return $this;
}
/**
* Detach request from queue.
*
* @param Request $request Request to remove
*
* @return self
*/
public function detach(Request $request)
{
unset($this->queue[$request->getUID()]);
return $this;
}
/**
* Processes handles which are ready and removes them from pool.
*
* @return int Amount of requests completed
*/
protected function read()
{
$n = 0;
while ($info = curl_multi_info_read($this->mh)) {
$n++;
$request = $this->queue[(int)$info['handle']];
$result = $info['result'];
curl_multi_remove_handle($this->mh, $request->getHandle());
unset($this->running[$request->getUID()]);
$this->detach($request);
$event = new Event;
$event->request = $request;
$event->response = new Response($request, curl_multi_getcontent($request->getHandle()));
if ($result !== CURLE_OK) {
$event->response->setError(new Error(curl_error($request->getHandle()), $result));
}
$event->queue = $this;
$this->dispatch('complete', $event);
}
return $n;
}
/**
* Returns count of handles in queue
*
* @return int Handles count
*/
public function count()
{
return count($this->queue);
}
/**
* Executes requests in parallel
*
* @return void
*/
public function send()
{
while ($this->socketPerform()) {
$this->socketSelect();
}
}
/**
* Returns requests present in $queue but not in $running
*
* @return array Array of requests
*/
protected function getRequestsNotRunning()
{
return array_diff_key($this->queue, $this->running);
}
/**
* Download available data on socket.
*
* @return bool TRUE when there are any requests on queue, FALSE when finished
*/
public function socketPerform()
{
if ($this->count() == 0) {
throw new Exception('Cannot perform if there are no requests in queue.');
}
$notRunning = $this->getRequestsNotRunning();
do {
/**
* Apply cURL options to new requests
*/
foreach ($notRunning as $request) {
$this->getDefaultOptions()->applyTo($request);
$request->getOptions()->applyTo($request);
curl_multi_add_handle($this->mh, $request->getHandle());
$this->running[$request->getUID()] = $request;
}
$runningBefore = $this->runningCount;
do {
$mrc = curl_multi_exec($this->mh, $this->runningCount);
} while ($mrc === CURLM_CALL_MULTI_PERFORM);
$runningAfter = $this->runningCount;
$completed = ($runningAfter < $runningBefore) ? $this->read() : 0;
$notRunning = $this->getRequestsNotRunning();
} while (count($notRunning) > 0);
return $this->count() > 0;
}
/**
* Waits until activity on socket
* On success, returns TRUE. On failure, this function will
* return FALSE on a select failure or timeout (from the underlying
* select system call)
*
* @param float $timeout Maximum time to wait
*
* @return bool
*/
public function socketSelect($timeout = 1)
{
if ($this->count() == 0) {
throw new Exception('Cannot select if there are no requests in queue.');
}
return curl_multi_select($this->mh, $timeout) !== -1;
}
}

View File

@ -0,0 +1,13 @@
<?php
namespace cURL;
interface RequestsQueueInterface
{
public function getDefaultOptions();
public function setDefaultOptions(Options $defaultOptions);
public function attach(Request $request);
public function detach(Request $request);
public function send();
public function socketPerform();
public function socketSelect($timeout);
}

View File

@ -0,0 +1,83 @@
<?php
namespace cURL;
class Response
{
protected $ch;
protected $error;
protected $content = null;
/**
* Constructs response
*
* @param Request $request Request
* @param string $content Content of reponse
*
* @return void
*/
public function __construct(Request $request, $content = null)
{
$this->ch = $request->getHandle();
if (is_string($content)) {
$this->content = $content;
}
}
/**
* Get information regarding a current transfer
* If opt is given, returns its value as a string
* Otherwise, returns an associative array with
* the following elements (which correspond to opt), or FALSE on failure.
*
* @param int $key One of the CURLINFO_* constants
*
* @return mixed
*/
public function getInfo($key = null)
{
return $key === null ? curl_getinfo($this->ch) : curl_getinfo($this->ch, $key);
}
/**
* Returns content of request
*
* @return string Content
*/
public function getContent()
{
return $this->content;
}
/**
* Sets error instance
*
* @param Error $error Error to set
*
* @return void
*/
public function setError(Error $error)
{
$this->error = $error;
}
/**
* Returns a error instance
*
* @return Error|null
*/
public function getError()
{
return isset($this->error) ? $this->error : null;
}
/**
* Returns the error number for the last cURL operation.
*
* @return int Returns the error number or 0 (zero) if no error occurred.
*/
public function hasError()
{
return isset($this->error);
}
}