3v4l.org

run code in 200+ php & hhvm versions
Bugs & Features
<?php
/**
 * Phalcon Framework
 * This source file is subject to the New BSD License that is bundled
 * with this package in the file docs/LICENSE.txt.
 * If you did not receive a copy of the license and are unable to
 * obtain it through the world-wide-web, please send an email
 * to license@phalconphp.com so we can send you a copy immediately.
 *
 * @author Nikita Vershinin <endeveit@gmail.com>
 */
namespace Phalcon\Queue\Beanstalk;

use duncan3dc\Helpers\Fork;
use Phalcon\Logger\Adapter as LoggerAdapter;
use Phalcon\Queue\Beanstalk as Base;

/**
 * \Phalcon\Queue\Beanstalk\Extended
 * Extended class to access the beanstalk queue service.
 * Supports tubes prefixes, pcntl-workers and tubes stats.
 */
class Extended extends Base
{
    // Protocol commands. Commands that take arguments keep a trailing space
    // so the argument can simply be appended.
    const CMD_PUT                = "put ";
    const CMD_PEEKJOB            = "peek ";
    const CMD_PEEK_READY         = "peek-ready";
    const CMD_PEEK_DELAYED       = "peek-delayed";
    const CMD_PEEK_BURIED        = "peek-buried";
    const CMD_RESERVE            = "reserve";
    const CMD_RESERVE_TIMEOUT    = "reserve-with-timeout ";
    const CMD_DELETE             = "delete ";
    const CMD_RELEASE            = "release ";
    const CMD_BURY               = "bury ";
    const CMD_KICK               = "kick ";
    const CMD_JOBKICK            = "kick-job ";
    const CMD_TOUCH              = "touch ";
    const CMD_STATS              = "stats";
    const CMD_JOBSTATS           = "stats-job ";
    const CMD_USE                = "use ";
    const CMD_WATCH              = "watch ";
    const CMD_IGNORE             = "ignore ";
    const CMD_LIST_TUBES         = "list-tubes";
    const CMD_LIST_TUBE_USED     = "list-tube-used";
    const CMD_LIST_TUBES_WATCHED = "list-tubes-watched";
    const CMD_STATS_TUBE         = "stats-tube ";
    const CMD_QUIT               = "quit";
    const CMD_PAUSE_TUBE         = "pause-tube";

    // Server replies.
    const MSG_OK            = "OK";
    const MSG_OK_FMT        = "OK %d";
    const MSG_FOUND         = "FOUND";
    const MSG_NOTFOUND      = "NOT_FOUND";
    const MSG_RESERVED      = "RESERVED";
    const MSG_DEADLINE_SOON = "DEADLINE_SOON";
    const MSG_TIMED_OUT     = "TIMED_OUT";
    const MSG_DELETED       = "DELETED";
    const MSG_RELEASED      = "RELEASED";
    const MSG_BURIED        = "BURIED";
    const MSG_KICKED        = "KICKED";
    const MSG_TOUCHED       = "TOUCHED";
    const MSG_BURIED_FMT    = "BURIED %d";
    const MSG_INSERTED_FMT  = "INSERTED %d";
    const MSG_NOT_IGNORED   = "NOT_IGNORED";

    /**
     * The server cannot allocate enough memory for the job.
     * The client should try again later.
     */
    const MSG_OUT_OF_MEMORY = "OUT_OF_MEMORY";

    /**
     * This indicates a bug in the server. It should never happen.
     * If it does happen, please report it at http://groups.google.com/group/beanstalk-talk.
     */
    const MSG_INTERNAL_ERROR = "INTERNAL_ERROR";

    const MSG_DRAINING = "DRAINING";

    /**
     * The client sent a command line that was not well-formed.
     * This can happen if the line does not end with \r\n,
     * if non-numeric characters occur where an integer is expected,
     * if the wrong number of arguments are present,
     * or if the command line is mal-formed in any other way.
     */
    const MSG_BAD_FORMAT = "BAD_FORMAT";

    /**
     * The client sent a command that the server does not know.
     */
    const MSG_UNKNOWN_COMMAND = "UNKNOWN_COMMAND";

    const MSG_EXPECTED_CRLF = "EXPECTED_CRLF";
    const MSG_JOB_TOO_BIG   = "JOB_TOO_BIG";

    /**
     * Seconds to wait before putting the job in the ready queue.
     * The job will be in the "delayed" state during this time.
     *
     * @const integer
     */
    const DEFAULT_DELAY = 0;

    /**
     * Jobs with smaller priority values will be scheduled before jobs with larger priorities.
     * The most urgent priority is 0, the least urgent priority is 4294967295.
     *
     * @const integer
     */
    const DEFAULT_PRIORITY = 1024;

