Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
5.13% covered (danger)
5.13%
12 / 234
5.13% covered (danger)
5.13%
2 / 39
CRAP
0.00% covered (danger)
0.00%
0 / 1
Sender
5.13% covered (danger)
5.13%
12 / 234
5.13% covered (danger)
5.13%
2 / 39
10067.99
0.00% covered (danger)
0.00%
0 / 1
 get_instance
66.67% covered (warning)
66.67%
2 / 3
0.00% covered (danger)
0.00%
0 / 1
2.15
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 init
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 maybe_set_user_from_token
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 maybe_clear_user_from_token
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 get_next_sync_time
71.43% covered (warning)
71.43%
5 / 7
0.00% covered (danger)
0.00%
0 / 1
2.09
 set_next_sync_time
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 do_full_sync
0.00% covered (danger)
0.00%
0 / 17
0.00% covered (danger)
0.00%
0 / 1
132
 continue_full_sync_enqueue
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
20
 do_sync
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 do_dedicated_sync_and_exit
0.00% covered (danger)
0.00%
0 / 15
0.00% covered (danger)
0.00%
0 / 1
42
 do_sync_and_set_delays
0.00% covered (danger)
0.00%
0 / 28
0.00% covered (danger)
0.00%
0 / 1
210
 get_items_to_send
