first commit
This commit is contained in:
608
wp-includes/collaboration/class-wp-http-polling-sync-server.php
Normal file
608
wp-includes/collaboration/class-wp-http-polling-sync-server.php
Normal file
@@ -0,0 +1,608 @@
|
||||
<?php
|
||||
/**
|
||||
* WP_HTTP_Polling_Sync_Server class
|
||||
*
|
||||
* @package WordPress
|
||||
*/
|
||||
|
||||
/**
|
||||
* Core class that contains an HTTP server used for collaborative editing.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @access private
|
||||
*/
|
||||
class WP_HTTP_Polling_Sync_Server {
|
||||
/**
|
||||
* REST API namespace.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var string
|
||||
*/
|
||||
const REST_NAMESPACE = 'wp-sync/v1';
|
||||
|
||||
/**
|
||||
* Awareness timeout in seconds. Clients that haven't updated
|
||||
* their awareness state within this time are considered disconnected.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var int
|
||||
*/
|
||||
const AWARENESS_TIMEOUT = 30;
|
||||
|
||||
/**
|
||||
* Threshold used to signal clients to send a compaction update.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var int
|
||||
*/
|
||||
const COMPACTION_THRESHOLD = 50;
|
||||
|
||||
/**
|
||||
* Maximum total size (in bytes) of the request body.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var int
|
||||
*/
|
||||
const MAX_BODY_SIZE = 16 * MB_IN_BYTES;
|
||||
|
||||
/**
|
||||
* Maximum number of rooms allowed per request.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var int
|
||||
*/
|
||||
const MAX_ROOMS_PER_REQUEST = 50;
|
||||
|
||||
/**
|
||||
* Maximum length of a single update data string.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var int
|
||||
*/
|
||||
const MAX_UPDATE_DATA_SIZE = MB_IN_BYTES;
|
||||
|
||||
/**
|
||||
* Sync update type: compaction.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var string
|
||||
*/
|
||||
const UPDATE_TYPE_COMPACTION = 'compaction';
|
||||
|
||||
/**
|
||||
* Sync update type: sync step 1.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var string
|
||||
*/
|
||||
const UPDATE_TYPE_SYNC_STEP1 = 'sync_step1';
|
||||
|
||||
/**
|
||||
* Sync update type: sync step 2.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var string
|
||||
*/
|
||||
const UPDATE_TYPE_SYNC_STEP2 = 'sync_step2';
|
||||
|
||||
/**
|
||||
* Sync update type: regular update.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var string
|
||||
*/
|
||||
const UPDATE_TYPE_UPDATE = 'update';
|
||||
|
||||
/**
|
||||
* Storage backend for sync updates.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*/
|
||||
private WP_Sync_Storage $storage;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param WP_Sync_Storage $storage Storage backend for sync updates.
|
||||
*/
|
||||
public function __construct( WP_Sync_Storage $storage ) {
|
||||
$this->storage = $storage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers REST API routes.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*/
|
||||
public function register_routes(): void {
|
||||
$typed_update_args = array(
|
||||
'properties' => array(
|
||||
'data' => array(
|
||||
'type' => 'string',
|
||||
'required' => true,
|
||||
'maxLength' => self::MAX_UPDATE_DATA_SIZE,
|
||||
),
|
||||
'type' => array(
|
||||
'type' => 'string',
|
||||
'required' => true,
|
||||
'enum' => array(
|
||||
self::UPDATE_TYPE_COMPACTION,
|
||||
self::UPDATE_TYPE_SYNC_STEP1,
|
||||
self::UPDATE_TYPE_SYNC_STEP2,
|
||||
self::UPDATE_TYPE_UPDATE,
|
||||
),
|
||||
),
|
||||
),
|
||||
'required' => true,
|
||||
'type' => 'object',
|
||||
);
|
||||
|
||||
$room_args = array(
|
||||
'after' => array(
|
||||
'minimum' => 0,
|
||||
'required' => true,
|
||||
'type' => 'integer',
|
||||
),
|
||||
'awareness' => array(
|
||||
'required' => true,
|
||||
'type' => array( 'object', 'null' ),
|
||||
),
|
||||
'client_id' => array(
|
||||
'minimum' => 1,
|
||||
'required' => true,
|
||||
'type' => 'integer',
|
||||
),
|
||||
'room' => array(
|
||||
'required' => true,
|
||||
'type' => 'string',
|
||||
'pattern' => '^[^/]+/[^/:]+(?::\\S+)?$',
|
||||
),
|
||||
'updates' => array(
|
||||
'items' => $typed_update_args,
|
||||
'minItems' => 0,
|
||||
'required' => true,
|
||||
'type' => 'array',
|
||||
),
|
||||
);
|
||||
|
||||
register_rest_route(
|
||||
self::REST_NAMESPACE,
|
||||
'/updates',
|
||||
array(
|
||||
'methods' => array( WP_REST_Server::CREATABLE ),
|
||||
'callback' => array( $this, 'handle_request' ),
|
||||
'permission_callback' => array( $this, 'check_permissions' ),
|
||||
'validate_callback' => array( $this, 'validate_request' ),
|
||||
'args' => array(
|
||||
'rooms' => array(
|
||||
'items' => array(
|
||||
'properties' => $room_args,
|
||||
'type' => 'object',
|
||||
),
|
||||
'maxItems' => self::MAX_ROOMS_PER_REQUEST,
|
||||
'required' => true,
|
||||
'type' => 'array',
|
||||
),
|
||||
),
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the current user has permission to access a room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param WP_REST_Request $request The REST request.
|
||||
* @return bool|WP_Error True if user has permission, otherwise WP_Error with details.
|
||||
*/
|
||||
public function check_permissions( WP_REST_Request $request ) {
|
||||
// Minimum cap check. Is user logged in with a contributor role or higher?
|
||||
if ( ! current_user_can( 'edit_posts' ) ) {
|
||||
return new WP_Error(
|
||||
'rest_cannot_edit',
|
||||
__( 'You do not have permission to perform this action' ),
|
||||
array( 'status' => rest_authorization_required_code() )
|
||||
);
|
||||
}
|
||||
|
||||
$rooms = $request['rooms'];
|
||||
$wp_user_id = get_current_user_id();
|
||||
|
||||
foreach ( $rooms as $room ) {
|
||||
$client_id = $room['client_id'];
|
||||
$room = $room['room'];
|
||||
|
||||
// Check that the client_id is not already owned by another user.
|
||||
$existing_awareness = $this->storage->get_awareness_state( $room );
|
||||
foreach ( $existing_awareness as $entry ) {
|
||||
if ( $client_id === $entry['client_id'] && $wp_user_id !== $entry['wp_user_id'] ) {
|
||||
return new WP_Error(
|
||||
'rest_cannot_edit',
|
||||
__( 'Client ID is already in use by another user.' ),
|
||||
array( 'status' => rest_authorization_required_code() )
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
$type_parts = explode( '/', $room, 2 );
|
||||
$object_parts = explode( ':', $type_parts[1] ?? '', 2 );
|
||||
|
||||
$entity_kind = $type_parts[0];
|
||||
$entity_name = $object_parts[0];
|
||||
$object_id = $object_parts[1] ?? null;
|
||||
|
||||
if ( ! $this->can_user_sync_entity_type( $entity_kind, $entity_name, $object_id ) ) {
|
||||
return new WP_Error(
|
||||
'rest_cannot_edit',
|
||||
sprintf(
|
||||
/* translators: %s: The room name encodes the current entity being synced. */
|
||||
__( 'You do not have permission to sync this entity: %s.' ),
|
||||
$room
|
||||
),
|
||||
array( 'status' => rest_authorization_required_code() )
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that the request body does not exceed the maximum allowed size.
|
||||
*
|
||||
* Runs as the route-level validate_callback, after per-arg schema
|
||||
* validation has already passed.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param WP_REST_Request $request The REST request.
|
||||
* @return true|WP_Error True if valid, WP_Error if the body is too large.
|
||||
*/
|
||||
public function validate_request( WP_REST_Request $request ) {
|
||||
$body = $request->get_body();
|
||||
if ( is_string( $body ) && strlen( $body ) > self::MAX_BODY_SIZE ) {
|
||||
return new WP_Error(
|
||||
'rest_sync_body_too_large',
|
||||
__( 'Request body is too large.' ),
|
||||
array( 'status' => 413 )
|
||||
);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles request: stores sync updates and awareness data, and returns
|
||||
* updates the client is missing.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param WP_REST_Request $request The REST request.
|
||||
* @return WP_REST_Response|WP_Error Response object or error.
|
||||
*/
|
||||
public function handle_request( WP_REST_Request $request ) {
|
||||
$rooms = $request['rooms'];
|
||||
$response = array(
|
||||
'rooms' => array(),
|
||||
);
|
||||
|
||||
foreach ( $rooms as $room_request ) {
|
||||
$awareness = $room_request['awareness'];
|
||||
$client_id = $room_request['client_id'];
|
||||
$cursor = $room_request['after'];
|
||||
$room = $room_request['room'];
|
||||
|
||||
// Merge awareness state.
|
||||
$merged_awareness = $this->process_awareness_update( $room, $client_id, $awareness );
|
||||
|
||||
// The lowest client ID is nominated to perform compaction when needed.
|
||||
$is_compactor = false;
|
||||
if ( count( $merged_awareness ) > 0 ) {
|
||||
$is_compactor = min( array_keys( $merged_awareness ) ) === $client_id;
|
||||
}
|
||||
|
||||
// Process each update according to its type.
|
||||
foreach ( $room_request['updates'] as $update ) {
|
||||
$result = $this->process_sync_update( $room, $client_id, $cursor, $update );
|
||||
if ( is_wp_error( $result ) ) {
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
|
||||
// Get updates for this client.
|
||||
$room_response = $this->get_updates( $room, $client_id, $cursor, $is_compactor );
|
||||
$room_response['awareness'] = $merged_awareness;
|
||||
|
||||
$response['rooms'][] = $room_response;
|
||||
}
|
||||
|
||||
return new WP_REST_Response( $response, 200 );
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the current user can sync a specific entity type.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $entity_kind The entity kind, e.g. 'postType', 'taxonomy', 'root'.
|
||||
* @param string $entity_name The entity name, e.g. 'post', 'category', 'site'.
|
||||
* @param string|null $object_id The numeric object ID / entity key for single entities, null for collections.
|
||||
* @return bool True if user has permission, otherwise false.
|
||||
*/
|
||||
private function can_user_sync_entity_type( string $entity_kind, string $entity_name, ?string $object_id ): bool {
|
||||
if ( is_string( $object_id ) ) {
|
||||
if ( ! ctype_digit( $object_id ) ) {
|
||||
return false;
|
||||
}
|
||||
$object_id = (int) $object_id;
|
||||
}
|
||||
if ( null !== $object_id && $object_id <= 0 ) {
|
||||
// Object ID must be numeric if provided.
|
||||
return false;
|
||||
}
|
||||
|
||||
// Validate permissions for the provided object ID.
|
||||
if ( is_int( $object_id ) ) {
|
||||
// Handle single post type entities with a defined object ID.
|
||||
if ( 'postType' === $entity_kind ) {
|
||||
if ( get_post_type( $object_id ) !== $entity_name ) {
|
||||
// Post is not of the specified post type.
|
||||
return false;
|
||||
}
|
||||
return current_user_can( 'edit_post', $object_id );
|
||||
}
|
||||
|
||||
// Handle single taxonomy term entities with a defined object ID.
|
||||
if ( 'taxonomy' === $entity_kind ) {
|
||||
$term_exists = term_exists( $object_id, $entity_name );
|
||||
if ( ! is_array( $term_exists ) || ! isset( $term_exists['term_id'] ) ) {
|
||||
// Either term doesn't exist OR term is not in specified taxonomy.
|
||||
return false;
|
||||
}
|
||||
|
||||
return current_user_can( 'edit_term', $object_id );
|
||||
}
|
||||
|
||||
// Handle single comment entities with a defined object ID.
|
||||
if ( 'root' === $entity_kind && 'comment' === $entity_name ) {
|
||||
return current_user_can( 'edit_comment', $object_id );
|
||||
}
|
||||
}
|
||||
|
||||
// All the remaining checks are for collections. If an object ID is provided,
|
||||
// reject the request.
|
||||
if ( null !== $object_id ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// For postType collections, check if the user can edit posts of this type.
|
||||
if ( 'postType' === $entity_kind ) {
|
||||
$post_type_object = get_post_type_object( $entity_name );
|
||||
if ( ! isset( $post_type_object->cap->edit_posts ) ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return current_user_can( $post_type_object->cap->edit_posts );
|
||||
}
|
||||
|
||||
// Collection syncing does not exchange entity data. It only signals if
|
||||
// another user has updated an entity in the collection. Therefore, we only
|
||||
// compare against an allow list of collection types.
|
||||
$allowed_collection_entity_kinds = array(
|
||||
'postType',
|
||||
'root',
|
||||
'taxonomy',
|
||||
);
|
||||
|
||||
return in_array( $entity_kind, $allowed_collection_entity_kinds, true );
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes and stores an awareness update from a client.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param int $client_id Client identifier.
|
||||
* @param array<string, mixed>|null $awareness_update Awareness state sent by the client.
|
||||
* @return array<int, array<string, mixed>> Map of client ID to awareness state.
|
||||
*/
|
||||
private function process_awareness_update( string $room, int $client_id, ?array $awareness_update ): array {
|
||||
$existing_awareness = $this->storage->get_awareness_state( $room );
|
||||
$updated_awareness = array();
|
||||
$current_time = time();
|
||||
|
||||
foreach ( $existing_awareness as $entry ) {
|
||||
// Remove this client's entry (it will be updated below).
|
||||
if ( $client_id === $entry['client_id'] ) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Remove entries that have expired.
|
||||
if ( $current_time - $entry['updated_at'] >= self::AWARENESS_TIMEOUT ) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$updated_awareness[] = $entry;
|
||||
}
|
||||
|
||||
// Add this client's awareness state.
|
||||
if ( null !== $awareness_update ) {
|
||||
$updated_awareness[] = array(
|
||||
'client_id' => $client_id,
|
||||
'state' => $awareness_update,
|
||||
'updated_at' => $current_time,
|
||||
'wp_user_id' => get_current_user_id(),
|
||||
);
|
||||
}
|
||||
|
||||
// This action can fail, but it shouldn't fail the entire request.
|
||||
$this->storage->set_awareness_state( $room, $updated_awareness );
|
||||
|
||||
// Convert to client_id => state map for response.
|
||||
$response = array();
|
||||
foreach ( $updated_awareness as $entry ) {
|
||||
$response[ $entry['client_id'] ] = $entry['state'];
|
||||
}
|
||||
|
||||
return $response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a sync update based on its type.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param int $client_id Client identifier.
|
||||
* @param int $cursor Client cursor (marker of last seen update).
|
||||
* @param array{data: string, type: string} $update Sync update.
|
||||
* @return true|WP_Error True on success, WP_Error on storage failure.
|
||||
*/
|
||||
private function process_sync_update( string $room, int $client_id, int $cursor, array $update ) {
|
||||
$data = $update['data'];
|
||||
$type = $update['type'];
|
||||
|
||||
switch ( $type ) {
|
||||
case self::UPDATE_TYPE_COMPACTION:
|
||||
/*
|
||||
* Compaction replaces updates the client has already seen. Only remove
|
||||
* updates with markers before the client's cursor to preserve updates
|
||||
* that arrived since the client's last sync.
|
||||
*
|
||||
* Check for a newer compaction update first. If one exists, skip this
|
||||
* compaction to avoid overwriting it.
|
||||
*/
|
||||
$updates_after_cursor = $this->storage->get_updates_after_cursor( $room, $cursor );
|
||||
$has_newer_compaction = false;
|
||||
|
||||
foreach ( $updates_after_cursor as $existing ) {
|
||||
if ( self::UPDATE_TYPE_COMPACTION === $existing['type'] ) {
|
||||
$has_newer_compaction = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ( ! $has_newer_compaction ) {
|
||||
if ( ! $this->storage->remove_updates_before_cursor( $room, $cursor ) ) {
|
||||
return new WP_Error(
|
||||
'rest_sync_storage_error',
|
||||
__( 'Failed to remove updates during compaction.' ),
|
||||
array( 'status' => 500 )
|
||||
);
|
||||
}
|
||||
|
||||
return $this->add_update( $room, $client_id, $type, $data );
|
||||
}
|
||||
|
||||
/*
|
||||
* A newer compaction already advanced the cursor, but we
|
||||
* can not safely drop an update. The incoming bytes still encode
|
||||
* operations other clients may not have seen, so store them as a
|
||||
* regular update. Y.applyUpdateV2 merges state-as-update blobs
|
||||
* idempotently, so overlap with the existing compaction is safe.
|
||||
*/
|
||||
return $this->add_update( $room, $client_id, self::UPDATE_TYPE_UPDATE, $data );
|
||||
|
||||
case self::UPDATE_TYPE_SYNC_STEP1:
|
||||
case self::UPDATE_TYPE_SYNC_STEP2:
|
||||
case self::UPDATE_TYPE_UPDATE:
|
||||
/*
|
||||
* Sync step 1 announces a client's state vector. Other clients need
|
||||
* to see it so they can respond with sync_step2 containing missing
|
||||
* updates. The cursor-based filtering prevents re-delivery.
|
||||
*
|
||||
* Sync step 2 contains updates for a specific client.
|
||||
*
|
||||
* All updates are stored persistently.
|
||||
*/
|
||||
return $this->add_update( $room, $client_id, $type, $data );
|
||||
}
|
||||
|
||||
return new WP_Error(
|
||||
'rest_invalid_update_type',
|
||||
__( 'Invalid sync update type.' ),
|
||||
array( 'status' => 400 )
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an update to a room's update list via storage.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param int $client_id Client identifier.
|
||||
* @param string $type Update type (sync_step1, sync_step2, update, compaction).
|
||||
* @param string $data Base64-encoded update data.
|
||||
* @return true|WP_Error True on success, WP_Error on storage failure.
|
||||
*/
|
||||
private function add_update( string $room, int $client_id, string $type, string $data ) {
|
||||
$update = array(
|
||||
'client_id' => $client_id,
|
||||
'data' => $data,
|
||||
'type' => $type,
|
||||
);
|
||||
|
||||
if ( ! $this->storage->add_update( $room, $update ) ) {
|
||||
return new WP_Error(
|
||||
'rest_sync_storage_error',
|
||||
__( 'Failed to store sync update.' ),
|
||||
array( 'status' => 500 )
|
||||
);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets sync updates for a specific client from a room after a given cursor.
|
||||
*
|
||||
* Delegates cursor-based retrieval to the storage layer, then applies
|
||||
* client-specific filtering and compaction logic.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param int $client_id Client identifier.
|
||||
* @param int $cursor Return updates after this cursor.
|
||||
* @param bool $is_compactor True if this client is nominated to perform compaction.
|
||||
* @return array{
|
||||
* end_cursor: int,
|
||||
* should_compact: bool,
|
||||
* room: string,
|
||||
* total_updates: int,
|
||||
* updates: array<int, array{data: string, type: string}>,
|
||||
* } Response data for this room.
|
||||
*/
|
||||
private function get_updates( string $room, int $client_id, int $cursor, bool $is_compactor ): array {
|
||||
$updates_after_cursor = $this->storage->get_updates_after_cursor( $room, $cursor );
|
||||
$total_updates = $this->storage->get_update_count( $room );
|
||||
|
||||
// Filter out this client's updates, except compaction updates.
|
||||
$typed_updates = array();
|
||||
foreach ( $updates_after_cursor as $update ) {
|
||||
if ( $client_id === $update['client_id'] && self::UPDATE_TYPE_COMPACTION !== $update['type'] ) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$typed_updates[] = array(
|
||||
'data' => $update['data'],
|
||||
'type' => $update['type'],
|
||||
);
|
||||
}
|
||||
|
||||
$should_compact = $is_compactor && $total_updates > self::COMPACTION_THRESHOLD;
|
||||
|
||||
return array(
|
||||
'end_cursor' => $this->storage->get_cursor( $room ),
|
||||
'room' => $room,
|
||||
'should_compact' => $should_compact,
|
||||
'total_updates' => $total_updates,
|
||||
'updates' => $typed_updates,
|
||||
);
|
||||
}
|
||||
}
|
||||
378
wp-includes/collaboration/class-wp-sync-post-meta-storage.php
Normal file
378
wp-includes/collaboration/class-wp-sync-post-meta-storage.php
Normal file
@@ -0,0 +1,378 @@
|
||||
<?php
|
||||
/**
|
||||
* WP_Sync_Post_Meta_Storage class
|
||||
*
|
||||
* @package WordPress
|
||||
*/
|
||||
|
||||
/**
|
||||
* Core class that provides an interface for storing and retrieving sync
|
||||
* updates and awareness data during a collaborative session.
|
||||
*
|
||||
* Data is stored as post meta on a dedicated post per room of a custom post type.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @access private
|
||||
*/
|
||||
class WP_Sync_Post_Meta_Storage implements WP_Sync_Storage {
|
||||
/**
|
||||
* Post type for sync storage.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var string
|
||||
*/
|
||||
const POST_TYPE = 'wp_sync_storage';
|
||||
|
||||
/**
|
||||
* Meta key for awareness state.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var string
|
||||
*/
|
||||
const AWARENESS_META_KEY = 'wp_sync_awareness_state';
|
||||
|
||||
/**
|
||||
* Meta key for sync updates.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var string
|
||||
*/
|
||||
const SYNC_UPDATE_META_KEY = 'wp_sync_update_data';
|
||||
|
||||
/**
|
||||
* Cache of cursors by room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var array<string, int>
|
||||
*/
|
||||
private array $room_cursors = array();
|
||||
|
||||
/**
|
||||
* Cache of update counts by room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var array<string, int>
|
||||
*/
|
||||
private array $room_update_counts = array();
|
||||
|
||||
/**
|
||||
* Cache of storage post IDs by room hash.
|
||||
*
|
||||
* @since 7.0.0
|
||||
* @var array<string, int>
|
||||
*/
|
||||
private static array $storage_post_ids = array();
|
||||
|
||||
/**
|
||||
* Adds a sync update to a given room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @global wpdb $wpdb WordPress database abstraction object.
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param mixed $update Sync update.
|
||||
* @return bool True on success, false on failure.
|
||||
*/
|
||||
public function add_update( string $room, $update ): bool {
|
||||
global $wpdb;
|
||||
|
||||
$post_id = $this->get_storage_post_id( $room );
|
||||
if ( null === $post_id ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Use direct database operation to avoid cache invalidation performed by
|
||||
// post meta functions (`wp_cache_set_posts_last_changed()` and direct
|
||||
// `wp_cache_delete()` calls).
|
||||
return (bool) $wpdb->insert(
|
||||
$wpdb->postmeta,
|
||||
array(
|
||||
'post_id' => $post_id,
|
||||
'meta_key' => self::SYNC_UPDATE_META_KEY,
|
||||
'meta_value' => wp_json_encode( $update ),
|
||||
),
|
||||
array( '%d', '%s', '%s' )
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets awareness state for a given room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @global wpdb $wpdb WordPress database abstraction object.
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @return array<int, mixed> Awareness state.
|
||||
*/
|
||||
public function get_awareness_state( string $room ): array {
|
||||
global $wpdb;
|
||||
|
||||
$post_id = $this->get_storage_post_id( $room );
|
||||
if ( null === $post_id ) {
|
||||
return array();
|
||||
}
|
||||
|
||||
// Use direct database operation to avoid updating the post meta cache.
|
||||
// ORDER BY meta_id DESC ensures the latest row wins if duplicates exist
|
||||
// from a past race condition in set_awareness_state().
|
||||
$meta_value = $wpdb->get_var(
|
||||
$wpdb->prepare(
|
||||
"SELECT meta_value FROM $wpdb->postmeta WHERE post_id = %d AND meta_key = %s ORDER BY meta_id DESC LIMIT 1",
|
||||
$post_id,
|
||||
self::AWARENESS_META_KEY
|
||||
)
|
||||
);
|
||||
|
||||
if ( null === $meta_value ) {
|
||||
return array();
|
||||
}
|
||||
|
||||
$awareness = json_decode( $meta_value, true );
|
||||
|
||||
if ( ! is_array( $awareness ) ) {
|
||||
return array();
|
||||
}
|
||||
|
||||
return array_values( $awareness );
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets awareness state for a given room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @global wpdb $wpdb WordPress database abstraction object.
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param array<int, mixed> $awareness Serializable awareness state.
|
||||
* @return bool True on success, false on failure.
|
||||
*/
|
||||
public function set_awareness_state( string $room, array $awareness ): bool {
|
||||
global $wpdb;
|
||||
|
||||
$post_id = $this->get_storage_post_id( $room );
|
||||
if ( null === $post_id ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Use direct database operation to avoid cache invalidation performed by
|
||||
// post meta functions (`wp_cache_set_posts_last_changed()` and direct
|
||||
// `wp_cache_delete()` calls).
|
||||
//
|
||||
// If two concurrent requests both see no row and both INSERT, the
|
||||
// duplicate is harmless: get_awareness_state() reads the latest row
|
||||
// (ORDER BY meta_id DESC).
|
||||
$meta_id = $wpdb->get_var(
|
||||
$wpdb->prepare(
|
||||
"SELECT meta_id FROM $wpdb->postmeta WHERE post_id = %d AND meta_key = %s ORDER BY meta_id DESC LIMIT 1",
|
||||
$post_id,
|
||||
self::AWARENESS_META_KEY
|
||||
)
|
||||
);
|
||||
|
||||
if ( $meta_id ) {
|
||||
return (bool) $wpdb->update(
|
||||
$wpdb->postmeta,
|
||||
array( 'meta_value' => wp_json_encode( $awareness ) ),
|
||||
array( 'meta_id' => $meta_id ),
|
||||
array( '%s' ),
|
||||
array( '%d' )
|
||||
);
|
||||
}
|
||||
|
||||
return (bool) $wpdb->insert(
|
||||
$wpdb->postmeta,
|
||||
array(
|
||||
'post_id' => $post_id,
|
||||
'meta_key' => self::AWARENESS_META_KEY,
|
||||
'meta_value' => wp_json_encode( $awareness ),
|
||||
),
|
||||
array( '%d', '%s', '%s' )
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the current cursor for a given room.
|
||||
*
|
||||
* The cursor is set during get_updates_after_cursor() and represents the
|
||||
* highest meta_id seen for the room's sync updates.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @return int Current cursor for the room.
|
||||
*/
|
||||
public function get_cursor( string $room ): int {
|
||||
return $this->room_cursors[ $room ] ?? 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets or creates the storage post for a given room.
|
||||
*
|
||||
* Each room gets its own dedicated post so that post meta cache
|
||||
* invalidation is scoped to a single room rather than all of them.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @return int|null Post ID.
|
||||
*/
|
||||
private function get_storage_post_id( string $room ): ?int {
|
||||
$room_hash = md5( $room );
|
||||
|
||||
if ( isset( self::$storage_post_ids[ $room_hash ] ) ) {
|
||||
return self::$storage_post_ids[ $room_hash ];
|
||||
}
|
||||
|
||||
// Try to find an existing post for this room.
|
||||
$posts = get_posts(
|
||||
array(
|
||||
'post_type' => self::POST_TYPE,
|
||||
'posts_per_page' => 1,
|
||||
'post_status' => 'publish',
|
||||
'name' => $room_hash,
|
||||
'fields' => 'ids',
|
||||
'orderby' => 'ID',
|
||||
'order' => 'ASC',
|
||||
)
|
||||
);
|
||||
|
||||
$post_id = array_first( $posts );
|
||||
if ( is_int( $post_id ) ) {
|
||||
self::$storage_post_ids[ $room_hash ] = $post_id;
|
||||
return $post_id;
|
||||
}
|
||||
|
||||
// Create new post for this room.
|
||||
$post_id = wp_insert_post(
|
||||
array(
|
||||
'post_type' => self::POST_TYPE,
|
||||
'post_status' => 'publish',
|
||||
'post_title' => 'Sync Storage',
|
||||
'post_name' => $room_hash,
|
||||
)
|
||||
);
|
||||
|
||||
if ( is_int( $post_id ) ) {
|
||||
self::$storage_post_ids[ $room_hash ] = $post_id;
|
||||
return $post_id;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the number of updates stored for a given room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @return int Number of updates stored for the room.
|
||||
*/
|
||||
public function get_update_count( string $room ): int {
|
||||
return $this->room_update_counts[ $room ] ?? 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves sync updates from a room after the given cursor.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @global wpdb $wpdb WordPress database abstraction object.
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param int $cursor Return updates after this cursor (meta_id).
|
||||
* @return array<int, mixed> Sync updates.
|
||||
*/
|
||||
public function get_updates_after_cursor( string $room, int $cursor ): array {
|
||||
global $wpdb;
|
||||
|
||||
$post_id = $this->get_storage_post_id( $room );
|
||||
if ( null === $post_id ) {
|
||||
$this->room_cursors[ $room ] = 0;
|
||||
$this->room_update_counts[ $room ] = 0;
|
||||
return array();
|
||||
}
|
||||
|
||||
// Capture the current room state first so the returned cursor is race-safe.
|
||||
$stats = $wpdb->get_row(
|
||||
$wpdb->prepare(
|
||||
"SELECT COUNT(*) AS total_updates, COALESCE( MAX(meta_id), 0 ) AS max_meta_id FROM {$wpdb->postmeta} WHERE post_id = %d AND meta_key = %s",
|
||||
$post_id,
|
||||
self::SYNC_UPDATE_META_KEY
|
||||
)
|
||||
);
|
||||
|
||||
$total_updates = $stats ? (int) $stats->total_updates : 0;
|
||||
$max_meta_id = $stats ? (int) $stats->max_meta_id : 0;
|
||||
|
||||
$this->room_update_counts[ $room ] = $total_updates;
|
||||
$this->room_cursors[ $room ] = $max_meta_id;
|
||||
|
||||
if ( $max_meta_id <= $cursor ) {
|
||||
return array();
|
||||
}
|
||||
|
||||
$rows = $wpdb->get_results(
|
||||
$wpdb->prepare(
|
||||
"SELECT meta_value FROM {$wpdb->postmeta} WHERE post_id = %d AND meta_key = %s AND meta_id > %d AND meta_id <= %d ORDER BY meta_id ASC",
|
||||
$post_id,
|
||||
self::SYNC_UPDATE_META_KEY,
|
||||
$cursor,
|
||||
$max_meta_id
|
||||
)
|
||||
);
|
||||
|
||||
if ( ! $rows ) {
|
||||
return array();
|
||||
}
|
||||
|
||||
$updates = array();
|
||||
foreach ( $rows as $row ) {
|
||||
$decoded = json_decode( $row->meta_value, true );
|
||||
if ( null !== $decoded ) {
|
||||
$updates[] = $decoded;
|
||||
}
|
||||
}
|
||||
|
||||
return $updates;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes updates from a room that are older than the given cursor.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @global wpdb $wpdb WordPress database abstraction object.
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param int $cursor Remove updates with meta_id < this cursor.
|
||||
* @return bool True on success, false on failure.
|
||||
*/
|
||||
public function remove_updates_before_cursor( string $room, int $cursor ): bool {
|
||||
global $wpdb;
|
||||
|
||||
$post_id = $this->get_storage_post_id( $room );
|
||||
if ( null === $post_id ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$deleted_rows = $wpdb->query(
|
||||
$wpdb->prepare(
|
||||
"DELETE FROM {$wpdb->postmeta} WHERE post_id = %d AND meta_key = %s AND meta_id < %d",
|
||||
$post_id,
|
||||
self::SYNC_UPDATE_META_KEY,
|
||||
$cursor
|
||||
)
|
||||
);
|
||||
|
||||
if ( false === $deleted_rows ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
86
wp-includes/collaboration/interface-wp-sync-storage.php
Normal file
86
wp-includes/collaboration/interface-wp-sync-storage.php
Normal file
@@ -0,0 +1,86 @@
|
||||
<?php
|
||||
/**
|
||||
* WP_Sync_Storage interface
|
||||
*
|
||||
* @package WordPress
|
||||
*/
|
||||
|
||||
interface WP_Sync_Storage {
|
||||
/**
|
||||
* Adds a sync update to a given room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param mixed $update Serializable sync update, opaque to the storage implementation.
|
||||
* @return bool True on success, false on failure.
|
||||
*/
|
||||
public function add_update( string $room, $update ): bool;
|
||||
|
||||
/**
|
||||
* Gets awareness state for a given room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @return array<int, mixed> Awareness state.
|
||||
*/
|
||||
public function get_awareness_state( string $room ): array;
|
||||
|
||||
/**
|
||||
* Gets the current cursor for a given room. This should return a monotonically
|
||||
* increasing integer that represents the last update that was returned for the
|
||||
* room during the current request. This allows clients to retrieve updates
|
||||
* after a specific cursor on subsequent requests.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @return int Current cursor for the room.
|
||||
*/
|
||||
public function get_cursor( string $room ): int;
|
||||
|
||||
/**
|
||||
* Gets the total number of stored updates for a given room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @return int Total number of updates.
|
||||
*/
|
||||
public function get_update_count( string $room ): int;
|
||||
|
||||
/**
|
||||
* Retrieves sync updates from a room for a given client and cursor. Updates
|
||||
* from the specified client should be excluded.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param int $cursor Return updates after this cursor.
|
||||
* @return array<int, mixed> Sync updates.
|
||||
*/
|
||||
public function get_updates_after_cursor( string $room, int $cursor ): array;
|
||||
|
||||
/**
|
||||
* Removes updates from a room that are older than the provided cursor.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param int $cursor Remove updates with markers < this cursor.
|
||||
* @return bool True on success, false on failure.
|
||||
*/
|
||||
public function remove_updates_before_cursor( string $room, int $cursor ): bool;
|
||||
|
||||
/**
|
||||
* Sets awareness state for a given room.
|
||||
*
|
||||
* @since 7.0.0
|
||||
*
|
||||
* @param string $room Room identifier.
|
||||
* @param array<int, mixed> $awareness Serializable awareness state.
|
||||
* @return bool True on success, false on failure.
|
||||
*/
|
||||
public function set_awareness_state( string $room, array $awareness ): bool;
|
||||
}
|
||||
Reference in New Issue
Block a user