    /**
     * Time to run - number of seconds to allow a worker to run this job.
     * The minimum ttr is 1.
     *
     * @const integer
     */
    const DEFAULT_TTR = 60;

    /**
     * If provided the errors will be logged here.
     *
     * @var \Phalcon\Logger\Adapter
     */
    protected $logger = null;

    /**
     * Tubes prefix.
     *
     * @var string
     */
    protected $tubePrefix = null;

    /**
     * Queue handlers, keyed by tube name.
     *
     * @var array
     */
    protected $workers = array();

    /**
     * {@inheritdoc}
     *
     * In addition to the parent's options, reads:
     *  - 'prefix': string prepended to tube names (defaults to '');
     *  - 'logger': used only when it is a \Phalcon\Logger\Adapter instance.
     *
     * @param array $options
     */
    public function __construct($options = null)
    {
        parent::__construct($options);

        $logger     = null;
        $tubePrefix = '';

        if (is_array($options) || ($options instanceof \ArrayAccess)) {
            if (isset($options['prefix'])) {
                $tubePrefix = $options['prefix'];
            }

            if (isset($options['logger']) && ($options['logger'] instanceof LoggerAdapter)) {
                $logger = $options['logger'];
            }
        }

        $this->logger     = $logger;
        $this->tubePrefix = $tubePrefix;
    }

    /**
     * Adds new worker to the pool.
     * Registering a second callback for the same tube replaces the first.
     *
     * @param  string                    $tube
     * @param  callable                  $callback
     * @throws \InvalidArgumentException when the tube is not a string or the callback is not callable
     */
    public function addWorker($tube, $callback)
    {
        if (!is_string($tube)) {
            throw new \InvalidArgumentException('The tube name must be a string.');
        }

        if (!is_callable($callback)) {
            throw new \InvalidArgumentException('The callback is invalid.');
        }

        $this->workers[$tube] = $callback;
    }

    /**
     * Runs the main worker cycle.
     *
     * For every registered tube a clone of this object is handed to
     * Fork::call(); inside it each reserved job is passed to the worker
     * callback through a nested Fork::call(), and the job is deleted once
     * Fork::wait() returns without throwing.
     *
     * @param boolean $ignoreErrors when false, the tube loop stops after the first
     *                              exception raised while waiting for a job handler
     */
    public function doWork($ignoreErrors = false)
    {
        declare (ticks = 1);
        set_time_limit(0);

        $fork               = new Fork();
        $fork->ignoreErrors = $ignoreErrors;

        foreach ($this->workers as $tube => $worker) {
            $that = clone $this;

            // Run the worker in separate process.
            $fork->call(function () use ($tube, $worker, $that, $fork, $ignoreErrors) {
                $that->connect();

                do {
                    $job = $that->reserveFromTube($tube);

                    if ($job && ($job instanceof Job)) {
                        $fork->call(function () use ($worker, $job) {
                            call_user_func($worker, $job);
                        });

                        try {
                            $fork->wait();

                            // A failed delete is logged but never stops the loop.
                            try {
                                $job->delete();
                            } catch (\Exception $e) {
                                if (null !== $this->logger) {
                                    $this->logger->warning(sprintf(
                                        'Exception thrown while deleting the job: %d — %s',
                                        $e->getCode(),
                                        $e->getMessage()
                                    ));
                                }
                            }
                        } catch (\Exception $e) {
                            if (null !== $this->logger) {
                                $this->logger->warning(sprintf(
                                    'Exception thrown while handling job #%s: %d — %s',
                                    $job->getId(),
                                    $e->getCode(),
                                    $e->getMessage()
                                ));
                            }

                            if (!$ignoreErrors) {
                                return;
                            }
                        }
                    } else {
                        // There are no jobs so let's sleep to not increase CPU usage
                        usleep(rand(7000, 10000));
                    }
                } while (true);

                // Only reached if the loop above is ever changed to break out
                // instead of returning.
                exit(0);
            });
        }

        $fork->wait();
    }

    /**
     * Puts a job on the queue using specified tube.
     * Missing 'delay', 'priority' and 'ttr' options are filled in from the
     * DEFAULT_* constants.
     *
     * @param  string         $tube
     * @param  string         $data
     * @param  array          $options
     * @return string|boolean job id or false
     */
    public function putInTube($tube, $data, $options = null)
    {
        if (null === $options) {
            $options = array();
        }

        if (!array_key_exists('delay', $options)) {
            $options['delay'] = self::DEFAULT_DELAY;
        }

        if (!array_key_exists('priority', $options)) {
            $options['priority'] = self::DEFAULT_PRIORITY;
        }

        if (!array_key_exists('ttr', $options)) {
            $options['ttr'] = self::DEFAULT_TTR;
        }

        $this->choose($this->getTubeName($tube));

        return parent::put($data, $options);
    }