0.00% covered (danger)
0.00%
0 / 28
0.00% covered (danger)
0.00%
0 / 1
132
 fastcgi_finish_request
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 do_sync_for_queue
7.50% covered (danger)
7.50%
3 / 40
0.00% covered (danger)
0.00%
0 / 1
218.61
 send_action
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
6
 create_action_to_send
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
2
 sync_object
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 register_jetpack_xmlrpc_methods
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 get_sync_queue
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 get_full_sync_queue
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 get_codec
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 set_codec
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 send_checksum
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 reset_sync_queue
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 reset_full_sync_queue
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 set_dequeue_max_bytes
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 set_upload_max_bytes
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 set_upload_max_rows
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 set_sync_wait_time
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 get_sync_wait_time
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 set_enqueue_wait_time
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 get_enqueue_wait_time
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 set_sync_wait_threshold
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 get_sync_wait_threshold
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 set_max_dequeue_time
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 set_defaults
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
2
 reset_data
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 uninstall
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2/**
3 * Sync sender.
4 *
5 * @package automattic/jetpack-sync
6 */
7
8namespace Automattic\Jetpack\Sync;
9
10use Automattic\Jetpack\Connection\Manager;
11use Automattic\Jetpack\Constants;
12use WP_Error;
13
14/**
15 * This class grabs pending actions from the queue and sends them
16 */
17class Sender {
18    /**
19     * Name of the option that stores the time of the next sync.
20     *
21     * @access public
22     *
23     * @var string
24     */
25    const NEXT_SYNC_TIME_OPTION_NAME = 'jetpack_next_sync_time';
26
27    /**
28     * Name of the transient responsible for temprorarily disabling Sync sending during Pulls.
29     *
30     * @access public
31     *
32     * @var string
33     */
34    const TEMP_SYNC_DISABLE_TRANSIENT_NAME = 'jetpack_disable_sync_sending';
35
36    /**
37     * Expiry of the transient responsible for temprorarily disabling Sync sending during Pulls.
38     *
39     * @access public
40     *
41     * @var int
42     */
43    const TEMP_SYNC_DISABLE_TRANSIENT_EXPIRY = MINUTE_IN_SECONDS;
44
45    /**
46     * Sync timeout after a WPCOM error.
47     *
48     * @access public
49     *
50     * @var int
51     */
52    const WPCOM_ERROR_SYNC_DELAY = 60;
53
54    /**
55     * Sync timeout after a queue has been locked.
56     *
57     * @access public
58     *
59     * @var int
60     */
61    const QUEUE_LOCKED_SYNC_DELAY = 10;
62
63    /**
64     * Maximum bytes to checkout without exceeding the memory limit.
65     *
66     * @access private
67     *
68     * @var int
69     */
70    private $dequeue_max_bytes;
71
72    /**
73     * Maximum bytes in a single encoded item.
74     *
75     * @access private
76     *
77     * @var int
78     */
79    private $upload_max_bytes;
80
81    /**
82     * Maximum number of sync items in a single action.
83     *
84     * @access private
85     *
86     * @var int
87     */
88    private $upload_max_rows;
89
90    /**
91     * Maximum time for perfirming a checkout of items from the queue (in seconds).
92     *
93     * @access private
94     *
95     * @var int
96     */
97    private $max_dequeue_time;
98
99    /**
100     * How many seconds to wait after sending sync items after exceeding the sync wait threshold (in seconds).
101     *
102     * @access private
103     *
104     * @var int
105     */
106    private $sync_wait_time;
107
108    /**
109     * How much maximum time to wait for the checkout to finish (in seconds).
110     *
111     * @access private
112     *
113     * @var int
114     */
115    private $sync_wait_threshold;
116
117    /**
118     * How much maximum time to wait for the sync items to be queued for sending (in seconds).
119     *
120     * @access private
121     *
122     * @var int
123     */
124    private $enqueue_wait_time;
125
126    /**
127     * Incremental sync queue object.
128     *
129     * @access private
130     *
131     * @var \Automattic\Jetpack\Sync\Queue
132     */
133    private $sync_queue;
134
135    /**
136     * Full sync queue object.
137     *
138     * @access private
139     *
140     * @var \Automattic\Jetpack\Sync\Queue
141     */
142    private $full_sync_queue;
143
144    /**
145     * Codec object for encoding and decoding sync items.
146     *
147     * @access private
148     *
149     * @var \Automattic\Jetpack\Sync\Codec_Interface
150     */
151    private $codec;
152
153    /**
154     * The current user before we change or clear it.
155     *
156     * @access private
157     *
158     * @var \WP_User
159     */
160    private $old_user;
161
162    /**
163     * Container for the singleton instance of this class.
164     *
165     * @access private
166     * @static
167     *
168     * @var \Automattic\Jetpack\Sync\Sender
169     */
170    private static $instance;
171
172    /**
173     * Retrieve the singleton instance of this class.
174     *
175     * @access public
176     * @static
177     *
178     * @return Sender
179     */
180    public static function get_instance() {
181        if ( null === self::$instance ) {
182            self::$instance = new self();
183        }
184
185        return self::$instance;
186    }
187
188    /**
189     * Constructor.
190     * This is necessary because you can't use "new" when you declare instance properties >:(
191     *
192     * @access protected
193     * @static
194     */
195    protected function __construct() {
196        $this->set_defaults();
197        $this->init();
198    }
199
200    /**
201     * Initialize the sender.
202     * Prepares the current user and initializes all sync modules.
203     *
204     * @access private
205     */
206    private function init() {
207        add_action( 'jetpack_sync_before_send_queue_sync', array( $this, 'maybe_set_user_from_token' ), 1 );
208        add_action( 'jetpack_sync_before_send_queue_sync', array( $this, 'maybe_clear_user_from_token' ), 20 );
209        add_filter( 'jetpack_xmlrpc_unauthenticated_methods', array( $this, 'register_jetpack_xmlrpc_methods' ) );
210        foreach ( Modules::get_modules() as $module ) {
211            $module->init_before_send();
212        }
213    }
214
215    /**
216     * Detect if this is a XMLRPC request with a valid signature.
217     * If so, changes the user to the new one.
218     *
219     * @access public
220     */
221    public function maybe_set_user_from_token() {
222        $connection    = new Manager();
223        $verified_user = $connection->verify_xml_rpc_signature();
224        if ( Constants::is_true( 'XMLRPC_REQUEST' ) &&
225            ! is_wp_error( $verified_user )
226            && $verified_user
227        ) {
228            $old_user       = wp_get_current_user();
229            $this->old_user = $old_user->ID ?? 0;
230            wp_set_current_user( $verified_user['user_id'] );
231        }
232    }
233
234    /**
235     * If we used to have a previous current user, revert back to it.
236     *
237     * @access public
238     */
239    public function maybe_clear_user_from_token() {
240        if ( isset( $this->old_user ) ) {
241            wp_set_current_user( $this->old_user );
242        }
243    }
244
245    /**
246     * Retrieve the next sync time.
247     *
248     * Update @since 1.43.2
249     * Sometimes when we process Sync requests in Jetpack, the server clock can be a
250     * bit in the future and this can lock Sync to not send stuff for a while.
251     * We are introducing an extra check, to make sure to limit the next_sync_time
252     * to be at most one hour in the future from the current time.
253     *
254     * @access public
255     *
256     * @param string $queue_name Name of the queue.
257     * @return float Timestamp of the next sync.
258     */
259    public function get_next_sync_time( $queue_name ) {
260        $option_name    = self::NEXT_SYNC_TIME_OPTION_NAME . '_' . $queue_name;
261        $next_sync_time = (float) get_option( $option_name, 0 );
262
263        $is_more_than_one_hour = ( $next_sync_time - microtime( true ) ) >= HOUR_IN_SECONDS;
264
265        if ( $is_more_than_one_hour ) {
266            delete_option( $option_name );
267            $next_sync_time = 0;
268        }
269
270        return $next_sync_time;
271    }
272
273    /**
274     * Set the next sync time.
275     *
276     * @access public
277     *
278     * @param int    $time       Timestamp of the next sync.
279     * @param string $queue_name Name of the queue.
280     * @return boolean True if update was successful, false otherwise.
281     */
282    public function set_next_sync_time( $time, $queue_name ) {
283        return update_option( self::NEXT_SYNC_TIME_OPTION_NAME . '_' . $queue_name, $time, true );
284    }
285
286    /**
287     * Trigger a full sync.
288     *
289     * @access public
290     *
291     * @return boolean|WP_Error True if this sync sending was successful, error object otherwise.
292     */
293    public function do_full_sync() {
294        $sync_module = Modules::get_module( 'full-sync' );
295        '@phan-var Modules\Full_Sync_Immediately|Modules\Full_Sync $sync_module';
296        if ( ! $sync_module ) {
297            return;
298        }
299        // Full Sync Disabled.
300        if ( ! Settings::get_setting( 'full_sync_sender_enabled' ) ) {
301            return;
302        }
303
304        // Don't sync if request is marked as read only.
305        if ( Constants::is_true( 'JETPACK_SYNC_READ_ONLY' ) ) {
306            return new WP_Error( 'jetpack_sync_read_only' );
307        }
308
309        // Sync not started or Sync finished.
310        $status = $sync_module->get_status();
311        if ( false === $status['started'] || ( ! empty( $status['started'] ) && ! empty( $status['finished'] ) ) ) {
312            return false;
313        }
314
315        $this->continue_full_sync_enqueue();
316        // immediate full sync sends data in continue_full_sync_enqueue.
317        if ( ! $sync_module instanceof Modules\Full_Sync_Immediately ) {
318            return $this->do_sync_and_set_delays( $this->full_sync_queue );
319        } else {
320            $status = $sync_module->get_status();
321            // Sync not started or Sync finished.
322            if ( false === $status['started'] || ( ! empty( $status['started'] ) && ! empty( $status['finished'] ) ) ) {
323                return false;
324            } else {
325                return true;
326            }
327        }
328    }
329
330    /**
331     * Enqueue the next sync items for sending.
332     * Will not be done if the current request is a WP import one.
333     * Will be delayed until the next sync time comes.
334     *
335     * @access private
336     */
337    private function continue_full_sync_enqueue() {
338        if ( defined( 'WP_IMPORTING' ) && WP_IMPORTING ) {
339            return false;
340        }
341
342        if ( $this->get_next_sync_time( 'full-sync-enqueue' ) > microtime( true ) ) {
343            return false;
344        }
345
346        $full_sync_module = Modules::get_module( 'full-sync' );
347        '@phan-var Modules\Full_Sync_Immediately|Modules\Full_Sync $full_sync_module';
348        $full_sync_module->continue_enqueuing();
349
350        $this->set_next_sync_time( time() + $this->get_enqueue_wait_time(), 'full-sync-enqueue' );
351    }
352
353    /**
354     * Trigger incremental sync.
355     *
356     * @access public
357     *
358     * @return boolean|WP_Error True if this sync sending was successful, error object otherwise.
359     */
360    public function do_sync() {
361        // Sync directly during cron. We are doing this because otherwise
362        // the dedicated sync flow would be spawning HTTP requests during cron shutdown,
363        // which can be unreliable and cause sync lag for time-sensitive events like updates.
364        if ( ! Settings::is_dedicated_sync_enabled() || Settings::is_doing_cron() ) {
365            $result = $this->do_sync_and_set_delays( $this->sync_queue );
366        } else {
367            $result = Dedicated_Sender::spawn_sync( $this->sync_queue );
368        }
369
370        return $result;
371    }
372
373    /**
374     * Trigger incremental sync and early exit on Dedicated Sync request.
375     *
376     * @access public
377     *
378     * @param bool $do_real_exit If we should exit at the end of the request. We should by default.
379     *                           In the context of running this in the REST API, we actually want to return an error.
380     *
381     * @return void|WP_Error
382     */
383    public function do_dedicated_sync_and_exit( $do_real_exit = true ) {
384        nocache_headers();
385
386        if ( ! Settings::is_dedicated_sync_enabled() ) {
387            return new WP_Error( 'dedicated_sync_disabled', 'Dedicated Sync flow is disabled.' );
388        }
389
390        if ( ! Dedicated_Sender::is_dedicated_sync_request() ) {
391            return new WP_Error( 'non_dedicated_sync_request', 'Not a Dedicated Sync request.' );
392        }
393
394        /**
395         * Output an `OK` to show that Dedicated Sync is enabled and we can process events.
396         * This is used to test the feature is working.
397         *
398         * @see \Automattic\Jetpack\Sync\Dedicated_Sender::can_spawn_dedicated_sync_request
399         */
400        // phpcs:ignore WordPress.Security.EscapeOutput.OutputNotEscaped -- This is just a constant string used for Validation.
401        echo Dedicated_Sender::DEDICATED_SYNC_VALIDATION_STRING;
402
403        // Try to disconnect the request as quickly as possible and process things in the background.
404        $this->fastcgi_finish_request();
405
406        /**
407         * Close the PHP session to free up the server threads to handle other requests while we
408         * send sync data with Dedicated Sync.
409         *
410         * When we spawn Dedicated Sync, we send `$_COOKIES` with the request to help out with any
411         * firewall and/or caching functionality that might prevent us to ping the site directly.
412         *
413         * This will cause Dedicated Sync to reuse the visitor's PHP session and lock it until the
414         * request finishes, which can take anywhere from 1 to 30+ seconds, depending on the server
415         * `max_execution_time` configuration.
416         *
417         * By closing the session we're freeing up the session, so other requests can acquire the
418         * lock and proceed with their own tasks.
419         */
420        if ( session_status() === PHP_SESSION_ACTIVE ) {
421            session_write_close();
422        }
423
424        // Actually try to send Sync events.
425        $result = $this->do_sync_and_set_delays( $this->sync_queue );
426
427        // Output not used right now. Try to release dedicated sync lock
428        Dedicated_Sender::try_release_lock_spawn_request();
429
430        // If no errors occurred, re-spawn a dedicated Sync request.
431        if ( true === $result ) {
432            Dedicated_Sender::spawn_sync( $this->sync_queue );
433        }
434
435        if ( $do_real_exit ) {
436            exit( 0 );
437        }
438    }
439
440    /**
441     * Trigger sync for a certain sync queue.
442     * Responsible for setting next sync time.
443     * Will not be delayed if the current request is a WP import one.
444     * Will be delayed until the next sync time comes.
445     *
446     * @access public
447     *
448     * @param \Automattic\Jetpack\Sync\Queue $queue Queue object.
449     *
450     * @return boolean|WP_Error True if this sync sending was successful, error object otherwise.
451     */
452    public function do_sync_and_set_delays( $queue ) {
453        // Don't sync if importing.
454        if ( defined( 'WP_IMPORTING' ) && WP_IMPORTING ) {
455            return new WP_Error( 'is_importing' );
456        }
457
458        // Don't sync if request is marked as read only.
459        if ( Constants::is_true( 'JETPACK_SYNC_READ_ONLY' ) ) {
460            return new WP_Error( 'jetpack_sync_read_only' );
461        }
462
463        if ( ! Settings::is_sender_enabled( $queue->id ) ) {
464            return new WP_Error( 'sender_disabled_for_queue_' . $queue->id );
465        }
466
467        if ( get_transient( self::TEMP_SYNC_DISABLE_TRANSIENT_NAME ) ) {
468            return new WP_Error( 'sender_temporarily_disabled_while_pulling' );
469        }
470
471        // Return early if we've gotten a retry-after header response.
472        $retry_time = get_option( Actions::RETRY_AFTER_PREFIX . $queue->id );
473        if ( $retry_time ) {
474            // If expired update to false but don't send. Send will occurr in new request to avoid race conditions.
475            if ( microtime( true ) > $retry_time ) {
476                update_option( Actions::RETRY_AFTER_PREFIX . $queue->id, false, false );
477            }
478            return new WP_Error( 'retry_after' );
479        }
480
481        // Don't sync if we are throttled.
482        if ( $this->get_next_sync_time( $queue->id ) > microtime( true ) ) {
483            return new WP_Error( 'sync_throttled' );
484        }
485
486        $start_time = microtime( true );
487
488        Settings::set_is_syncing( true );
489
490        $sync_result = $this->do_sync_for_queue( $queue );
491
492        Settings::set_is_syncing( false );
493
494        $exceeded_sync_wait_threshold = ( microtime( true ) - $start_time ) > (float) $this->get_sync_wait_threshold();
495
496        if ( is_wp_error( $sync_result ) ) {
497            if ( 'unclosed_buffer' === $sync_result->get_error_code() ) {
498                $this->set_next_sync_time( time() + self::QUEUE_LOCKED_SYNC_DELAY, $queue->id );
499            }
500            if ( 'wpcom_error' === $sync_result->get_error_code() ) {
501                $this->set_next_sync_time( time() + self::WPCOM_ERROR_SYNC_DELAY, $queue->id );
502            }
503        } elseif ( $exceeded_sync_wait_threshold && ! Settings::is_doing_cron() ) {
504            // If a send was slow, briefly pause before the next one.
505            // Applies only to Dedicated/Normal Sync to avoid impacting user traffic;
506            // cron jobs are exempt.
507            $this->set_next_sync_time( time() + $this->get_sync_wait_time(), $queue->id );
508        }
509
510        return $sync_result;
511    }
512
513    /**
514     * Retrieve the next sync items to send.
515     *
516     * @access public
517     *
518     * @param (array|\Automattic\Jetpack\Sync\Queue_Buffer) $buffer_or_items Queue buffer or array of objects.
519     * @param boolean                                       $encode Whether to encode the items.
520     * @return array Sync items to send.
521     */
522    public function get_items_to_send( $buffer_or_items, $encode = true ) {
523        // Track how long we've been processing so we can avoid request timeouts.
524        $start_time    = microtime( true );
525        $upload_size   = 0;
526        $items_to_send = array();
527        $items         = is_array( $buffer_or_items ) ? $buffer_or_items : $buffer_or_items->get_items();
528        if ( ! is_array( $items ) ) {
529            $items = array();
530        }
531
532        // Set up current screen to avoid errors rendering content.
533        require_once ABSPATH . 'wp-admin/includes/class-wp-screen.php';
534        require_once ABSPATH . 'wp-admin/includes/screen.php';
535        set_current_screen( 'sync' );
536        $skipped_items_ids = array();
537        /**
538         * We estimate the total encoded size as we go by encoding each item individually.
539         * This is expensive, but the only way to really know :/
540         */
541        foreach ( $items as $key => $item ) {
542            if ( ! is_array( $item ) ) {
543                $skipped_items_ids[] = $key;
544                continue;
545            }
546
547            // Suspending cache addition help prevent overloading in memory cache of large sites.
548            wp_suspend_cache_addition( true );
549            /**
550             * Modify the data within an action before it is serialized and sent to the server
551             * For example, during full sync this expands Post ID's into full Post objects,
552             * so that we don't have to serialize the whole object into the queue.
553             *
554             * @since 1.6.3
555             * @since-jetpack 4.2.0
556             *
557             * @param array The action parameters
558             * @param int The ID of the user who triggered the action
559             */
560            $item[1] = apply_filters( 'jetpack_sync_before_send_' . $item[0], $item[1], $item[2] );
561            wp_suspend_cache_addition( false );
562            // Serialization usage can lead to empty, null or false action_name. Lets skip as there is no information to send.
563            if ( empty( $item[0] ) || false === $item[1] ) {
564                $skipped_items_ids[] = $key;
565                continue;
566            }
567            $encoded_item = $this->codec->encode( $item );
568            $upload_size += strlen( $encoded_item );
569            if ( $upload_size > $this->upload_max_bytes && array() !== $items_to_send ) {
570                break;
571            }
572            $items_to_send[ $key ] = $encode ? $encoded_item : $item;
573            if ( microtime( true ) - $start_time > $this->max_dequeue_time ) {
574                break;
575            }
576        }
577
578        return array( $items_to_send, $skipped_items_ids, $items, microtime( true ) - $start_time );
579    }
580
581    /**
582     * If supported, flush all response data to the client and finish the request.
583     * This allows for time consuming tasks to be performed without leaving the connection open.
584     *
585     * @access private
586     */
587    private function fastcgi_finish_request() {
588        if ( function_exists( 'fastcgi_finish_request' ) ) {
589            fastcgi_finish_request();
590        }
591    }
592
593    /**
594     * Perform sync for a certain sync queue.
595     *
596     * @access public
597     *
598     * @param \Automattic\Jetpack\Sync\Queue $queue Queue object.
599     *
600     * @return boolean|WP_Error True if this sync sending was successful, error object otherwise.
601     */
602    public function do_sync_for_queue( $queue ) {
603        do_action( 'jetpack_sync_before_send_queue_' . $queue->id );
604        if ( $queue->size() === 0 ) {
605            return new WP_Error( 'empty_queue_' . $queue->id );
606        }
607
608        /**
609         * Now that we're sure we are about to sync, try to ignore user abort
610         * so we can avoid getting into a bad state.
611         */
612        // https://plugins.trac.wordpress.org/ticket/2041
613        if ( function_exists( 'ignore_user_abort' ) ) {
614            ignore_user_abort( true );
615        }
616
617        /* Don't make the request block till we finish, if possible. */
618        if ( Constants::is_true( 'REST_REQUEST' ) || Constants::is_true( 'XMLRPC_REQUEST' ) ) {
619            $this->fastcgi_finish_request();
620        }
621
622        $checkout_start_time = microtime( true );
623
624        $buffer = $queue->checkout_with_memory_limit( $this->dequeue_max_bytes, $this->upload_max_rows );
625
626        if ( ! $buffer ) {
627            // Buffer has no items.
628            return new WP_Error( 'empty_buffer' );
629        }
630
631        if ( is_wp_error( $buffer ) ) {
632            return $buffer;
633        }
634
635        $checkout_duration = microtime( true ) - $checkout_start_time;
636
637        list( $items_to_send, $skipped_items_ids, $items, $preprocess_duration ) = $this->get_items_to_send( $buffer, true );
638        if ( ! empty( $items_to_send ) ) {
639            /**
640             * Fires when data is ready to send to the server.
641             * Return false or WP_Error to abort the sync (e.g. if there's an error)
642             * The items will be automatically re-sent later
643             *
644             * @since 1.6.3
645             * @since-jetpack 4.2.0
646             *
647             * @param array  $data The action buffer
648             * @param string $codec The codec name used to encode the data
649             * @param double $time The current time
650             * @param string $queue The queue used to send ('sync' or 'full_sync')
651             * @param float  $checkout_duration The duration of the checkout operation.
652             * @param float  $preprocess_duration The duration of the pre-process operation.
653             * @param int    $queue_size The size of the sync queue at the time of processing.
654             */
655            Settings::set_is_sending( true );
656            $processed_item_ids = apply_filters( 'jetpack_sync_send_data', $items_to_send, $this->codec->name(), microtime( true ), $queue->id, $checkout_duration, $preprocess_duration, $queue->size(), $buffer->id );
657            Settings::set_is_sending( false );
658        } else {
659            $processed_item_ids = $skipped_items_ids;
660            $skipped_items_ids  = array();
661        }
662
663        if ( 'non-blocking' !== $processed_item_ids ) {
664            if ( ! $processed_item_ids || is_wp_error( $processed_item_ids ) ) {
665                $checked_in_item_ids = $queue->checkin( $buffer );
666                if ( is_wp_error( $checked_in_item_ids ) ) {
667                    // phpcs:ignore WordPress.PHP.DevelopmentFunctions.error_log_error_log
668                    error_log( 'Error checking in buffer: ' . $checked_in_item_ids->get_error_message() );
669                    $queue->force_checkin();
670                }
671                if ( is_wp_error( $processed_item_ids ) ) {
672                    return new WP_Error( 'wpcom_error', $processed_item_ids->get_error_code() );
673                }
674
675                // Returning a wpcom_error is a sign to the caller that we should wait a while before syncing again.
676                return new WP_Error( 'wpcom_error', 'jetpack_sync_send_data_false' );
677            } else {
678                // Detect if the last item ID was an error.
679                $had_wp_error = is_wp_error( end( $processed_item_ids ) );
680                $wp_error     = $had_wp_error ? array_pop( $processed_item_ids ) : null;
681                // Also checkin any items that were skipped.
682                if ( array() !== $skipped_items_ids ) {
683                    $processed_item_ids = array_merge( $processed_item_ids, $skipped_items_ids );
684                }
685                $processed_items = array_intersect_key( $items, array_flip( $processed_item_ids ) );
686                /**
687                 * Allows us to keep track of all the actions that have been sent.
688                 * Allows us to calculate the progress of specific actions.
689                 *
690                 * @since 1.6.3
691                 * @since-jetpack 4.2.0
692                 *
693                 * @param array $processed_actions The actions that we send successfully.
694                 */
695                do_action( 'jetpack_sync_processed_actions', $processed_items );
696                $queue->close( $buffer, $processed_item_ids );
697                // Returning a WP_Error is a sign to the caller that we should wait a while before syncing again.
698                if ( $had_wp_error ) {
699                    return new WP_Error( 'wpcom_error', $wp_error->get_error_code() );
700                }
701            }
702        }
703
704        return true;
705    }
706
707    /**
708     * Immediately sends a single item without firing or enqueuing it
709     *
710     * @param string $action_name The action.
711     * @param array  $data The data associated with the action.
712     * @param string $key The key to use for the action.
713     *
714     * @return array Items processed. TODO: this doesn't make much sense anymore, it should probably be just a bool.
715     */
716    public function send_action( $action_name, $data = null, $key = null ) {
717        if ( ! Settings::is_sender_enabled( 'full_sync' ) ) {
718            return array();
719        }
720
721        // Compose the data to be sent.
722        $action_to_send = $this->create_action_to_send( $action_name, $data, $key );
723
724        list( $items_to_send, $skipped_items_ids, $items, $preprocess_duration ) = $this->get_items_to_send( $action_to_send, true ); // phpcs:ignore VariableAnalysis.CodeAnalysis.VariableAnalysis.UnusedVariable
725        Settings::set_is_sending( true );
726        $processed_item_ids = apply_filters( 'jetpack_sync_send_data', $items_to_send, $this->get_codec()->name(), microtime( true ), 'immediate-send', 0, $preprocess_duration );
727        Settings::set_is_sending( false );
728
729        /**
730         * Allows us to keep track of all the actions that have been sent.
731         * Allows us to calculate the progress of specific actions.
732         *
733         * @param array $processed_actions The actions that we send successfully.
734         *
735         * @since 1.6.3
736         * @since-jetpack 4.2.0
737         */
738        do_action( 'jetpack_sync_processed_actions', $action_to_send );
739
740        return $processed_item_ids;
741    }
742
743    /**
744     * Create an synthetic action for direct sending to WPCOM during full sync (for example)
745     *
746     * @access private
747     *
748     * @param string $action_name The action.
749     * @param array  $data The data associated with the action.
750     * @param string $key The key to use for the action.
751     * @return array An array of synthetic sync actions keyed by current microtime(true)
752     */
753    private function create_action_to_send( $action_name, $data, $key = null ) {
754        return array(
755            $key ?? (string) microtime( true ) => array(
756                $action_name,
757                $data,
758                get_current_user_id(),
759                microtime( true ),
760                Settings::is_importing(),
761            ),
762        );
763    }
764
765    /**
766     * Returns any object that is able to be synced.
767     *
768     * @access public
769     *
770     * @param array $args the synchronized object parameters.
771     * @return string Encoded sync object.
772     */
773    public function sync_object( $args ) {
774        // For example: posts, post, 5.
775        list( $module_name, $object_type, $id ) = $args;
776
777        $sync_module = Modules::get_module( $module_name );
778        $codec       = $this->get_codec();
779
780        return $codec->encode( $sync_module->get_object_by_id( $object_type, $id ) );
781    }
782
783    /**
784     * Register additional sync XML-RPC methods available to Jetpack for authenticated users.
785     *
786     * @access public
787     * @since 1.6.3
788     * @since-jetpack 7.8.0
789     *
790     * @param array $jetpack_methods XML-RPC methods available to the Jetpack Server.
791     * @return array Filtered XML-RPC methods.
792     */
793    public function register_jetpack_xmlrpc_methods( $jetpack_methods ) {
794        $jetpack_methods['jetpack.syncObject'] = array( $this, 'sync_object' );
795        return $jetpack_methods;
796    }
797
798    /**
799     * Get the incremental sync queue object.
800     *
801     * @access public
802     *
803     * @return \Automattic\Jetpack\Sync\Queue Queue object.
804     */
805    public function get_sync_queue() {
806        return $this->sync_queue;
807    }
808
809    /**
810     * Get the full sync queue object.
811     *
812     * @access public
813     *
814     * @return \Automattic\Jetpack\Sync\Queue Queue object.
815     */
816    public function get_full_sync_queue() {
817        return $this->full_sync_queue;
818    }
819
820    /**
821     * Get the codec object.
822     *
823     * @access public
824     *
825     * @return \Automattic\Jetpack\Sync\Codec_Interface Codec object.
826     */
827    public function get_codec() {
828        return $this->codec;
829    }
830
831    /**
832     * Determine the codec object.
833     * Use gzip deflate if supported.
834     *
835     * @access public
836     */
837    public function set_codec() {
838        if ( function_exists( 'gzinflate' ) ) {
839            $this->codec = new JSON_Deflate_Array_Codec();
840        } else {
841            $this->codec = new Simple_Codec();
842        }
843    }
844
845    /**
846     * Compute and send all the checksums.
847     *
848     * @access public
849     */
850    public function send_checksum() {
851        $store = new Replicastore();
852        do_action( 'jetpack_sync_checksum', $store->checksum_all() );
853    }
854
855    /**
856     * Reset the incremental sync queue.
857     *
858     * @access public
859     */
860    public function reset_sync_queue() {
861        $this->sync_queue->reset();
862    }
863
864    /**
865     * Reset the full sync queue.
866     *
867     * @access public
868     */
869    public function reset_full_sync_queue() {
870        $this->full_sync_queue->reset();
871    }
872
873    /**
874     * Set the maximum bytes to checkout without exceeding the memory limit.
875     *
876     * @access public
877     *
878     * @param int $size Maximum bytes to checkout.
879     */
880    public function set_dequeue_max_bytes( $size ) {
881        $this->dequeue_max_bytes = $size;
882    }
883
884    /**
885     * Set the maximum bytes in a single encoded item.
886     *
887     * @access public
888     *
889     * @param int $max_bytes Maximum bytes in a single encoded item.
890     */
891    public function set_upload_max_bytes( $max_bytes ) {
892        $this->upload_max_bytes = $max_bytes;
893    }
894
895    /**
896     * Set the maximum number of sync items in a single action.
897     *
898     * @access public
899     *
900     * @param int $max_rows Maximum number of sync items.
901     */
902    public function set_upload_max_rows( $max_rows ) {
903        $this->upload_max_rows = $max_rows;
904    }
905
906    /**
907     * Set the sync wait time (in seconds).
908     *
909     * @access public
910     *
911     * @param int $seconds Sync wait time.
912     */
913    public function set_sync_wait_time( $seconds ) {
914        $this->sync_wait_time = $seconds;
915    }
916
917    /**
918     * Get current sync wait time (in seconds).
919     *
920     * @access public
921     *
922     * @return int Sync wait time.
923     */
924    public function get_sync_wait_time() {
925        return $this->sync_wait_time;
926    }
927
928    /**
929     * Set the enqueue wait time (in seconds).
930     *
931     * @access public
932     *
933     * @param int $seconds Enqueue wait time.
934     */
935    public function set_enqueue_wait_time( $seconds ) {
936        $this->enqueue_wait_time = $seconds;
937    }
938
939    /**
940     * Get current enqueue wait time (in seconds).
941     *
942     * @access public
943     *
944     * @return int Enqueue wait time.
945     */
946    public function get_enqueue_wait_time() {
947        return $this->enqueue_wait_time;
948    }
949
950    /**
951     * Set the sync wait threshold (in seconds).
952     *
953     * @access public
954     *
955     * @param int $seconds Sync wait threshold.
956     */
957    public function set_sync_wait_threshold( $seconds ) {
958        $this->sync_wait_threshold = $seconds;
959    }
960
961    /**
962     * Get current sync wait threshold (in seconds).
963     *
964     * @access public
965     *
966     * @return int Sync wait threshold.
967     */
968    public function get_sync_wait_threshold() {
969        return $this->sync_wait_threshold;
970    }
971
972    /**
973     * Set the maximum time for perfirming a checkout of items from the queue (in seconds).
974     *
975     * @access public
976     *
977     * @param int $seconds Maximum dequeue time.
978     */
979    public function set_max_dequeue_time( $seconds ) {
980        $this->max_dequeue_time = $seconds;
981    }
982
983    /**
984     * Initialize the sync queues, codec and set the default settings.
985     *
986     * @access public
987     */
988    public function set_defaults() {
989        $this->sync_queue      = new Queue( 'sync' );
990        $this->full_sync_queue = new Queue( 'full_sync' );
991        $this->set_codec();
992
993        // Saved settings.
994        Settings::set_importing( null );
995        $settings = Settings::get_settings();
996        $this->set_dequeue_max_bytes( $settings['dequeue_max_bytes'] );
997        $this->set_upload_max_bytes( $settings['upload_max_bytes'] );
998        $this->set_upload_max_rows( $settings['upload_max_rows'] );
999        $this->set_sync_wait_time( $settings['sync_wait_time'] );
1000        $this->set_enqueue_wait_time( $settings['enqueue_wait_time'] );
1001        $this->set_sync_wait_threshold( $settings['sync_wait_threshold'] );
1002        $this->set_max_dequeue_time( Defaults::get_max_sync_execution_time() );
1003    }
1004
1005    /**
1006     * Reset sync queues, modules and settings.
1007     *
1008     * @access public
1009     */
1010    public function reset_data() {
1011        $this->reset_sync_queue();
1012        $this->reset_full_sync_queue();
1013
1014        foreach ( Modules::get_modules() as $module ) {
1015            $module->reset_data();
1016        }
1017        // Reset Sync locks without unlocking queues since we already reset those.
1018        Actions::reset_sync_locks( false );
1019
1020        Settings::reset_data();
1021    }
1022
1023    /**
1024     * Perform cleanup at the event of plugin uninstallation.
1025     *
1026     * @access public
1027     */
1028    public function uninstall() {
1029        // Lets delete all the other fun stuff like transient and option and the sync queue.
1030        $this->reset_data();
1031
1032        // Delete the full sync status.
1033        delete_option( 'jetpack_full_sync_status' );
1034
1035        // Clear the sync cron.
1036        wp_clear_scheduled_hook( 'jetpack_sync_cron' );
1037        wp_clear_scheduled_hook( 'jetpack_sync_full_cron' );
1038    }
1039}