<?php
/**
 * Extract the contents of every multi-line docblock found in a chunk of source text.
 *
 * The text is split into lines and each line is trimmed. A block opens on a line that is
 * exactly "/**" and closes on a line that consists only of the closing comment marker.
 * Every line in between must start with "*"; if one does not (including a line that is
 * empty after trimming), the block being collected is discarded and scanning resumes
 * with the next opening marker.
 *
 * Inside a block the leading "*" is removed and the remainder trimmed, then:
 *  - an empty remainder ends the current item, so the next text line starts a new one;
 *  - a line starting with "@" always starts a new item;
 *  - any other line is appended to the previous item, separated by "\n".
 *
 * Single-line docblocks (opening and closing marker on the same line) are not recognised.
 *
 * @param string $data Source text to scan
 * @return array List of blocks in order of appearance; each block is a list of item strings
 */
function extract_docblocks($data) {
    $inBlock = false;
    $forceNewItem = true;
    $currentBlock = array();
    $blocks = array();

    foreach (preg_split('/[\r\n]+/', $data) as $line) {
        $line = trim($line);

        if (!$inBlock) {
            // Only a line consisting solely of the opening marker starts a block.
            if ($line === '/**') {
                $inBlock = true;
            }
            continue;
        }

        // A whitespace-only line trims to '' and has no offset 0, so the emptiness check
        // must come before the index access. Either way the block is malformed: drop it.
        if ($line === '' || $line[0] !== '*') {
            $inBlock = false;
            $forceNewItem = true;
            $currentBlock = array();
            continue;
        }

        if ($line === '*/') {
            // Well-formed end of block: keep what was collected and reset for the next one.
            $inBlock = false;
            $forceNewItem = true;
            $blocks[] = $currentBlock;
            $currentBlock = array();
            continue;
        }

        // Strip the leading "*". substr() on a bare "*" yields false on older PHP
        // versions, so cast to keep the value a string before trimming.
        $line = trim((string) substr($line, 1));

        if ($line === '') {
            // Blank separator line: whatever text comes next begins a new item.
            $forceNewItem = true;
        } elseif ($line[0] === '@' || $forceNewItem) {
            $forceNewItem = false;
            $currentBlock[] = $line;
        } else {
            // Continuation line. An item always exists here because $forceNewItem is
            // true at the start of every block.
            $currentBlock[count($currentBlock) - 1] .= "\n" . $line;
        }
    }

    return $blocks;
}
// Sample input for extract_docblocks(): the source of an EventLoop class.
// It is held in a nowdoc (quoted 'CLASS' identifier), so the text is taken literally
// and nothing inside it is interpolated or executed.
$str = <<<'CLASS'
<?php
/**
* Class EventLoop
*
* This is not the fastest event loop implementation in the world, at least
* partially because it's been stupidified for PHP4. It's not really for high
* performance asyncSaucezOMG though, so it shouldn't matter.
*/
class EventLoop
{
/**
* @var EventLoopConfig
*/
var $config;
/**
* @var EventLoopLogger
* @access private
*/
var $_logger;
/**
* @var WorkerPoolFactory
*/
var $_workerPoolFactory;
/**
* @var array
* @access private
*/
var $_streamWatchers = array(
'read' => array(
'streams' => array(),
'callbacks' => array(),
),
'write' => array(
'streams' => array(),
'callbacks' => array(),
),
'except' => array(
'streams' => array(),
'callbacks' => array(),
),
);
/**
* @var int
* @access private
*/
var $_streamWatcherCount = 0;
/**
* @var Scheduler
* @access private
*/
var $_scheduler;
/**
* @var WorkerPool
* @access private
*/
var $_dynamicWorkerPool;
/**
* @var WorkerPool
* @access private
*/
var $_dnsWorkerPool;
/**
* @var StreamHandler[]
* @access private
*/
var $_streamHandlers = array();
/**
* @var bool
* @access private
*/
var $_running = false;
/**
* Constructor
*
* @param WorkerPoolFactory $workerPoolFactory
* @param EventLoopConfig $config
* @param EventLoopLogger $logger
* @param SchedulerFactory $schedulerFactory
* @param StreamHandler $socketStreamHandler
*/
function EventLoop(&$workerPoolFactory, &$config, &$logger, &$schedulerFactory, &$socketStreamHandler)
{
$this->config = &$config;
$this->_logger = &$logger;
$this->_workerPoolFactory = &$workerPoolFactory;
$this->_scheduler = &$schedulerFactory->create($this->_logger);
$this->registerStreamHandler($socketStreamHandler, array('tcp', 'udp', 'unix'));
$this->_dynamicWorkerPool = &$workerPoolFactory->create($this, $this->config->workers, $this->_logger, 'Dynamic');
$config->dns->normalizeInheritableProperties($this->config->workers);
$this->_dnsWorkerPool = &$workerPoolFactory->create($this, $this->config->dns, $this->_logger, 'DNS');
}
/**
* Get the current time as a float
*
* @return float
* @access private
*/
function _now()
{
list($usec, $sec) = explode(' ', microtime());
return ((float)$usec + (float)$sec);
}
/**
* Check whether at least one client socket or scheduled item is active
*
* @return bool
* @access private
*/
function _isActive()
{
return $this->_streamWatcherCount || $this->_scheduler->isActive();
}
/**
* Get the time until there is something to do
*
* @return float|null
* @access private
*/
function _getNextActivityTime()
{
$candidates = array();
if (null !== $time = $this->_scheduler->getNextActivityTime()) {
$candidates[] = $time;
}
if (null !== $time = $this->_dynamicWorkerPool->getNextActivityTime()) {
$candidates[] = $time;
}
if (null !== $time = $this->_dnsWorkerPool->getNextActivityTime()) {
$candidates[] = $time;
}
return $candidates ? max(min($candidates), 0) : null;
}
/**
* Sleep until the next scheduled item is due to be executed
*
* @access private
*/
function _awaitNextActivity($timeout)
{
if ($timeout) {
$this->_logger->debug(null, 'Waiting until next scheduled activity, timeout: %f', $timeout);
usleep(floor($timeout * 1000000));
}
}
/**
* Process I/O on all active streams
*
* @access private
*/
function _processStreams($timeout)
{
$streams = array(
'read' => $this->_streamWatchers['read']['streams'],
'write' => $this->_streamWatchers['write']['streams'],
'except' => $this->_streamWatchers['except']['streams'],
);
if ($timeout !== null) {
$secs = floor($timeout);
$usecs = floor(($timeout - $secs) * 1000000);
} else {
$secs = $usecs = null;
}
$this->_logger->debug(
null, 'select() watching %d streams for activity (read: %d, write: %d, except: %d), timeout: %s',
count($streams['read']) + count($streams['write']) + count($streams['except']),
count($streams['read']), count($streams['write']), count($streams['except']),
$secs === null ? 'NULL' : "{$secs}s {$usecs}u"
);
$count = stream_select($streams['read'], $streams['write'], $streams['except'], $secs, $usecs);
if ($count === false) {
$this->_logger->error(null, 'select() operation failed!');
exit(1);
} else if ($count === 0) {
$this->_logger->debug(null, 'select() returned 0 streams with activity');
return;
}
$this->_logger->debug(
null, 'select() returned %d streams with activity (read: %d, write: %d, except: %d)',
$count, count($streams['read']), count($streams['write']), count($streams['except'])
);
foreach (array('read', 'write', 'except') as $op) {
foreach ($streams[$op] as $stream) {
$id = (int) $stream;
if (isset($this->_streamWatchers[$op]['callbacks'][$id])) {
call_user_func($this->_streamWatchers[$op]['callbacks'][$id], $stream);
}
}
}
}
/**
* Add a watcher for a stream
*
* @param string $op
* @param resource $stream
* @param callable $callback
* @return bool
* @access module
*/
function _addStreamWatcher($op, $stream, $callback)
{
$id = (int) $stream;
$op = strtolower($op);
if (!isset($this->_streamWatchers[$op])) {
$this->_logger->debug(null, 'Failed to add %s watcher on stream #%d: unknown op', $op, $id);
return false;
} else if (!is_resource($stream)) {
$this->_logger->debug(null, 'Failed to add %s watcher on stream #%d: not a valid stream resource', $op, $id);
return false;
} else if (!is_callable($callback)) {
$this->_logger->debug(null, 'Failed to add %s watcher on stream #%d: invalid callback', $op, $id);
return false;
}
if (!isset($this->_streamWatchers[$op]['streams'][$id])) {
$this->_streamWatchers[$op]['streams'][$id] = $stream;
$this->_streamWatcherCount++;
} else {
$this->_logger->debug(null, 'Overwrote %s watcher on stream #%d', $op, $id);
}
$this->_streamWatchers[$op]['callbacks'][$id] = $callback;
$this->_logger->debug(null, 'Added %s watcher on stream #%d, watchers: %d', $op, $id, $this->_streamWatcherCount);
return true;
}
/**
* Remove a watcher for a stream or stream ID
*
* @param string $op
* @param resource|int $stream
* @access module
*/
function _removeStreamWatcher($op, $stream)
{
$id = (int) $stream;
$op = strtolower($op);
if (isset($this->_streamWatchers[$op]['streams'][$id])) {
unset($this->_streamWatchers[$op]['streams'][$id], $this->_streamWatchers[$op]['callbacks'][$id]);
$this->_streamWatcherCount--;
$this->_logger->debug(null, 'Removed %s watcher on stream #%d, watchers: %d', $op, $id, $this->_streamWatcherCount);
} else {
$this->_logger->debug(null, 'Could not remove %s watcher on stream #%d: not registered', $op, $id);
}
}
/**
* Clean up and shut down the event loop
*/
function _shutdown()
{
$this->_logger->info(null, 'Shutting down event loop');
// Remove all stream watchers apart from workers
foreach (array('read', 'write', 'except') as $op) {
foreach ($this->_streamWatchers[$op]['callbacks'] as $id => $callback) {
if (!is_array($callback) || !is_a($callback[0], 'WorkerParentEndpoint')) {
$this->_removeStreamWatcher($op, $id);
}
}
}
// Empty the schedule
$this->_scheduler->shutdown();
// Send all workers a terminate signal
$this->_dynamicWorkerPool->shutdown();
$this->_dnsWorkerPool->shutdown();
// Let workers terminate gracefully
while ($this->_streamWatcherCount > 0) {
$this->_processStreams($this->_getNextActivityTime());
$this->_scheduler->processActivity();
}
}
/**
* Register a handler for a stream wrapper or set of wrappers
*
* @param StreamHandler $handler
* @param array $schemes
* @return bool
*/
function registerStreamHandler(&$handler, $schemes)
{
if (!is_a($handler, 'StreamHandler')) {
return false;
}
foreach ($schemes as $scheme) {
$this->_streamHandlers[strtolower($scheme)] = &$handler;
}
return true;
}
/**
* Create a new stream
*
* @param string $address
* @return SocketStream
*/
function &openStream($address)
{
if (!$url = parse_url($address)) {
$this->_logger->warn(null, 'Unable to parse stream URL %s: Parse failed', $address);
return null;
} else if (!isset($url['scheme'])) {
$this->_logger->warn(null, 'Unable to parse stream URL %s: Missing scheme', $address);
return null;
}
$scheme = strtolower($url['scheme']);
if (!isset($this->_streamHandlers[$scheme])) {
$this->_logger->warn(null, 'No stream handler registered for URI scheme: %s', $scheme);
return null;
}
$stream = &$this->_streamHandlers[$scheme]->createStream($this, $url);
return $stream;
}
/**
* Schedule a job to be run asynchronously in a worker
*
* @param string $method
* @param mixed ...$args
* @return AsyncJob
*/
function &async($method)
{
$args = func_get_args();
$job = &new AsyncJob($method, array_slice($args, 1));
$this->_dynamicWorkerPool->pushJob($job);
$this->_logger->info(null, 'Queued async job, method: %s', $method);
return $job;
}
/**
* Resolve a host name to an IP address asynchronously
*
* @param string $host
* @param callable $callback
* @param bool $returnHost
*/
function resolveHost($host, $callback, $returnHost = false)
{
if ($host === '255.255.255.255' || ip2long($host) !== -1) {
$this->in(0, $callback, $returnHost ? array($host, $host) : array($host));
} else {
$job = &new AsyncJob('DNSWorker::resolve', array($host));
$this->_dnsWorkerPool->pushJob($job);
$job->on('complete', $callback, $returnHost ? array($host) : array());
$job->on('error', $callback, $returnHost ? array($host, null) : array(null));
}
}
/**
* Schedule a callback to be execute after a specified number of microseconds
*
* @param int $usecs
* @param callable $callback
* @param array $args
* @return int
*/
function in($usecs, $callback, $args = array())
{
return $this->_scheduler->createItem($this->_now() + ($usecs / 1000000), $callback, $args);
}
/**
* Schedule a callback to execute at a specified timestamp
*
* @param float $timestamp
* @param callable $callback
* @param array $args
* @return int
*/
function at($timestamp, $callback, $args = array())
{
return $this->_scheduler->createItem($timestamp, $callback, $args);
}
/**
* Schedule a recurring callback to execute after a specified number of microseconds
*
* @param int $usecs
* @param callable $callback
* @param array $args
* @return int
*/
function every($usecs, $callback, $args = array())
{
$secs = $usecs / 1000000;
return $this->_scheduler->createItem($this->_now() + $secs, $callback, $args, $secs);
}
/**
* Cancel a scheduled callback
*
* @param int $id
* @return bool
*/
function cancel($id)
{
return $this->_scheduler->removeItem($id);
}
/**
* Execute the event loop
*/
function run()
{
$this->_logger->info(null, 'Starting event loop');
$this->_running = true;
// Main loop
while ($this->_running && $this->_isActive()) {
$timeout = $this->_getNextActivityTime();
if ($this->_streamWatcherCount > 0) {
$this->_processStreams($timeout);
} else {
$this->_awaitNextActivity($timeout);
}
$this->_dynamicWorkerPool->processActivity();
$this->_dnsWorkerPool->processActivity();
$this->_scheduler->processActivity();
}
$this->_running = false;
$this->_shutdown();
$this->_logger->info(null, 'Event loop terminated');
}
/**
* Check whether the event loop is running
*
* @return bool
*/
function isRunning()
{
return $this->_running;
}
/**
* Stop the event loop after the current iteration
*/
function stop()
{
$this->_logger->debug(null, 'Stopping event loop');
$this->_running = false;
}
}
CLASS;
// Run the extractor over the sample source and dump the resulting nested array
// (one entry per docblock, each a list of its description/tag items).
print_r(extract_docblocks($str));
// preferences:
// 40.21 ms | 402 KiB | 5 Q