Browse Source

dev 2020-11

Max F 5 years ago
parent
commit
70731eabda
1 changed files with 25 additions and 2 deletions
  1. 25 2
      src/Database/Redis/RedisQueue.php

+ 25 - 2
src/Database/Redis/RedisQueue.php

@@ -21,11 +21,13 @@ class RedisQueue
 		return $this->redis;
 	}
 
+
 	function getClient()
 	{
 		return $this->redis->getClient();
 	}
 
+
 	function push($data)
 	{
 		$data = serialize($data);
@@ -33,6 +35,7 @@ class RedisQueue
 		$client->rPush($this->queue_name, $data);
 	}
 
+
 	function unshift($data)
 	{
 		$data = serialize($data);
@@ -40,6 +43,7 @@ class RedisQueue
 		$client->lPush($this->queue_name, $data);
 	}
 
+
 	function pop($timeout=0)
 	{
 		$ts_start = microtime(true);
@@ -53,15 +57,25 @@ class RedisQueue
 				break;
 			}
 
+			if(connection_status() != CONNECTION_NORMAL || connection_aborted()) {
+				break;
+			}
+
 			$ts_end = microtime(true);
 			$duration = $ts_end - $ts_start;
-			if ($duration > $timeout) {
+			$remaining_max = $timeout - $duration;
+
+			if ($remaining_max <= 0) {
 				break;
 			}
+
+			usleep( 1000 * 1000 * min($remaining_max, 0.05) ); // 0.05 second
 		}
+		
 		return $data;
 	}
 
+
 	function shift($timeout=0)
 	{
 		$ts_start = microtime(true);
@@ -75,12 +89,21 @@ class RedisQueue
 				break;
 			}
 
+			if(connection_status() != CONNECTION_NORMAL || connection_aborted()) {
+				break;
+			}
+
 			$ts_end = microtime(true);
 			$duration = $ts_end - $ts_start;
-			if ($duration > $timeout) {
+			$remaining_max = $timeout - $duration;
+
+			if ($remaining_max <= 0) {
 				break;
 			}
+
+			usleep( 1000 * 1000 * min($remaining_max, 0.05) ); // 0.05 second
 		}
+
 		return $data;
 	}