Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
5.13% |
12 / 234 |
|
5.13% |
2 / 39 |
CRAP | |
0.00% |
0 / 1 |
| Sender | |
5.13% |
12 / 234 |
|
5.13% |
2 / 39 |
10067.99 | |
0.00% |
0 / 1 |
| get_instance | |
66.67% |
2 / 3 |
|
0.00% |
0 / 1 |
2.15 | |||
| __construct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
| init | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
| maybe_set_user_from_token | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
| maybe_clear_user_from_token | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
| get_next_sync_time | |
71.43% |
5 / 7 |
|
0.00% |
0 / 1 |
2.09 | |||
| set_next_sync_time | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| do_full_sync | |
0.00% |
0 / 17 |
|
0.00% |
0 / 1 |
132 | |||
| continue_full_sync_enqueue | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
20 | |||
| do_sync | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
| do_dedicated_sync_and_exit | |
0.00% |
0 / 15 |
|
0.00% |
0 / 1 |
42 | |||
| do_sync_and_set_delays | |
0.00% |
0 / 28 |
|
0.00% |
0 / 1 |
210 | |||
| get_items_to_send | |
0.00% |
0 / 28 |
|
0.00% |
0 / 1 |
132 | |||
| fastcgi_finish_request | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
| do_sync_for_queue | |
7.50% |
3 / 40 |
|
0.00% |
0 / 1 |
218.61 | |||
| send_action | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
6 | |||
| create_action_to_send | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
2 | |||
| sync_object | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
2 | |||
| register_jetpack_xmlrpc_methods | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
| get_sync_queue | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
| get_full_sync_queue | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
| get_codec | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| set_codec | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
| send_checksum | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
| reset_sync_queue | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| reset_full_sync_queue | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| set_dequeue_max_bytes | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| set_upload_max_bytes | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| set_upload_max_rows | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| set_sync_wait_time | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| get_sync_wait_time | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| set_enqueue_wait_time | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| get_enqueue_wait_time | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| set_sync_wait_threshold | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| get_sync_wait_threshold | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| set_max_dequeue_time | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
| set_defaults | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
2 | |||
| reset_data | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
6 | |||
| uninstall | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
2 | |||
| 1 | <?php |
| 2 | /** |
| 3 | * Sync sender. |
| 4 | * |
| 5 | * @package automattic/jetpack-sync |
| 6 | */ |
| 7 | |
| 8 | namespace Automattic\Jetpack\Sync; |
| 9 | |
| 10 | use Automattic\Jetpack\Connection\Manager; |
| 11 | use Automattic\Jetpack\Constants; |
| 12 | use WP_Error; |
| 13 | |
| 14 | /** |
| 15 | * This class grabs pending actions from the queue and sends them |
| 16 | */ |
| 17 | class Sender { |
| 18 | /** |
| 19 | * Name of the option that stores the time of the next sync. |
| 20 | * |
| 21 | * @access public |
| 22 | * |
| 23 | * @var string |
| 24 | */ |
| 25 | const NEXT_SYNC_TIME_OPTION_NAME = 'jetpack_next_sync_time'; |
| 26 | |
| 27 | /** |
| 28 | * Name of the transient responsible for temporarily disabling Sync sending during Pulls. |
| 29 | * |
| 30 | * @access public |
| 31 | * |
| 32 | * @var string |
| 33 | */ |
| 34 | const TEMP_SYNC_DISABLE_TRANSIENT_NAME = 'jetpack_disable_sync_sending'; |
| 35 | |
| 36 | /** |
| 37 | * Expiry of the transient responsible for temporarily disabling Sync sending during Pulls. |
| 38 | * |
| 39 | * @access public |
| 40 | * |
| 41 | * @var int |
| 42 | */ |
| 43 | const TEMP_SYNC_DISABLE_TRANSIENT_EXPIRY = MINUTE_IN_SECONDS; |
| 44 | |
| 45 | /** |
| 46 | * Sync timeout after a WPCOM error. |
| 47 | * |
| 48 | * @access public |
| 49 | * |
| 50 | * @var int |
| 51 | */ |
| 52 | const WPCOM_ERROR_SYNC_DELAY = 60; |
| 53 | |
| 54 | /** |
| 55 | * Sync timeout after a queue has been locked. |
| 56 | * |
| 57 | * @access public |
| 58 | * |
| 59 | * @var int |
| 60 | */ |
| 61 | const QUEUE_LOCKED_SYNC_DELAY = 10; |
| 62 | |
| 63 | /** |
| 64 | * Maximum bytes to checkout without exceeding the memory limit. |
| 65 | * |
| 66 | * @access private |
| 67 | * |
| 68 | * @var int |
| 69 | */ |
| 70 | private $dequeue_max_bytes; |
| 71 | |
| 72 | /** |
| 73 | * Maximum bytes in a single encoded item. |
| 74 | * |
| 75 | * @access private |
| 76 | * |
| 77 | * @var int |
| 78 | */ |
| 79 | private $upload_max_bytes; |
| 80 | |
| 81 | /** |
| 82 | * Maximum number of sync items in a single action. |
| 83 | * |
| 84 | * @access private |
| 85 | * |
| 86 | * @var int |
| 87 | */ |
| 88 | private $upload_max_rows; |
| 89 | |
| 90 | /** |
| 91 | * Maximum time for performing a checkout of items from the queue (in seconds). |
| 92 | * |
| 93 | * @access private |
| 94 | * |
| 95 | * @var int |
| 96 | */ |
| 97 | private $max_dequeue_time; |
| 98 | |
| 99 | /** |
| 100 | * How long to wait after sending sync items once the sync wait threshold has been exceeded (in seconds). |
| 101 | * |
| 102 | * @access private |
| 103 | * |
| 104 | * @var int |
| 105 | */ |
| 106 | private $sync_wait_time; |
| 107 | |
| 108 | /** |
| 109 | * Maximum time to wait for the checkout to finish (in seconds). |
| 110 | * |
| 111 | * @access private |
| 112 | * |
| 113 | * @var int |
| 114 | */ |
| 115 | private $sync_wait_threshold; |
| 116 | |
| 117 | /** |
| 118 | * Maximum time to wait for the sync items to be queued for sending (in seconds). |
| 119 | * |
| 120 | * @access private |
| 121 | * |
| 122 | * @var int |
| 123 | */ |
| 124 | private $enqueue_wait_time; |
| 125 | |
| 126 | /** |
| 127 | * Incremental sync queue object. |
| 128 | * |
| 129 | * @access private |
| 130 | * |
| 131 | * @var \Automattic\Jetpack\Sync\Queue |
| 132 | */ |
| 133 | private $sync_queue; |
| 134 | |
| 135 | /** |
| 136 | * Full sync queue object. |
| 137 | * |
| 138 | * @access private |
| 139 | * |
| 140 | * @var \Automattic\Jetpack\Sync\Queue |
| 141 | */ |
| 142 | private $full_sync_queue; |
| 143 | |
| 144 | /** |
| 145 | * Codec object for encoding and decoding sync items. |
| 146 | * |
| 147 | * @access private |
| 148 | * |
| 149 | * @var \Automattic\Jetpack\Sync\Codec_Interface |
| 150 | */ |
| 151 | private $codec; |
| 152 | |
| 153 | /** |
| 154 | * ID of the user that was current before we changed it, used to restore that user afterwards. |
| 155 | * |
| 156 | * @access private |
| 157 | * |
| 158 | * @var int |
| 159 | */ |
| 160 | private $old_user; |
| 161 | |
| 162 | /** |
| 163 | * Container for the singleton instance of this class. |
| 164 | * |
| 165 | * @access private |
| 166 | * @static |
| 167 | * |
| 168 | * @var \Automattic\Jetpack\Sync\Sender |
| 169 | */ |
| 170 | private static $instance; |
| 171 | |
| 172 | /** |
| 173 | * Retrieve the singleton instance of this class. |
| 174 | * |
| 175 | * @access public |
| 176 | * @static |
| 177 | * |
| 178 | * @return Sender |
| 179 | */ |
| 180 | public static function get_instance() { |
| 181 | if ( null === self::$instance ) { |
| 182 | self::$instance = new self(); |
| 183 | } |
| 184 | |
| 185 | return self::$instance; |
| 186 | } |
| 187 | |
| 188 | /** |
| 189 | * Constructor. |
| 190 | * This is necessary because you can't use "new" when you declare instance properties >:( |
| 191 | * |
| 192 | * @access protected |
| 193 | * @static |
| 194 | */ |
| 195 | protected function __construct() { |
| 196 | $this->set_defaults(); |
| 197 | $this->init(); |
| 198 | } |
| 199 | |
| 200 | /** |
| 201 | * Initialize the sender. |
| 202 | * Prepares the current user and initializes all sync modules. |
| 203 | * |
| 204 | * @access private |
| 205 | */ |
| 206 | private function init() { |
| 207 | add_action( 'jetpack_sync_before_send_queue_sync', array( $this, 'maybe_set_user_from_token' ), 1 ); |
| 208 | add_action( 'jetpack_sync_before_send_queue_sync', array( $this, 'maybe_clear_user_from_token' ), 20 ); |
| 209 | add_filter( 'jetpack_xmlrpc_unauthenticated_methods', array( $this, 'register_jetpack_xmlrpc_methods' ) ); |
| 210 | foreach ( Modules::get_modules() as $module ) { |
| 211 | $module->init_before_send(); |
| 212 | } |
| 213 | } |
| 214 | |
| 215 | /** |
| 216 | * Detect if this is a XMLRPC request with a valid signature. |
| 217 | * If so, changes the user to the new one. |
| 218 | * |
| 219 | * @access public |
| 220 | */ |
| 221 | public function maybe_set_user_from_token() { |
| 222 | $connection = new Manager(); |
| 223 | $verified_user = $connection->verify_xml_rpc_signature(); |
| 224 | if ( Constants::is_true( 'XMLRPC_REQUEST' ) && |
| 225 | ! is_wp_error( $verified_user ) |
| 226 | && $verified_user |
| 227 | ) { |
| 228 | $old_user = wp_get_current_user(); |
| 229 | $this->old_user = $old_user->ID ?? 0; |
| 230 | wp_set_current_user( $verified_user['user_id'] ); |
| 231 | } |
| 232 | } |
| 233 | |
| 234 | /** |
| 235 | * If we used to have a previous current user, revert back to it. |
| 236 | * |
| 237 | * @access public |
| 238 | */ |
| 239 | public function maybe_clear_user_from_token() { |
| 240 | if ( isset( $this->old_user ) ) { |
| 241 | wp_set_current_user( $this->old_user ); |
| 242 | } |
| 243 | } |
| 244 | |
| 245 | /** |
| 246 | * Retrieve the next sync time. |
| 247 | * |
| 248 | * Update @since 1.43.2 |
| 249 | * Sometimes when we process Sync requests in Jetpack, the server clock can be a |
| 250 | * bit in the future and this can lock Sync to not send stuff for a while. |
| 251 | * We are introducing an extra check, to make sure to limit the next_sync_time |
| 252 | * to be at most one hour in the future from the current time. |
| 253 | * |
| 254 | * @access public |
| 255 | * |
| 256 | * @param string $queue_name Name of the queue. |
| 257 | * @return float Timestamp of the next sync. |
| 258 | */ |
| 259 | public function get_next_sync_time( $queue_name ) { |
| 260 | $option_name = self::NEXT_SYNC_TIME_OPTION_NAME . '_' . $queue_name; |
| 261 | $next_sync_time = (float) get_option( $option_name, 0 ); |
| 262 | |
| 263 | $is_more_than_one_hour = ( $next_sync_time - microtime( true ) ) >= HOUR_IN_SECONDS; |
| 264 | |
| 265 | if ( $is_more_than_one_hour ) { |
| 266 | delete_option( $option_name ); |
| 267 | $next_sync_time = 0; |
| 268 | } |
| 269 | |
| 270 | return $next_sync_time; |
| 271 | } |
| 272 | |
| 273 | /** |
| 274 | * Set the next sync time. |
| 275 | * |
| 276 | * @access public |
| 277 | * |
| 278 | * @param int $time Timestamp of the next sync. |
| 279 | * @param string $queue_name Name of the queue. |
| 280 | * @return boolean True if update was successful, false otherwise. |
| 281 | */ |
| 282 | public function set_next_sync_time( $time, $queue_name ) { |
| 283 | return update_option( self::NEXT_SYNC_TIME_OPTION_NAME . '_' . $queue_name, $time, true ); |
| 284 | } |
| 285 | |
| 286 | /** |
| 287 | * Trigger a full sync. |
| 288 | * |
| 289 | * @access public |
| 290 | * |
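| 291 | * @return boolean|WP_Error|null Result of the sync sending; false if no full sync is in progress, error object on failure, null if the full sync module is missing or full sync sending is disabled. |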
| 292 | */ |
| 293 | public function do_full_sync() { |
| 294 | $sync_module = Modules::get_module( 'full-sync' ); |
| 295 | '@phan-var Modules\Full_Sync_Immediately|Modules\Full_Sync $sync_module'; |
| 296 | if ( ! $sync_module ) { |
| 297 | return; |
| 298 | } |
| 299 | // Full Sync Disabled. |
| 300 | if ( ! Settings::get_setting( 'full_sync_sender_enabled' ) ) { |
| 301 | return; |
| 302 | } |
| 303 | |
| 304 | // Don't sync if request is marked as read only. |
| 305 | if ( Constants::is_true( 'JETPACK_SYNC_READ_ONLY' ) ) { |
| 306 | return new WP_Error( 'jetpack_sync_read_only' ); |
| 307 | } |
| 308 | |
| 309 | // Sync not started or Sync finished. |
| 310 | $status = $sync_module->get_status(); |
| 311 | if ( false === $status['started'] || ( ! empty( $status['started'] ) && ! empty( $status['finished'] ) ) ) { |
| 312 | return false; |
| 313 | } |
| 314 | |
| 315 | $this->continue_full_sync_enqueue(); |
| 316 | // immediate full sync sends data in continue_full_sync_enqueue. |
| 317 | if ( ! $sync_module instanceof Modules\Full_Sync_Immediately ) { |
| 318 | return $this->do_sync_and_set_delays( $this->full_sync_queue ); |
| 319 | } else { |
| 320 | $status = $sync_module->get_status(); |
| 321 | // Sync not started or Sync finished. |
| 322 | if ( false === $status['started'] || ( ! empty( $status['started'] ) && ! empty( $status['finished'] ) ) ) { |
| 323 | return false; |
| 324 | } else { |
| 325 | return true; |
| 326 | } |
| 327 | } |
| 328 | } |
| 329 | |
| 330 | /** |
| 331 | * Enqueue the next sync items for sending. |
| 332 | * Will not be done if the current request is a WP import one. |
| 333 | * Will be delayed until the next sync time comes. |
| 334 | * |
| 335 | * @access private |
| 336 | */ |
| 337 | private function continue_full_sync_enqueue() { |
| 338 | if ( defined( 'WP_IMPORTING' ) && WP_IMPORTING ) { |
| 339 | return false; |
| 340 | } |
| 341 | |
| 342 | if ( $this->get_next_sync_time( 'full-sync-enqueue' ) > microtime( true ) ) { |
| 343 | return false; |
| 344 | } |
| 345 | |
| 346 | $full_sync_module = Modules::get_module( 'full-sync' ); |
| 347 | '@phan-var Modules\Full_Sync_Immediately|Modules\Full_Sync $full_sync_module'; |
| 348 | $full_sync_module->continue_enqueuing(); |
| 349 | |
| 350 | $this->set_next_sync_time( time() + $this->get_enqueue_wait_time(), 'full-sync-enqueue' ); |
| 351 | } |
| 352 | |
| 353 | /** |
| 354 | * Trigger incremental sync. |
| 355 | * |
| 356 | * @access public |
| 357 | * |
| 358 | * @return boolean|WP_Error True if this sync sending was successful, error object otherwise. |
| 359 | */ |
| 360 | public function do_sync() { |
| 361 | // Sync directly during cron. We are doing this because otherwise |
| 362 | // the dedicated sync flow would be spawning HTTP requests during cron shutdown, |
| 363 | // which can be unreliable and cause sync lag for time-sensitive events like updates. |
| 364 | if ( ! Settings::is_dedicated_sync_enabled() || Settings::is_doing_cron() ) { |
| 365 | $result = $this->do_sync_and_set_delays( $this->sync_queue ); |
| 366 | } else { |
| 367 | $result = Dedicated_Sender::spawn_sync( $this->sync_queue ); |
| 368 | } |
| 369 | |
| 370 | return $result; |
| 371 | } |
| 372 | |
| 373 | /** |
| 374 | * Trigger incremental sync and early exit on Dedicated Sync request. |
| 375 | * |
| 376 | * @access public |
| 377 | * |
| 378 | * @param bool $do_real_exit If we should exit at the end of the request. We should by default. |
| 379 | * In the context of running this in the REST API, we actually want to return an error. |
| 380 | * |
| 381 | * @return void|WP_Error |
| 382 | */ |
| 383 | public function do_dedicated_sync_and_exit( $do_real_exit = true ) { |
| 384 | nocache_headers(); |
| 385 | |
| 386 | if ( ! Settings::is_dedicated_sync_enabled() ) { |
| 387 | return new WP_Error( 'dedicated_sync_disabled', 'Dedicated Sync flow is disabled.' ); |
| 388 | } |
| 389 | |
| 390 | if ( ! Dedicated_Sender::is_dedicated_sync_request() ) { |
| 391 | return new WP_Error( 'non_dedicated_sync_request', 'Not a Dedicated Sync request.' ); |
| 392 | } |
| 393 | |
| 394 | /** |
| 395 | * Output an `OK` to show that Dedicated Sync is enabled and we can process events. |
| 396 | * This is used to test the feature is working. |
| 397 | * |
| 398 | * @see \Automattic\Jetpack\Sync\Dedicated_Sender::can_spawn_dedicated_sync_request |
| 399 | */ |
| 400 | // phpcs:ignore WordPress.Security.EscapeOutput.OutputNotEscaped -- This is just a constant string used for Validation. |
| 401 | echo Dedicated_Sender::DEDICATED_SYNC_VALIDATION_STRING; |
| 402 | |
| 403 | // Try to disconnect the request as quickly as possible and process things in the background. |
| 404 | $this->fastcgi_finish_request(); |
| 405 | |
| 406 | /** |
| 407 | * Close the PHP session to free up the server threads to handle other requests while we |
| 408 | * send sync data with Dedicated Sync. |
| 409 | * |
| 410 | * When we spawn Dedicated Sync, we send `$_COOKIES` with the request to help out with any |
| 411 | * firewall and/or caching functionality that might prevent us to ping the site directly. |
| 412 | * |
| 413 | * This will cause Dedicated Sync to reuse the visitor's PHP session and lock it until the |
| 414 | * request finishes, which can take anywhere from 1 to 30+ seconds, depending on the server |
| 415 | * `max_execution_time` configuration. |
| 416 | * |
| 417 | * By closing the session we're freeing up the session, so other requests can acquire the |
| 418 | * lock and proceed with their own tasks. |
| 419 | */ |
| 420 | if ( session_status() === PHP_SESSION_ACTIVE ) { |
| 421 | session_write_close(); |
| 422 | } |
| 423 | |
| 424 | // Actually try to send Sync events. |
| 425 | $result = $this->do_sync_and_set_delays( $this->sync_queue ); |
| 426 | |
| 427 | // Output not used right now. Try to release dedicated sync lock |
| 428 | Dedicated_Sender::try_release_lock_spawn_request(); |
| 429 | |
| 430 | // If no errors occurred, re-spawn a dedicated Sync request. |
| 431 | if ( true === $result ) { |
| 432 | Dedicated_Sender::spawn_sync( $this->sync_queue ); |
| 433 | } |
| 434 | |
| 435 | if ( $do_real_exit ) { |
| 436 | exit( 0 ); |
| 437 | } |
| 438 | } |
| 439 | |
| 440 | /** |
| 441 | * Trigger sync for a certain sync queue. |
| 442 | * Responsible for setting next sync time. |
| 443 | * Will not be done if the current request is a WP import one. |
| 444 | * Will be delayed until the next sync time comes. |
| 445 | * |
| 446 | * @access public |
| 447 | * |
| 448 | * @param \Automattic\Jetpack\Sync\Queue $queue Queue object. |
| 449 | * |
| 450 | * @return boolean|WP_Error True if this sync sending was successful, error object otherwise. |
| 451 | */ |
| 452 | public function do_sync_and_set_delays( $queue ) { |
| 453 | // Don't sync if importing. |
| 454 | if ( defined( 'WP_IMPORTING' ) && WP_IMPORTING ) { |
| 455 | return new WP_Error( 'is_importing' ); |
| 456 | } |
| 457 | |
| 458 | // Don't sync if request is marked as read only. |
| 459 | if ( Constants::is_true( 'JETPACK_SYNC_READ_ONLY' ) ) { |
| 460 | return new WP_Error( 'jetpack_sync_read_only' ); |
| 461 | } |
| 462 | |
| 463 | if ( ! Settings::is_sender_enabled( $queue->id ) ) { |
| 464 | return new WP_Error( 'sender_disabled_for_queue_' . $queue->id ); |
| 465 | } |
| 466 | |
| 467 | if ( get_transient( self::TEMP_SYNC_DISABLE_TRANSIENT_NAME ) ) { |
| 468 | return new WP_Error( 'sender_temporarily_disabled_while_pulling' ); |
| 469 | } |
| 470 | |
| 471 | // Return early if we've gotten a retry-after header response. |
| 472 | $retry_time = get_option( Actions::RETRY_AFTER_PREFIX . $queue->id ); |
| 473 | if ( $retry_time ) { |
| 474 | // If expired, update to false but don't send. Send will occur in a new request to avoid race conditions. |
| 475 | if ( microtime( true ) > $retry_time ) { |
| 476 | update_option( Actions::RETRY_AFTER_PREFIX . $queue->id, false, false ); |
| 477 | } |
| 478 | return new WP_Error( 'retry_after' ); |
| 479 | } |
| 480 | |
| 481 | // Don't sync if we are throttled. |
| 482 | if ( $this->get_next_sync_time( $queue->id ) > microtime( true ) ) { |
| 483 | return new WP_Error( 'sync_throttled' ); |
| 484 | } |
| 485 | |
| 486 | $start_time = microtime( true ); |
| 487 | |
| 488 | Settings::set_is_syncing( true ); |
| 489 | |
| 490 | $sync_result = $this->do_sync_for_queue( $queue ); |
| 491 | |
| 492 | Settings::set_is_syncing( false ); |
| 493 | |
| 494 | $exceeded_sync_wait_threshold = ( microtime( true ) - $start_time ) > (float) $this->get_sync_wait_threshold(); |
| 495 | |
| 496 | if ( is_wp_error( $sync_result ) ) { |
| 497 | if ( 'unclosed_buffer' === $sync_result->get_error_code() ) { |
| 498 | $this->set_next_sync_time( time() + self::QUEUE_LOCKED_SYNC_DELAY, $queue->id ); |
| 499 | } |
| 500 | if ( 'wpcom_error' === $sync_result->get_error_code() ) { |
| 501 | $this->set_next_sync_time( time() + self::WPCOM_ERROR_SYNC_DELAY, $queue->id ); |
| 502 | } |
| 503 | } elseif ( $exceeded_sync_wait_threshold && ! Settings::is_doing_cron() ) { |
| 504 | // If a send was slow, briefly pause before the next one. |
| 505 | // Applies only to Dedicated/Normal Sync to avoid impacting user traffic; |
| 506 | // cron jobs are exempt. |
| 507 | $this->set_next_sync_time( time() + $this->get_sync_wait_time(), $queue->id ); |
| 508 | } |
| 509 | |
| 510 | return $sync_result; |
| 511 | } |
| 512 | |
| 513 | /** |
| 514 | * Retrieve the next sync items to send. |
| 515 | * |
| 516 | * @access public |
| 517 | * |
| 518 | * @param (array|\Automattic\Jetpack\Sync\Queue_Buffer) $buffer_or_items Queue buffer or array of objects. |
| 519 | * @param boolean $encode Whether to encode the items. |
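| 520 | * @return array Array of: the items to send, the IDs of skipped items, all items that were considered, and the preprocessing duration in seconds. |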
| 521 | */ |
| 522 | public function get_items_to_send( $buffer_or_items, $encode = true ) { |
| 523 | // Track how long we've been processing so we can avoid request timeouts. |
| 524 | $start_time = microtime( true ); |
| 525 | $upload_size = 0; |
| 526 | $items_to_send = array(); |
| 527 | $items = is_array( $buffer_or_items ) ? $buffer_or_items : $buffer_or_items->get_items(); |
| 528 | if ( ! is_array( $items ) ) { |
| 529 | $items = array(); |
| 530 | } |
| 531 | |
| 532 | // Set up current screen to avoid errors rendering content. |
| 533 | require_once ABSPATH . 'wp-admin/includes/class-wp-screen.php'; |
| 534 | require_once ABSPATH . 'wp-admin/includes/screen.php'; |
| 535 | set_current_screen( 'sync' ); |
| 536 | $skipped_items_ids = array(); |
| 537 | /** |
| 538 | * We estimate the total encoded size as we go by encoding each item individually. |
| 539 | * This is expensive, but the only way to really know :/ |
| 540 | */ |
| 541 | foreach ( $items as $key => $item ) { |
| 542 | if ( ! is_array( $item ) ) { |
| 543 | $skipped_items_ids[] = $key; |
| 544 | continue; |
| 545 | } |
| 546 | |
| 547 | // Suspending cache additions helps prevent overloading the in-memory cache on large sites. |
| 548 | wp_suspend_cache_addition( true ); |
| 549 | /** |
| 550 | * Modify the data within an action before it is serialized and sent to the server |
| 551 | * For example, during full sync this expands Post ID's into full Post objects, |
| 552 | * so that we don't have to serialize the whole object into the queue. |
| 553 | * |
| 554 | * @since 1.6.3 |
| 555 | * @since-jetpack 4.2.0 |
| 556 | * |
| 557 | * @param array The action parameters |
| 558 | * @param int The ID of the user who triggered the action |
| 559 | */ |
| 560 | $item[1] = apply_filters( 'jetpack_sync_before_send_' . $item[0], $item[1], $item[2] ); |
| 561 | wp_suspend_cache_addition( false ); |
| 562 | // Serialization usage can lead to an empty, null or false action_name. Let's skip those items, as there is no information to send. |
| 563 | if ( empty( $item[0] ) || false === $item[1] ) { |
| 564 | $skipped_items_ids[] = $key; |
| 565 | continue; |
| 566 | } |
| 567 | $encoded_item = $this->codec->encode( $item ); |
| 568 | $upload_size += strlen( $encoded_item ); |
| 569 | if ( $upload_size > $this->upload_max_bytes && array() !== $items_to_send ) { |
| 570 | break; |
| 571 | } |
| 572 | $items_to_send[ $key ] = $encode ? $encoded_item : $item; |
| 573 | if ( microtime( true ) - $start_time > $this->max_dequeue_time ) { |
| 574 | break; |
| 575 | } |
| 576 | } |
| 577 | |
| 578 | return array( $items_to_send, $skipped_items_ids, $items, microtime( true ) - $start_time ); |
| 579 | } |
| 580 | |
| 581 | /** |
| 582 | * If supported, flush all response data to the client and finish the request. |
| 583 | * This allows for time consuming tasks to be performed without leaving the connection open. |
| 584 | * |
| 585 | * @access private |
| 586 | */ |
| 587 | private function fastcgi_finish_request() { |
| 588 | if ( function_exists( 'fastcgi_finish_request' ) ) { |
| 589 | fastcgi_finish_request(); |
| 590 | } |
| 591 | } |
| 592 | |
| 593 | /** |
| 594 | * Perform sync for a certain sync queue. |
| 595 | * |
| 596 | * @access public |
| 597 | * |
| 598 | * @param \Automattic\Jetpack\Sync\Queue $queue Queue object. |
| 599 | * |
| 600 | * @return boolean|WP_Error True if this sync sending was successful, error object otherwise. |
| 601 | */ |
| 602 | public function do_sync_for_queue( $queue ) { |
| 603 | do_action( 'jetpack_sync_before_send_queue_' . $queue->id ); |
| 604 | if ( $queue->size() === 0 ) { |
| 605 | return new WP_Error( 'empty_queue_' . $queue->id ); |
| 606 | } |
| 607 | |
| 608 | /** |
| 609 | * Now that we're sure we are about to sync, try to ignore user abort |
| 610 | * so we can avoid getting into a bad state. |
| 611 | */ |
| 612 | // https://plugins.trac.wordpress.org/ticket/2041 |
| 613 | if ( function_exists( 'ignore_user_abort' ) ) { |
| 614 | ignore_user_abort( true ); |
| 615 | } |
| 616 | |
| 617 | /* Don't make the request block till we finish, if possible. */ |
| 618 | if ( Constants::is_true( 'REST_REQUEST' ) || Constants::is_true( 'XMLRPC_REQUEST' ) ) { |
| 619 | $this->fastcgi_finish_request(); |
| 620 | } |
| 621 | |
| 622 | $checkout_start_time = microtime( true ); |
| 623 | |
| 624 | $buffer = $queue->checkout_with_memory_limit( $this->dequeue_max_bytes, $this->upload_max_rows ); |
| 625 | |
| 626 | if ( ! $buffer ) { |
| 627 | // Buffer has no items. |
| 628 | return new WP_Error( 'empty_buffer' ); |
| 629 | } |
| 630 | |
| 631 | if ( is_wp_error( $buffer ) ) { |
| 632 | return $buffer; |
| 633 | } |
| 634 | |
| 635 | $checkout_duration = microtime( true ) - $checkout_start_time; |
| 636 | |
| 637 | list( $items_to_send, $skipped_items_ids, $items, $preprocess_duration ) = $this->get_items_to_send( $buffer, true ); |
| 638 | if ( ! empty( $items_to_send ) ) { |
| 639 | /** |
| 640 | * Fires when data is ready to send to the server. |
| 641 | * Return false or WP_Error to abort the sync (e.g. if there's an error) |
| 642 | * The items will be automatically re-sent later |
| 643 | * |
| 644 | * @since 1.6.3 |
| 645 | * @since-jetpack 4.2.0 |
| 646 | * |
| 647 | * @param array $data The action buffer |
| 648 | * @param string $codec The codec name used to encode the data |
| 649 | * @param double $time The current time |
| 650 | * @param string $queue The queue used to send ('sync' or 'full_sync') |
| 651 | * @param float $checkout_duration The duration of the checkout operation. |
| 652 | * @param float $preprocess_duration The duration of the pre-process operation. |
| 653 | * @param int $queue_size The size of the sync queue at the time of processing. |
| 654 | */ |
| 655 | Settings::set_is_sending( true ); |
| 656 | $processed_item_ids = apply_filters( 'jetpack_sync_send_data', $items_to_send, $this->codec->name(), microtime( true ), $queue->id, $checkout_duration, $preprocess_duration, $queue->size(), $buffer->id ); |
| 657 | Settings::set_is_sending( false ); |
| 658 | } else { |
| 659 | $processed_item_ids = $skipped_items_ids; |
| 660 | $skipped_items_ids = array(); |
| 661 | } |
| 662 | |
| 663 | if ( 'non-blocking' !== $processed_item_ids ) { |
| 664 | if ( ! $processed_item_ids || is_wp_error( $processed_item_ids ) ) { |
| 665 | $checked_in_item_ids = $queue->checkin( $buffer ); |
| 666 | if ( is_wp_error( $checked_in_item_ids ) ) { |
| 667 | // phpcs:ignore WordPress.PHP.DevelopmentFunctions.error_log_error_log |
| 668 | error_log( 'Error checking in buffer: ' . $checked_in_item_ids->get_error_message() ); |
| 669 | $queue->force_checkin(); |
| 670 | } |
| 671 | if ( is_wp_error( $processed_item_ids ) ) { |
| 672 | return new WP_Error( 'wpcom_error', $processed_item_ids->get_error_code() ); |
| 673 | } |
| 674 | |
| 675 | // Returning a wpcom_error is a sign to the caller that we should wait a while before syncing again. |
| 676 | return new WP_Error( 'wpcom_error', 'jetpack_sync_send_data_false' ); |
| 677 | } else { |
| 678 | // Detect if the last item ID was an error. |
| 679 | $had_wp_error = is_wp_error( end( $processed_item_ids ) ); |
| 680 | $wp_error = $had_wp_error ? array_pop( $processed_item_ids ) : null; |
| 681 | // Also checkin any items that were skipped. |
| 682 | if ( array() !== $skipped_items_ids ) { |
| 683 | $processed_item_ids = array_merge( $processed_item_ids, $skipped_items_ids ); |
| 684 | } |
| 685 | $processed_items = array_intersect_key( $items, array_flip( $processed_item_ids ) ); |
| 686 | /** |
| 687 | * Allows us to keep track of all the actions that have been sent. |
| 688 | * Allows us to calculate the progress of specific actions. |
| 689 | * |
| 690 | * @since 1.6.3 |
| 691 | * @since-jetpack 4.2.0 |
| 692 | * |
| 693 | * @param array $processed_actions The actions that we send successfully. |
| 694 | */ |
| 695 | do_action( 'jetpack_sync_processed_actions', $processed_items ); |
| 696 | $queue->close( $buffer, $processed_item_ids ); |
| 697 | // Returning a WP_Error is a sign to the caller that we should wait a while before syncing again. |
| 698 | if ( $had_wp_error ) { |
| 699 | return new WP_Error( 'wpcom_error', $wp_error->get_error_code() ); |
| 700 | } |
| 701 | } |
| 702 | } |
| 703 | |
| 704 | return true; |
| 705 | } |
| 706 | |
| 707 | /** |
| 708 | * Immediately sends a single item without firing or enqueuing it |
| 709 | * |
| 710 | * @param string $action_name The action. |
| 711 | * @param array $data The data associated with the action. |
| 712 | * @param string $key The key to use for the action. |
| 713 | * |
| 714 | * @return array Items processed. TODO: this doesn't make much sense anymore, it should probably be just a bool. |
| 715 | */ |
| 716 | public function send_action( $action_name, $data = null, $key = null ) { |
| 717 | if ( ! Settings::is_sender_enabled( 'full_sync' ) ) { |
| 718 | return array(); |
| 719 | } |
| 720 | |
| 721 | // Compose the data to be sent. |
| 722 | $action_to_send = $this->create_action_to_send( $action_name, $data, $key ); |
| 723 | |
| 724 | list( $items_to_send, $skipped_items_ids, $items, $preprocess_duration ) = $this->get_items_to_send( $action_to_send, true ); // phpcs:ignore VariableAnalysis.CodeAnalysis.VariableAnalysis.UnusedVariable |
| 725 | Settings::set_is_sending( true ); |
| 726 | $processed_item_ids = apply_filters( 'jetpack_sync_send_data', $items_to_send, $this->get_codec()->name(), microtime( true ), 'immediate-send', 0, $preprocess_duration ); |
| 727 | Settings::set_is_sending( false ); |
| 728 | |
| 729 | /** |
| 730 | * Allows us to keep track of all the actions that have been sent. |
| 731 | * Allows us to calculate the progress of specific actions. |
| 732 | * |
| 733 | * @param array $processed_actions The actions that we send successfully. |
| 734 | * |
| 735 | * @since 1.6.3 |
| 736 | * @since-jetpack 4.2.0 |
| 737 | */ |
| 738 | do_action( 'jetpack_sync_processed_actions', $action_to_send ); |
| 739 | |
| 740 | return $processed_item_ids; |
| 741 | } |
| 742 | |
| 743 | /** |
| 744 | * Create a synthetic action for direct sending to WPCOM during full sync (for example) |
| 745 | * |
| 746 | * @access private |
| 747 | * |
| 748 | * @param string $action_name The action. |
| 749 | * @param array $data The data associated with the action. |
| 750 | * @param string $key The key to use for the action. |
| 751 | * @return array An array with a single synthetic sync action, keyed by $key or, when no key is given, by the current microtime(true). |
| 752 | */ |
private function create_action_to_send( $action_name, $data, $key = null ) {
	// Use the caller-supplied key when there is one; otherwise fall back to
	// the current timestamp rendered as a string.
	$action_key = $key ?? (string) microtime( true );

	// Positional payload: action name, action data, acting user ID,
	// timestamp, and the current "importing" flag.
	$action_payload = array(
		$action_name,
		$data,
		get_current_user_id(),
		microtime( true ),
		Settings::is_importing(),
	);

	return array( $action_key => $action_payload );
}
| 764 | |
| 765 | /** |
| 766 | * Returns any object that is able to be synced. |
| 767 | * |
| 768 | * @access public |
| 769 | * |
| 770 | * @param array $args the synchronized object parameters. |
| 771 | * @return string Encoded sync object. |
| 772 | */ |
public function sync_object( $args ) {
	$codec = $this->get_codec();

	// The request is expected to be a positional triple, for example: posts, post, 5.
	// Reject anything else up front so a short or malformed argument list does not
	// raise undefined-offset notices during destructuring.
	if ( ! is_array( $args ) || ! isset( $args[0], $args[1], $args[2] ) ) {
		// NOTE(review): `false` is used as the "no object" payload — confirm the
		// receiving end treats it the same way as a failed lookup.
		return $codec->encode( false );
	}

	list( $module_name, $object_type, $id ) = $args;

	$sync_module = Modules::get_module( $module_name );

	// An unknown module name must not end in a fatal "call to a member function
	// on a non-object"; answer with the same "no object" payload instead.
	if ( ! is_object( $sync_module ) ) {
		return $codec->encode( false );
	}

	return $codec->encode( $sync_module->get_object_by_id( $object_type, $id ) );
}
| 782 | |
| 783 | /** |
| 784 | * Register additional sync XML-RPC methods available to Jetpack for authenticated users. |
| 785 | * |
| 786 | * @access public |
| 787 | * @since 1.6.3 |
| 788 | * @since-jetpack 7.8.0 |
| 789 | * |
| 790 | * @param array $jetpack_methods XML-RPC methods available to the Jetpack Server. |
| 791 | * @return array Filtered XML-RPC methods. |
| 792 | */ |
public function register_jetpack_xmlrpc_methods( $jetpack_methods ) {
	// Expose sync_object() on this instance under the jetpack.syncObject method name.
	$sync_object_callback = array( $this, 'sync_object' );

	$jetpack_methods['jetpack.syncObject'] = $sync_object_callback;

	return $jetpack_methods;
}
| 797 | |
| 798 | /** |
| 799 | * Get the incremental sync queue object. |
| 800 | * |
| 801 | * @access public |
| 802 | * |
| 803 | * @return \Automattic\Jetpack\Sync\Queue Queue object. |
| 804 | */ |
public function get_sync_queue() {
	// Hand back whatever is currently stored in the sync_queue property.
	$incremental_queue = $this->sync_queue;

	return $incremental_queue;
}
| 808 | |
| 809 | /** |
| 810 | * Get the full sync queue object. |
| 811 | * |
| 812 | * @access public |
| 813 | * |
| 814 | * @return \Automattic\Jetpack\Sync\Queue Queue object. |
| 815 | */ |
public function get_full_sync_queue() {
	// Hand back whatever is currently stored in the full_sync_queue property.
	$full_queue = $this->full_sync_queue;

	return $full_queue;
}
| 819 | |
| 820 | /** |
| 821 | * Get the codec object. |
| 822 | * |
| 823 | * @access public |
| 824 | * |
| 825 | * @return \Automattic\Jetpack\Sync\Codec_Interface Codec object. |
| 826 | */ |
public function get_codec() {
	// The codec property is assigned by set_codec().
	$active_codec = $this->codec;

	return $active_codec;
}
| 830 | |
| 831 | /** |
| 832 | * Determine the codec object. |
| 833 | * Use gzip deflate if supported. |
| 834 | * |
| 835 | * @access public |
| 836 | */ |
public function set_codec() {
	// Only pick the deflate codec when compression AND decompression are both
	// callable: individual functions can be disabled independently of the zlib
	// extension (e.g. through the disable_functions ini directive), so probing
	// gzinflate() alone does not prove that gzdeflate() is usable.
	// NOTE(review): presumably JSON_Deflate_Array_Codec compresses with
	// gzdeflate() when encoding — verify against the codec class.
	$deflate_supported = function_exists( 'gzdeflate' ) && function_exists( 'gzinflate' );

	if ( $deflate_supported ) {
		$this->codec = new JSON_Deflate_Array_Codec();
	} else {
		// Fallback codec for hosts without usable deflate support.
		$this->codec = new Simple_Codec();
	}
}
| 844 | |
| 845 | /** |
| 846 | * Compute and send all the checksums. |
| 847 | * |
| 848 | * @access public |
| 849 | */ |
public function send_checksum() {
	// Build a fresh Replicastore, compute every checksum, and pass the result
	// to listeners of the jetpack_sync_checksum action.
	do_action( 'jetpack_sync_checksum', ( new Replicastore() )->checksum_all() );
}
| 854 | |
| 855 | /** |
| 856 | * Reset the incremental sync queue. |
| 857 | * |
| 858 | * @access public |
| 859 | */ |
public function reset_sync_queue() {
	// Delegate to the reset() method of the incremental sync queue.
	$incremental_queue = $this->sync_queue;
	$incremental_queue->reset();
}
| 863 | |
| 864 | /** |
| 865 | * Reset the full sync queue. |
| 866 | * |
| 867 | * @access public |
| 868 | */ |
public function reset_full_sync_queue() {
	// Delegate to the reset() method of the full sync queue.
	$full_queue = $this->full_sync_queue;
	$full_queue->reset();
}
| 872 | |
| 873 | /** |
| 874 | * Set the maximum bytes to checkout without exceeding the memory limit. |
| 875 | * |
| 876 | * @access public |
| 877 | * |
| 878 | * @param int $size Maximum bytes to checkout. |
| 879 | */ |
public function set_dequeue_max_bytes( $size ) {
	// Stored as given; no validation or casting is applied here.
	$this->dequeue_max_bytes = $size;
}
| 883 | |
| 884 | /** |
| 885 | * Set the maximum bytes in a single encoded item. |
| 886 | * |
| 887 | * @access public |
| 888 | * |
| 889 | * @param int $max_bytes Maximum bytes in a single encoded item. |
| 890 | */ |
public function set_upload_max_bytes( $max_bytes ) {
	// Stored as given; no validation or casting is applied here.
	$this->upload_max_bytes = $max_bytes;
}
| 894 | |
| 895 | /** |
| 896 | * Set the maximum number of sync items in a single action. |
| 897 | * |
| 898 | * @access public |
| 899 | * |
| 900 | * @param int $max_rows Maximum number of sync items. |
| 901 | */ |
public function set_upload_max_rows( $max_rows ) {
	// Stored as given; no validation or casting is applied here.
	$this->upload_max_rows = $max_rows;
}
| 905 | |
| 906 | /** |
| 907 | * Set the sync wait time (in seconds). |
| 908 | * |
| 909 | * @access public |
| 910 | * |
| 911 | * @param int $seconds Sync wait time. |
| 912 | */ |
public function set_sync_wait_time( $seconds ) {
	// Stored as given; no validation or casting is applied here.
	$this->sync_wait_time = $seconds;
}
| 916 | |
| 917 | /** |
| 918 | * Get current sync wait time (in seconds). |
| 919 | * |
| 920 | * @access public |
| 921 | * |
| 922 | * @return int Sync wait time. |
| 923 | */ |
public function get_sync_wait_time() {
	// Read back the value last written by set_sync_wait_time().
	$wait_time = $this->sync_wait_time;

	return $wait_time;
}
| 927 | |
| 928 | /** |
| 929 | * Set the enqueue wait time (in seconds). |
| 930 | * |
| 931 | * @access public |
| 932 | * |
| 933 | * @param int $seconds Enqueue wait time. |
| 934 | */ |
public function set_enqueue_wait_time( $seconds ) {
	// Stored as given; no validation or casting is applied here.
	$this->enqueue_wait_time = $seconds;
}
| 938 | |
| 939 | /** |
| 940 | * Get current enqueue wait time (in seconds). |
| 941 | * |
| 942 | * @access public |
| 943 | * |
| 944 | * @return int Enqueue wait time. |
| 945 | */ |
public function get_enqueue_wait_time() {
	// Read back the value last written by set_enqueue_wait_time().
	$wait_time = $this->enqueue_wait_time;

	return $wait_time;
}
| 949 | |
| 950 | /** |
| 951 | * Set the sync wait threshold (in seconds). |
| 952 | * |
| 953 | * @access public |
| 954 | * |
| 955 | * @param int $seconds Sync wait threshold. |
| 956 | */ |
public function set_sync_wait_threshold( $seconds ) {
	// Stored as given; no validation or casting is applied here.
	$this->sync_wait_threshold = $seconds;
}
| 960 | |
| 961 | /** |
| 962 | * Get current sync wait threshold (in seconds). |
| 963 | * |
| 964 | * @access public |
| 965 | * |
| 966 | * @return int Sync wait threshold. |
| 967 | */ |
public function get_sync_wait_threshold() {
	// Read back the value last written by set_sync_wait_threshold().
	$threshold = $this->sync_wait_threshold;

	return $threshold;
}
| 971 | |
| 972 | /** |
| 973 | * Set the maximum time for performing a checkout of items from the queue (in seconds). |
| 974 | * |
| 975 | * @access public |
| 976 | * |
| 977 | * @param int $seconds Maximum dequeue time. |
| 978 | */ |
public function set_max_dequeue_time( $seconds ) {
	// Stored as given; no validation or casting is applied here.
	$this->max_dequeue_time = $seconds;
}
| 982 | |
| 983 | /** |
| 984 | * Initialize the sync queues, codec and set the default settings. |
| 985 | * |
| 986 | * @access public |
| 987 | */ |
public function set_defaults() {
	// Fresh queue objects for incremental and full sync, then pick a codec.
	$this->sync_queue      = new Queue( 'sync' );
	$this->full_sync_queue = new Queue( 'full_sync' );
	$this->set_codec();

	// Clear the importing flag before reading the saved settings.
	Settings::set_importing( null );
	$saved_settings = Settings::get_settings();

	// Size limits.
	$this->set_dequeue_max_bytes( $saved_settings['dequeue_max_bytes'] );
	$this->set_upload_max_bytes( $saved_settings['upload_max_bytes'] );
	$this->set_upload_max_rows( $saved_settings['upload_max_rows'] );

	// Timing.
	$this->set_sync_wait_time( $saved_settings['sync_wait_time'] );
	$this->set_enqueue_wait_time( $saved_settings['enqueue_wait_time'] );
	$this->set_sync_wait_threshold( $saved_settings['sync_wait_threshold'] );

	// The dequeue time limit comes from Defaults rather than from the saved settings.
	$this->set_max_dequeue_time( Defaults::get_max_sync_execution_time() );
}
| 1004 | |
| 1005 | /** |
| 1006 | * Reset sync queues, modules and settings. |
| 1007 | * |
| 1008 | * @access public |
| 1009 | */ |
public function reset_data() {
	// Empty both queues first.
	$this->reset_sync_queue();
	$this->reset_full_sync_queue();

	// Let every registered sync module reset its own data.
	$sync_modules = Modules::get_modules();
	foreach ( $sync_modules as $sync_module ) {
		$sync_module->reset_data();
	}

	// Reset Sync locks without unlocking queues since we already reset those.
	Actions::reset_sync_locks( false );

	// Finally, reset the stored sync settings.
	Settings::reset_data();
}
| 1022 | |
| 1023 | /** |
| 1024 | * Perform cleanup in the event of plugin uninstallation. |
| 1025 | * |
| 1026 | * @access public |
| 1027 | */ |
public function uninstall() {
	// Let's delete all the other stored data first: queues, module data,
	// sync locks and settings are all handled by reset_data().
	$this->reset_data();

	// Delete the full sync status.
	delete_option( 'jetpack_full_sync_status' );

	// Clear the sync cron events, incremental first, then full sync.
	$cron_hooks = array( 'jetpack_sync_cron', 'jetpack_sync_full_cron' );
	foreach ( $cron_hooks as $cron_hook ) {
		wp_clear_scheduled_hook( $cron_hook );
	}
}
| 1039 | } |