From 477c7cf1bf1d8b9b88f367f554242a9afb38a71e Mon Sep 17 00:00:00 2001 From: Kyriakos Z <3829551+koox00@users.noreply.github.com> Date: Tue, 12 Apr 2022 15:03:05 +0300 Subject: [PATCH] MM-40093 [Gekidou]: adds remote action to sync threads (#6135) * MM-40093: adds remote action to sync threads Syncing threads fetches all unread threads if there are no threads in the DB. If there are threads in the DB it fetches all threads newest than the newest one we have stored. Fetching happens in batches of 60 threads at a time. * Fixes querying for all threads list * Syncs threads on WS reconnection * Addresses review comments * Addresses review comments --- app/actions/remote/entry/common.ts | 18 ++++ app/actions/remote/thread.ts | 161 ++++++++++++++++++++++++++++- app/actions/websocket/index.ts | 17 +++ app/client/rest/threads.ts | 5 +- app/queries/servers/thread.ts | 45 +++++--- app/utils/thread/index.ts | 2 +- 6 files changed, 226 insertions(+), 22 deletions(-) diff --git a/app/actions/remote/entry/common.ts b/app/actions/remote/entry/common.ts index 4ff33d6d0d..f3b34e0f5e 100644 --- a/app/actions/remote/entry/common.ts +++ b/app/actions/remote/entry/common.ts @@ -18,6 +18,9 @@ import {queryAllChannelsForTeam} from '@queries/servers/channel'; import {getConfig} from '@queries/servers/system'; import {deleteMyTeams, getAvailableTeamIds, queryMyTeams, queryMyTeamsByIds, queryTeamsById} from '@queries/servers/team'; import {isDMorGM} from '@utils/channel'; +import {isCRTEnabled} from '@utils/thread'; + +import {fetchNewThreads} from '../thread'; import type ClientError from '@client/rest/error'; @@ -209,6 +212,21 @@ export async function deferredAppEntryActions( await fetchTeamsChannelsAndUnreadPosts(serverUrl, since, teamData.teams, teamData.memberships, initialTeamId); } + if (preferences && isCRTEnabled(preferences, config)) { + if (initialTeamId) { + await fetchNewThreads(serverUrl, initialTeamId, false); + } + + if (teamData.teams?.length) { + for await (const team of teamData.teams) { + if (team.id !== initialTeamId) { + // need to await here since GM/DM threads in different teams overlap + await fetchNewThreads(serverUrl, team.id, false); + } + } + } + } + fetchAllTeams(serverUrl); updateAllUsersSince(serverUrl, since); } diff --git a/app/actions/remote/thread.ts b/app/actions/remote/thread.ts index 7ceb3b5090..c12ae4e398 100644 --- a/app/actions/remote/thread.ts +++ b/app/actions/remote/thread.ts @@ -9,10 +9,14 @@ import NetworkManager from '@init/network_manager'; import {getChannelById} from '@queries/servers/channel'; import {getPostById} from '@queries/servers/post'; import {getCommonSystemValues} from '@queries/servers/system'; -import {getIsCRTEnabled, getThreadById} from '@queries/servers/thread'; +import {getIsCRTEnabled, getNewestThreadInTeam, getThreadById} from '@queries/servers/thread'; +import {getCurrentUser} from '@queries/servers/user'; import {forceLogoutIfNecessary} from './session'; +import type {Client} from '@client/rest'; +import type {Model} from '@nozbe/watermelondb'; + type FetchThreadsRequest = { error?: unknown; } | { @@ -26,7 +30,8 @@ type FetchThreadsOptions = { deleted?: boolean; unread?: boolean; since?: number; -} + totalsOnly?: boolean; +}; export const fetchAndSwitchToThread = async (serverUrl: string, rootId: string) => { const database = DatabaseManager.serverDatabases[serverUrl]?.database; @@ -67,7 +72,7 @@ export const fetchThreads = async ( perPage = General.CRT_CHUNK_SIZE, deleted = false, unread = false, - since = 0, + since, }: FetchThreadsOptions = { perPage: General.CRT_CHUNK_SIZE, deleted: false, @@ -90,7 +95,7 @@ export const fetchThreads = async ( try { const {config} = await getCommonSystemValues(database); - const data = await client.getThreads('me', teamId, before, after, perPage, deleted, unread, since, config.Version); + const data = await client.getThreads('me', teamId, before, after, perPage, deleted, unread, since, false, config.Version); const {threads} = data; @@ -194,3 +199,151 @@ export const updateThreadFollowing = async (serverUrl: string, teamId: string, t return {error}; } }; + +enum Direction { + Up, + Down, +} + +async function fetchBatchThreads( + serverUrl: string, + teamId: string, + options: FetchThreadsOptions, + pages?: number, +): Promise<{error: unknown; data?: Thread[]}> { + const operator = DatabaseManager.serverDatabases[serverUrl]?.operator; + + if (!operator) { + return {error: `${serverUrl} database not found`}; + } + + let client: Client; + try { + client = NetworkManager.getClient(serverUrl); + } catch (error) { + return {error}; + } + + // if we start from the begging of time (since = 0) we need to fetch threads from newest to oldest (Direction.Down) + // if there is another point in time, we need to fetch threads from oldest to newest (Direction.Up) + let direction = Direction.Up; + if (options.since === 0) { + direction = Direction.Down; + } + + const currentUser = await getCurrentUser(operator.database); + if (!currentUser) { + return {error: 'currentUser not found'}; + } + + const {config} = await getCommonSystemValues(operator.database); + const data: Thread[] = []; + + const fetchThreadsFunc = async (opts: FetchThreadsOptions) => { + let page = 0; + const {before, after, perPage = General.CRT_CHUNK_SIZE, deleted, unread, since} = opts; + + page += 1; + const {threads} = await client.getThreads(currentUser.id, teamId, before, after, perPage, deleted, unread, since, false, config.Version); + if (threads.length) { + // Mark all fetched threads as following + for (const thread of threads) { + thread.is_following = true; + } + + data.push(...threads); + + if (threads.length === perPage) { + const newOptions: FetchThreadsOptions = {perPage, deleted, unread}; + if (direction === Direction.Down) { + const last = threads[threads.length - 1]; + newOptions.before = last.id; + } else { + const first = threads[0]; + newOptions.after = first.id; + } + if (pages != null && page < pages) { + fetchThreadsFunc(newOptions); + } + } + } + }; + + try { + await fetchThreadsFunc(options); + } catch (error) { + if (__DEV__) { + throw error; + } + + return {error, data}; + } + + return {error: false, data}; +} + +export async function fetchNewThreads( + serverUrl: string, + teamId: string, + prepareRecordsOnly = false, +): Promise<{error: unknown; models?: Model[]}> { + const options: FetchThreadsOptions = { + unread: false, + deleted: true, + perPage: 60, + }; + + const operator = DatabaseManager.serverDatabases[serverUrl]?.operator; + + if (!operator) { + return {error: `${serverUrl} database not found`}; + } + + const newestThread = await getNewestThreadInTeam(operator.database, teamId, false); + options.since = newestThread ? newestThread.lastReplyAt : 0; + + let response = { + error: undefined, + data: [], + } as { + error: unknown; + data?: Thread[]; + }; + + let loadedInGlobalThreads = true; + + // if we have no threads in the DB fetch all unread ones + if (options.since === 0) { + // options to fetch all unread threads + options.deleted = false; + options.unread = true; + loadedInGlobalThreads = false; + } + + response = await fetchBatchThreads(serverUrl, teamId, options); + + const {error: nErr, data} = response; + + if (nErr) { + return {error: nErr}; + } + + if (!data?.length) { + return {error: false, models: []}; + } + + const {error, models} = await processReceivedThreads(serverUrl, data, teamId, true, loadedInGlobalThreads); + + if (!error && !prepareRecordsOnly && models?.length) { + try { + await operator.batchRecords(models); + } catch (err) { + if (__DEV__) { + throw err; + } + return {error: true}; + } + } + + return {error: false, models}; +} diff --git a/app/actions/websocket/index.ts b/app/actions/websocket/index.ts index 3c3b7d9d28..850c09a749 100644 --- a/app/actions/websocket/index.ts +++ b/app/actions/websocket/index.ts @@ -7,6 +7,7 @@ import {fetchPostsForUnreadChannels, fetchPostsSince} from '@actions/remote/post import {fetchRoles} from '@actions/remote/role'; import {fetchConfigAndLicense} from '@actions/remote/systems'; import {fetchAllTeams, fetchTeamsChannelsAndUnreadPosts} from '@actions/remote/team'; +import {fetchNewThreads} from '@actions/remote/thread'; import {fetchStatusByIds, updateAllUsersSince} from '@actions/remote/user'; import {WebsocketEvents} from '@constants'; import {SYSTEM_IDENTIFIERS} from '@constants/database'; @@ -17,6 +18,7 @@ import {prepareModels} from '@queries/servers/entry'; import {getCommonSystemValues, getConfig, getCurrentChannelId, getWebSocketLastDisconnected, resetWebSocketLastDisconnected, setCurrentTeamAndChannelId} from '@queries/servers/system'; import {isDMorGM} from '@utils/channel'; import {isTablet} from '@utils/helpers'; +import {isCRTEnabled} from '@utils/thread'; import {handleCategoryCreatedEvent, handleCategoryDeletedEvent, handleCategoryOrderUpdatedEvent, handleCategoryUpdatedEvent} from './category'; import {handleChannelConvertedEvent, handleChannelCreatedEvent, @@ -197,6 +199,21 @@ async function doReconnect(serverUrl: string) { await fetchTeamsChannelsAndUnreadPosts(serverUrl, lastDisconnectedAt, teamData.teams, teamData.memberships, initialTeamId); } + if (prefData.preferences && isCRTEnabled(prefData.preferences, config)) { + if (initialTeamId) { + await fetchNewThreads(serverUrl, initialTeamId, false); + } + + if (teamData.teams?.length) { + for await (const team of teamData.teams) { + if (team.id !== initialTeamId) { + // need to await here since GM/DM threads in different teams overlap + await fetchNewThreads(serverUrl, team.id, false); + } + } + } + } + fetchAllTeams(serverUrl); updateAllUsersSince(serverUrl, lastDisconnectedAt); diff --git a/app/client/rest/threads.ts b/app/client/rest/threads.ts index 87224923e6..a8ad6a7e32 100644 --- a/app/client/rest/threads.ts +++ b/app/client/rest/threads.ts @@ -6,7 +6,7 @@ import {buildQueryString, isMinimumServerVersion} from '@utils/helpers'; import {PER_PAGE_DEFAULT} from './constants'; export interface ClientThreadsMix { - getThreads: (userId: string, teamId: string, before?: string, after?: string, pageSize?: number, deleted?: boolean, unread?: boolean, since?: number, serverVersion?: string) => Promise; + getThreads: (userId: string, teamId: string, before?: string, after?: string, pageSize?: number, deleted?: boolean, unread?: boolean, since?: number, totalsOnly?: boolean, serverVersion?: string) => Promise; getThread: (userId: string, teamId: string, threadId: string, extended?: boolean) => Promise; updateTeamThreadsAsRead: (userId: string, teamId: string) => Promise; updateThreadRead: (userId: string, teamId: string, threadId: string, timestamp: number) => Promise; @@ -14,7 +14,7 @@ export interface ClientThreadsMix { } const ClientThreads = (superclass: any) => class extends superclass { - getThreads = async (userId: string, teamId: string, before = '', after = '', pageSize = PER_PAGE_DEFAULT, deleted = false, unread = false, since = 0, serverVersion = '') => { + getThreads = async (userId: string, teamId: string, before = '', after = '', pageSize = PER_PAGE_DEFAULT, deleted = false, unread = false, since = 0, totalsOnly = false, serverVersion = '') => { const queryStringObj: Record = { extended: 'true', before, @@ -22,6 +22,7 @@ const ClientThreads = (superclass: any) => class extends superclass { deleted, unread, since, + totalsOnly, }; if (serverVersion && isMinimumServerVersion(serverVersion, 6, 0)) { queryStringObj.per_page = pageSize; diff --git a/app/queries/servers/thread.ts b/app/queries/servers/thread.ts index dffc231677..23baea3a82 100644 --- a/app/queries/servers/thread.ts +++ b/app/queries/servers/thread.ts @@ -16,7 +16,7 @@ import type ServerDataOperator from '@database/operator/server_data_operator'; import type Model from '@nozbe/watermelondb/Model'; import type ThreadModel from '@typings/database/models/servers/thread'; -const {SERVER: {CHANNEL, POST, THREAD}} = MM_TABLES; +const {SERVER: {THREADS_IN_TEAM, THREAD, POST, CHANNEL}} = MM_TABLES; export const getIsCRTEnabled = async (database: Database): Promise => { const config = await getConfig(database); @@ -96,10 +96,8 @@ export const prepareThreadsFromReceivedPosts = async (operator: ServerDataOperat return models; }; -export const queryThreadsInTeam = (database: Database, teamId: string, onlyUnreads?: boolean, hasReplies?: boolean, isFollowing?: boolean, sort?: boolean): Query => { - const query: Q.Clause[] = [ - Q.experimentalNestedJoin(POST, CHANNEL), - ]; +export const queryThreadsInTeam = (database: Database, teamId: string, onlyUnreads?: boolean, hasReplies?: boolean, isFollowing?: boolean, sort?: boolean, limit?: number): Query => { + const query: Q.Clause[] = []; if (isFollowing) { query.push(Q.where('is_following', true)); @@ -117,22 +115,39 @@ export const queryThreadsInTeam = (database: Database, teamId: string, onlyUnrea query.push(Q.sortBy('last_reply_at', Q.desc)); } + let joinCondition: Q.Condition = Q.where('team_id', teamId); + + if (!onlyUnreads) { + joinCondition = Q.and( + Q.where('team_id', teamId), + Q.where('loaded_in_global_threads', true), + ); + } + query.push( - Q.on( - POST, - Q.on( - CHANNEL, - Q.or( - Q.where('team_id', teamId), - Q.where('team_id', ''), - ), - ), - ), + Q.on(THREADS_IN_TEAM, joinCondition), ); + if (limit) { + query.push(Q.take(limit)); + } + return database.get(THREAD).query(...query); }; +export async function getNewestThreadInTeam( + database: Database, + teamId: string, + unread: boolean, +): Promise { + try { + const threads = await queryThreadsInTeam(database, teamId, unread, true, true, true, 1).fetch(); + return threads?.[0] || undefined; + } catch (e) { + return undefined; + } +} + export function observeThreadMentionCount(database: Database, teamId?: string, includeDmGm?: boolean): Observable { return observeUnreadsAndMentionsInTeam(database, teamId, includeDmGm).pipe( switchMap(({mentions}) => of$(mentions)), diff --git a/app/utils/thread/index.ts b/app/utils/thread/index.ts index 142bb7a4c7..6946b73f7f 100644 --- a/app/utils/thread/index.ts +++ b/app/utils/thread/index.ts @@ -6,7 +6,7 @@ import {getPreferenceValue} from '@helpers/api/preference'; import type PreferenceModel from '@typings/database/models/servers/preference'; -export function isCRTEnabled(preferences: PreferenceModel[], config?: ClientConfig): boolean { +export function isCRTEnabled(preferences: PreferenceModel[]|PreferenceType[], config?: ClientConfig): boolean { let preferenceDefault = Preferences.COLLAPSED_REPLY_THREADS_OFF; const configValue = config?.CollapsedThreads; if (configValue === Config.DEFAULT_ON) {