    /**
     * Reserves/locks a ready job from the specified tube.
     *
     * @param  string                               $tube
     * @param  integer                              $timeout
     * @return boolean|\Phalcon\Queue\Beanstalk\Job
     */
    public function reserveFromTube($tube, $timeout = null)
    {
        $this->watch($this->getTubeName($tube));

        return parent::reserve($timeout);
    }

    /**
     * Returns the names of all tubes on the server.
     * When a prefix is configured only tubes starting with it are returned,
     * with the prefix removed.
     *
     * @return array
     */
    public function getTubes()
    {
        $result = array();
        $lines  = $this->getResponseLines(self::CMD_LIST_TUBES);

        if (null !== $lines) {
            foreach ($lines as $line) {
                // Each entry of the list is rendered as "- name".
                $line = ltrim($line, '- ');

                if (empty($this->tubePrefix) || (0 === strpos($line, $this->tubePrefix))) {
                    $result[] = !empty($this->tubePrefix) ? substr($line, strlen($this->tubePrefix)) : $line;
                }
            }
        }

        return $result;
    }

    /**
     * Returns information about the specified tube if it exists.
     * Integer fields are returned as integers, any other field (such as the
     * tube name) as a string.
     *
     * @param  string     $tube
     * @return null|array
     */
    public function getTubeStats($tube)
    {
        return $this->parseStatsLines(
            $this->getResponseLines(self::CMD_STATS_TUBE . $this->getTubeName($tube))
        );
    }

    /**
     * Returns information about specified job if it exists.
     * Integer fields are returned as integers, any other field (such as the
     * tube name or the job state) as a string.
     *
     * @param  int        $job_id
     * @return null|array
     */
    public function getJobStats($job_id)
    {
        return $this->parseStatsLines(
            $this->getResponseLines(self::CMD_JOBSTATS . (int)$job_id)
        );
    }

    /**
     * The kick-job command is a variant of kick that operates with a single
     * job identified by its job id. If the given job id exists and is in a
     * buried or delayed state, it will be moved to the ready queue of the
     * same tube where it currently belongs.
     *
     * @param  int     $job_id is the job id to kick.
     * @return boolean true when the server answered KICKED
     */
    public function jobKick($job_id)
    {
        $this->write(self::CMD_JOBKICK . $job_id);

        $response = $this->readStatus();

        return $response[0] == self::MSG_KICKED;
    }

    /**
     * The stats-job command gives statistical information about the specified
     * job if it exists.
     *
     * <i>return array:</i><br><br>
     * <b>id</b> is the job id<br>
     * <b>tube</b> is the name of the tube that contains this job<br>
     * <b>state</b> is ready or delayed or reserved or buried<br>
     * <b>pri</b> is the priority value set by the put, release, or bury commands.<br>
     * <b>age</b> is the time in seconds since the put command that created this job.<br>
     * <b>time-left</b> is the number of seconds left until the server puts this job into the ready queue. This number is only meaningful if the job is reserved or delayed. If the job is reserved and this amount of time elapses before its state changes, it is considered to have timed out.<br>
     * <b>file</b> is the number of the earliest binlog file containing this job. If -b wasn't used, this will be 0.<br>
     * <b>reserves</b> is the number of times this job has been reserved.<br>
     * <b>timeouts</b> is the number of times this job has timed out during a reservation.<br>
     * <b>releases</b> is the number of times a client has released this job from a reservation.<br>
     * <b>buries</b> is the number of times this job has been buried.<br>
     * <b>kicks</b> is the number of times this job has been kicked.<br>
     *
     * @param  int             $job_id is a job id.
     * @return boolean | array false unless the server answered OK
     */
    public function jobStats($job_id)
    {
        $this->write(self::CMD_JOBSTATS . $job_id);

        $response = $this->readYaml();

        if ($response[0] != self::MSG_OK) {
            return false;
        }

        return $response[2];
    }

    /**
     * Stops watching the given tube and returns the number of tubes still
     * watched by the current session.
     * Example return array: array('WATCHING' => 1)
     * Added on 10-Jan-2014 20:04 IST by Tapan Kumar Thapa @ tapan.thapa@yahoo.com
     *
     * @param  string            $tube
     * @return null|array
     * @throws \RuntimeException when the server answers anything but WATCHING
     */
    public function ignoreTube($tube)
    {
        $result   = null;
        $response = $this->getWatchingResponse(self::CMD_IGNORE . $this->getTubeName($tube));

        if (!empty($response)) {
            list($name, $value) = explode(' ', $response);

            if (null !== $value) {
                $result[$name] = intval($value);
            }
        }

        return $result;
    }

