updated requestsqueue

This commit is contained in:
kremsy 2017-05-20 11:29:15 +02:00
parent e706ca8b29
commit ec604f7a64
2 changed files with 220 additions and 235 deletions

View File

@ -1,235 +1,220 @@
<?php <?php
namespace cURL; namespace cURL;
use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\EventDispatcher\EventDispatcher;
class RequestsQueue extends EventDispatcher implements RequestsQueueInterface, \Countable class RequestsQueue extends EventDispatcher implements RequestsQueueInterface, \Countable {
{ /**
/** * @var Options Default options for new Requests attached to RequestsQueue
* @var Options Default options for new Requests attached to RequestsQueue */
*/ protected $defaultOptions = null;
protected $defaultOptions = null;
/** /**
* @var resource cURL multi handler * @var resource cURL multi handler
*/ */
protected $mh; protected $mh;
/** /**
* @var int Amount of requests running * @var Request[] Array of requests attached
*/ */
protected $runningCount = 0; protected $queue = array();
/** /**
* @var Request[] Array of requests attached * @var array Array of requests added to curl multi handle
*/ */
protected $queue = array(); protected $running = array();
/** /**
* @var array Array of requests running * Initializes curl_multi handler
*/ */
protected $running = array(); public function __construct() {
$this->mh = curl_multi_init();
}
/** /**
* Initializes curl_multi handler * Destructor, closes curl_multi handler
*/ *
public function __construct() * @return void
{ */
$this->mh = curl_multi_init(); public function __destruct() {
} if (isset($this->mh)) {
curl_multi_close($this->mh);
}
}
/** /**
* Destructor, closes curl_multi handler * Returns cURL\Options object with default request's options
* *
* @return void * @return Options
*/ */
public function __destruct() public function getDefaultOptions() {
{ if (!isset($this->defaultOptions)) {
if (isset($this->mh)) { $this->defaultOptions = new Options();
curl_multi_close($this->mh); }
} return $this->defaultOptions;
} }
/** /**
* Returns cURL\Options object with default request's options * Overrides default options with given Options object
* *
* @return Options * @param Options $defaultOptions New options
*/ * @return void
public function getDefaultOptions() */
{ public function setDefaultOptions(Options $defaultOptions) {
if (!isset($this->defaultOptions)) { $this->defaultOptions = $defaultOptions;
$this->defaultOptions = new Options(); }
}
return $this->defaultOptions;
}
/** /**
* Overrides default options with given Options object * Get cURL multi handle
* *
* @param Options $defaultOptions New options * @return resource
* @return void */
*/ public function getHandle() {
public function setDefaultOptions(Options $defaultOptions) return $this->mh;
{ }
$this->defaultOptions = $defaultOptions;
}
/** /**
* Get cURL multi handle * Attach request to queue.
* *
* @return resource * @param Request $request Request to add
*/ * @return self
public function getHandle() */
{ public function attach(Request $request) {
return $this->mh; $this->queue[$request->getUID()] = $request;
} return $this;
}
/** /**
* Attach request to queue. * Detach request from queue.
* *
* @param Request $request Request to add * @param Request $request Request to remove
* @return self * @return self
*/ */
public function attach(Request $request) public function detach(Request $request) {
{ unset($this->queue[$request->getUID()]);
$this->queue[$request->getUID()] = $request; return $this;
return $this; }
}
/** /**
* Detach request from queue. * Processes handles which are ready and removes them from pool.
* *
* @param Request $request Request to remove * @return int Amount of requests completed
* @return self */
*/ protected function read() {
public function detach(Request $request) $n = 0;
{ while ($info = curl_multi_info_read($this->mh)) {
unset($this->queue[$request->getUID()]); $n++;
return $this; $request = $this->queue[(int) $info['handle']];
} $result = $info['result'];
/** curl_multi_remove_handle($this->mh, $request->getHandle());
* Processes handles which are ready and removes them from pool. unset($this->running[$request->getUID()]);
* $this->detach($request);
* @return int Amount of requests completed
*/
protected function read()
{
$n = 0;
while ($info = curl_multi_info_read($this->mh)) {
$n++;
$request = $this->queue[(int)$info['handle']];
$result = $info['result'];
curl_multi_remove_handle($this->mh, $request->getHandle()); $event = new Event();
unset($this->running[$request->getUID()]); $event->request = $request;
$this->detach($request); $event->response = new Response($request, curl_multi_getcontent($request->getHandle()));
if ($result !== CURLE_OK) {
$event->response->setError(new Error(curl_error($request->getHandle()), $result));
}
$event->queue = $this;
$this->dispatch('complete', $event);
$request->dispatch('complete', $event);
}
$event = new Event(); return $n;
$event->request = $request; }
$event->response = new Response($request, curl_multi_getcontent($request->getHandle()));
if ($result !== CURLE_OK) {
$event->response->setError(new Error(curl_error($request->getHandle()), $result));
}
$event->queue = $this;
$this->dispatch('complete', $event);
$request->dispatch('complete', $event);
}
return $n; /**
} * Returns count of handles in queue
*
* @return int Handles count
*/
public function count() {
return count($this->queue);
}
/** /**
* Returns count of handles in queue * Executes requests in parallel
* *
* @return int Handles count * @return void
*/ */
public function count() public function send() {
{ while ($this->socketPerform()) {
return count($this->queue); $this->socketSelect();
} }
}
/** /**
* Executes requests in parallel * Returns requests present in $queue but not in $running
* *
* @return void * @return Request[] Array of requests
*/ */
public function send() protected function getRequestsNotRunning() {
{ $map = $this->queue;
while ($this->socketPerform()) { foreach ($this->running as $k => $v) {
$this->socketSelect(); unset($map[$k]);
} }
} return $map;
}
/** /**
* Returns requests present in $queue but not in $running * Download available data on socket.
* *
* @return Request[] Array of requests * @throws Exception
*/ * @return bool TRUE when there are any requests on queue, FALSE when finished
protected function getRequestsNotRunning() */
{ public function socketPerform() {
$map = $this->queue; if ($this->count() == 0) {
foreach($this->running as $k => $v) unset($map[$k]); throw new Exception('Cannot perform if there are no requests in queue.');
return $map; }
}
/** $notRunning = $this->getRequestsNotRunning();
* Download available data on socket. do {
* /**
* @throws Exception * Apply cURL options to new requests
* @return bool TRUE when there are any requests on queue, FALSE when finished */
*/ foreach ($notRunning as $request) {
public function socketPerform() $this->getDefaultOptions()->applyTo($request);
{ $request->getOptions()->applyTo($request);
if ($this->count() == 0) { curl_multi_add_handle($this->mh, $request->getHandle());
throw new Exception('Cannot perform if there are no requests in queue.'); $this->running[$request->getUID()] = $request;
} }
$notRunning = $this->getRequestsNotRunning(); $runningHandles = null;
do { do {
/** // http://curl.haxx.se/libcurl/c/curl_multi_perform.html
* Apply cURL options to new requests // If an added handle fails very quickly, it may never be counted as a running_handle.
*/ $mrc = curl_multi_exec($this->mh, $runningHandles);
foreach ($notRunning as $request) { } while ($mrc === CURLM_CALL_MULTI_PERFORM);
$this->getDefaultOptions()->applyTo($request);
$request->getOptions()->applyTo($request);
curl_multi_add_handle($this->mh, $request->getHandle());
$this->running[$request->getUID()] = $request;
}
$runningBefore = $this->runningCount; if ($runningHandles < count($this->running)) {
do { $this->read();
$mrc = curl_multi_exec($this->mh, $this->runningCount); }
} while ($mrc === CURLM_CALL_MULTI_PERFORM);
$runningAfter = $this->runningCount;
if ($runningAfter < $runningBefore) { $notRunning = $this->getRequestsNotRunning();
$this->read(); } while (count($notRunning) > 0);
} // Why the loop? New requests might be added at runtime on 'complete' event.
// So we need to attach them to curl_multi handle immediately.
$notRunning = $this->getRequestsNotRunning(); return $this->count() > 0;
} while (count($notRunning) > 0); }
// Why the loop? New requests might be added at runtime on 'complete' event.
// So we need to attach them to curl_multi handle immediately.
return $this->count() > 0; /**
} * Waits until activity on socket
* On success, returns TRUE. On failure, this function will
/** * return FALSE on a select failure or timeout (from the underlying
* Waits until activity on socket * select system call)
* On success, returns TRUE. On failure, this function will *
* return FALSE on a select failure or timeout (from the underlying * @param float|int $timeout Maximum time to wait
* select system call) * @throws Exception
* * @return bool
* @param float|int $timeout Maximum time to wait */
* @throws Exception public function socketSelect($timeout = 1) {
* @return bool if ($this->count() == 0) {
*/ throw new Exception('Cannot select if there are no requests in queue.');
public function socketSelect($timeout = 1) }
{ return curl_multi_select($this->mh, $timeout) !== -1;
if ($this->count() == 0) { }
throw new Exception('Cannot select if there are no requests in queue.');
}
return curl_multi_select($this->mh, $timeout) !== -1;
}
} }

View File

@ -49,7 +49,7 @@ class DedimaniaWebHandler {
$asyncHttpRequest = new AsyncHttpRequest($this->maniaControl, self::DEDIMANIA_URL); $asyncHttpRequest = new AsyncHttpRequest($this->maniaControl, self::DEDIMANIA_URL);
$asyncHttpRequest->setCallable(function ($data, $error) use ($updateRecords) { $asyncHttpRequest->setCallable(function ($data, $error) use ($updateRecords) {
Logger::log("Try to connect on Dedimania"); Logger::logInfo("Try to connect on Dedimania");
if (!$data || $error) { if (!$data || $error) {
Logger::logError("Dedimania Error while opening session: '{$error}' Line 42"); Logger::logError("Dedimania Error while opening session: '{$error}' Line 42");
@ -69,7 +69,7 @@ class DedimaniaWebHandler {
$responseData = $methodResponse[0]; $responseData = $methodResponse[0];
$this->dedimaniaData->sessionId = $responseData['SessionId']; $this->dedimaniaData->sessionId = $responseData['SessionId'];
if ($this->dedimaniaData->sessionId) { if ($this->dedimaniaData->sessionId) {
Logger::log("Dedimania connection successfully established."); Logger::logInfo("Dedimania connection successfully established.");
if ($updateRecords) { if ($updateRecords) {
$this->fetchDedimaniaRecords(); $this->fetchDedimaniaRecords();
@ -146,7 +146,7 @@ class DedimaniaWebHandler {
$this->dedimaniaData->records[$key] = new RecordData($record); $this->dedimaniaData->records[$key] = new RecordData($record);
} }
Logger::log(count($this->dedimaniaData->records) . " Dedimania Records Fetched succesfully!"); Logger::logInfo(count($this->dedimaniaData->records) . " Dedimania Records Fetched succesfully!");
$this->maniaLinkNeedsUpdate = true; $this->maniaLinkNeedsUpdate = true;
$this->maniaControl->getCallbackManager()->triggerCallback(DedimaniaPlugin::CB_DEDIMANIA_UPDATED, $this->dedimaniaData->records); //TODO $this->maniaControl->getCallbackManager()->triggerCallback(DedimaniaPlugin::CB_DEDIMANIA_UPDATED, $this->dedimaniaData->records); //TODO
@ -233,7 +233,7 @@ class DedimaniaWebHandler {
} }
if (!$record->vReplay) { if (!$record->vReplay) {
Logger::log("Ignore time for " . $record->login . " no validation replay found"); Logger::logInfo("Ignore time for " . $record->login . " no validation replay found");
continue; continue;
} }
@ -256,7 +256,7 @@ class DedimaniaWebHandler {
$data = array($this->dedimaniaData->sessionId, $this->getMapInfo(), $gameMode, $times, $replays); $data = array($this->dedimaniaData->sessionId, $this->getMapInfo(), $gameMode, $times, $replays);
$content = $this->encodeRequest(self::DEDIMANIA_SET_CHALLENGE_TIMES, $data); $content = $this->encodeRequest(self::DEDIMANIA_SET_CHALLENGE_TIMES, $data);
Logger::log("Dedimania Submitting Map Times at End-Map Start"); Logger::logInfo("Dedimania Submitting Map Times at End-Map Start");
$asyncHttpRequest = new AsyncHttpRequest($this->maniaControl, self::DEDIMANIA_URL); $asyncHttpRequest = new AsyncHttpRequest($this->maniaControl, self::DEDIMANIA_URL);
@ -280,7 +280,7 @@ class DedimaniaWebHandler {
if (!$methodResponse[0]) { if (!$methodResponse[0]) {
Logger::logError("Records Plugin: Submitting dedimania records failed."); Logger::logError("Records Plugin: Submitting dedimania records failed.");
} else { } else {
Logger::log("Dedimania Times succesfully submitted"); Logger::logInfo("Dedimania Times succesfully submitted");
} }
}); });
@ -332,7 +332,7 @@ class DedimaniaWebHandler {
$this->fetchDedimaniaRecords(true); $this->fetchDedimaniaRecords(true);
} }
Logger::log("Dedimania Player added " . $dediPlayer->login); Logger::logInfo("Dedimania Player added " . $dediPlayer->login);
$this->maniaLinkNeedsUpdate = true; //TODO handle update for only one player instead of everyone as soon splitted $this->maniaLinkNeedsUpdate = true; //TODO handle update for only one player instead of everyone as soon splitted
}); });
@ -376,7 +376,7 @@ class DedimaniaWebHandler {
$this->handleXmlRpcFault($methodResponse, self::DEDIMANIA_PLAYERDISCONNECT); $this->handleXmlRpcFault($methodResponse, self::DEDIMANIA_PLAYERDISCONNECT);
} }
Logger::log("Debug: Dedimania Player left"); Logger::logInfo("Debug: Dedimania Player removed");
}); });
$asyncHttpRequest->setContent($content); $asyncHttpRequest->setContent($content);
@ -420,7 +420,7 @@ class DedimaniaWebHandler {
$this->handleXmlRpcFault($methodResponse, self::DEDIMANIA_UPDATE_SERVER_PLAYERS); $this->handleXmlRpcFault($methodResponse, self::DEDIMANIA_UPDATE_SERVER_PLAYERS);
} }
Logger::log("Dedimania Playerlist Updated"); Logger::logInfo("Dedimania Playerlist Updated");
}); });
$asyncHttpRequest->setContent($content); $asyncHttpRequest->setContent($content);