    /**
     * Returns the tube name with prefix.
     *
     * A prefix that is already present at the start of the name is not added
     * a second time, and the "default" tube is never prefixed.
     *
     * @param  string|null $tube
     * @return string
     */
    protected function getTubeName($tube)
    {
        if ((null !== $this->tubePrefix) && (null !== $tube)) {
            $prefix = (string)$this->tubePrefix;

            // Strip the prefix only where it leads the name: removing every
            // occurrence would corrupt names that merely contain it.
            if (('' !== $prefix) && (0 === strpos($tube, $prefix))) {
                $tube = (string)substr($tube, strlen($prefix));
            }

            if (0 !== strcmp($tube, 'default')) {
                return $prefix . $tube;
            }
        }

        return $tube;
    }

    /**
     * Returns the result of command that wait the list in response from beanstalkd.
     *
     * The first reply line must be "OK <bytes>"; that many bytes are then
     * read, split into lines and returned without the leading "---" marker.
     *
     * @param  string            $cmd
     * @return array|null
     * @throws \RuntimeException when the reply does not start with "OK <bytes>"
     */
    protected function getResponseLines($cmd)
    {
        $this->write(trim($cmd));

        $response = $this->read();
        $matches  = array();

        if (!preg_match('#^(OK (\d+))#mi', $response, $matches)) {
            throw new \RuntimeException(sprintf(
                'Unhandled response: %s',
                $response
            ));
        }

        $result = preg_split("#[\r\n]+#", rtrim($this->read($matches[2])));

        // discard header line
        if (isset($result[0]) && $result[0] === '---') {
            array_shift($result);
        }

        return $result;
    }

    /**
     * Sends a command whose reply is expected to be "WATCHING <count>" and
     * returns that reply.
     * Added on 10-Jan-2014 20:04 IST by Tapan Kumar Thapa @ tapan.thapa@yahoo.com
     *
     * @param  string            $cmd
     * @return string|null       null when nothing was written
     * @throws \RuntimeException when the reply does not start with "WATCHING <count>"
     */
    protected function getWatchingResponse($cmd)
    {
        $result  = null;
        $nbBytes = $this->write($cmd);

        if ($nbBytes && ($nbBytes > 0)) {
            // NOTE(review): the read length is the number of bytes *written*,
            // which is unrelated to the size of the reply - confirm against
            // the base class whether a plain read() is what is wanted here.
            $response = $this->read($nbBytes);
            $matches  = array();

            if (!preg_match('#^WATCHING (\d+).*?#', $response, $matches)) {
                throw new \RuntimeException(sprintf(
                    'Unhandled response: %s',
                    $response
                ));
            }

            $result = $response;
        }

        return $result;
    }

    /**
     * Sends "stats-job 10" and returns the third element of the YAML reply
     * when the status is RESERVED, false otherwise.
     *
     * NOTE(review): the hard-coded job id and the RESERVED check make this
     * look like debugging residue - confirm whether it can be dropped.
     *
     * @return boolean|mixed
     */
    public function test()
    {
        $this->write("stats-job 10");

        $response = $this->readYaml();

        if ($response[0] != "RESERVED") {
            return false;
        }

        return $response[2];
    }

    /**
     * Turns "name: value" lines into an associative array.
     *
     * Lines without a colon are skipped. Only the first colon separates the
     * name from the value, so values containing colons are kept whole.
     *
     * @param  array|null $lines
     * @return null|array null when no line could be parsed
     */
    private function parseStatsLines($lines)
    {
        $result = null;

        if (empty($lines)) {
            return $result;
        }

        foreach ($lines as $line) {
            if (false === strpos($line, ':')) {
                continue;
            }

            list($name, $value) = explode(':', $line, 2);
            $value = trim($value);

            // Keep textual fields as strings instead of collapsing them to 0.
            $result[$name] = preg_match('/^-?\d+$/', $value) ? intval($value) : $value;
        }

        return $result;
    }
}
based on iN42E
Output for 7.0.0 - 7.2.0
Fatal error: Class 'Phalcon\Queue\Beanstalk' not found in /in/QnQiJ on line 23
Process exited with code 255.
Output for 5.4.0 - 5.6.28
Fatal error: Class 'Phalcon\Queue\Beanstalk' not found in /in/QnQiJ on line 24
Process exited with code 255.