diff --git a/app/actions/remote/entry/common.ts b/app/actions/remote/entry/common.ts index 4ff33d6d0d..f3b34e0f5e 100644 --- a/app/actions/remote/entry/common.ts +++ b/app/actions/remote/entry/common.ts @@ -18,6 +18,9 @@ import {queryAllChannelsForTeam} from '@queries/servers/channel'; import {getConfig} from '@queries/servers/system'; import {deleteMyTeams, getAvailableTeamIds, queryMyTeams, queryMyTeamsByIds, queryTeamsById} from '@queries/servers/team'; import {isDMorGM} from '@utils/channel'; +import {isCRTEnabled} from '@utils/thread'; + +import {fetchNewThreads} from '../thread'; import type ClientError from '@client/rest/error'; @@ -209,6 +212,21 @@ export async function deferredAppEntryActions( await fetchTeamsChannelsAndUnreadPosts(serverUrl, since, teamData.teams, teamData.memberships, initialTeamId); } + if (preferences && isCRTEnabled(preferences, config)) { + if (initialTeamId) { + await fetchNewThreads(serverUrl, initialTeamId, false); + } + + if (teamData.teams?.length) { + for await (const team of teamData.teams) { + if (team.id !== initialTeamId) { + // need to await here since GM/DM threads in different teams overlap + await fetchNewThreads(serverUrl, team.id, false); + } + } + } + } + fetchAllTeams(serverUrl); updateAllUsersSince(serverUrl, since); } diff --git a/app/actions/remote/thread.ts b/app/actions/remote/thread.ts index 7ceb3b5090..c12ae4e398 100644 --- a/app/actions/remote/thread.ts +++ b/app/actions/remote/thread.ts @@ -9,10 +9,14 @@ import NetworkManager from '@init/network_manager'; import {getChannelById} from '@queries/servers/channel'; import {getPostById} from '@queries/servers/post'; import {getCommonSystemValues} from '@queries/servers/system'; -import {getIsCRTEnabled, getThreadById} from '@queries/servers/thread'; +import {getIsCRTEnabled, getNewestThreadInTeam, getThreadById} from '@queries/servers/thread'; +import {getCurrentUser} from '@queries/servers/user'; import {forceLogoutIfNecessary} from './session'; +import type {Client} from '@client/rest'; +import type {Model} from '@nozbe/watermelondb'; + type FetchThreadsRequest = { error?: unknown; } | { @@ -26,7 +30,8 @@ type FetchThreadsOptions = { deleted?: boolean; unread?: boolean; since?: number; -} + totalsOnly?: boolean; +}; export const fetchAndSwitchToThread = async (serverUrl: string, rootId: string) => { const database = DatabaseManager.serverDatabases[serverUrl]?.database; @@ -67,7 +72,7 @@ export const fetchThreads = async ( perPage = General.CRT_CHUNK_SIZE, deleted = false, unread = false, - since = 0, + since, }: FetchThreadsOptions = { perPage: General.CRT_CHUNK_SIZE, deleted: false, @@ -90,7 +95,7 @@ export const fetchThreads = async ( try { const {config} = await getCommonSystemValues(database); - const data = await client.getThreads('me', teamId, before, after, perPage, deleted, unread, since, config.Version); + const data = await client.getThreads('me', teamId, before, after, perPage, deleted, unread, since, false, config.Version); const {threads} = data; @@ -194,3 +199,151 @@ export const updateThreadFollowing = async (serverUrl: string, teamId: string, t return {error}; } }; + +enum Direction { + Up, + Down, +} + +async function fetchBatchThreads( + serverUrl: string, + teamId: string, + options: FetchThreadsOptions, + pages?: number, +): Promise<{error: unknown; data?: Thread[]}> { + const operator = DatabaseManager.serverDatabases[serverUrl]?.operator; + + if (!operator) { + return {error: `${serverUrl} database not found`}; + } + + let client: Client; + try { + client = NetworkManager.getClient(serverUrl); + } catch (error) { + return {error}; + } + + // if we start from the begging of time (since = 0) we need to fetch threads from newest to oldest (Direction.Down) + // if there is another point in time, we need to fetch threads from oldest to newest (Direction.Up) + let direction = Direction.Up; + if (options.since === 0) { + direction = Direction.Down; + } + + const currentUser = await getCurrentUser(operator.database); + if (!currentUser) { + return {error: 'currentUser not found'}; + } + + const {config} = await getCommonSystemValues(operator.database); + const data: Thread[] = []; + + const fetchThreadsFunc = async (opts: FetchThreadsOptions) => { + let page = 0; + const {before, after, perPage = General.CRT_CHUNK_SIZE, deleted, unread, since} = opts; + + page += 1; + const {threads} = await client.getThreads(currentUser.id, teamId, before, after, perPage, deleted, unread, since, false, config.Version); + if (threads.length) { + // Mark all fetched threads as following + for (const thread of threads) { + thread.is_following = true; + } + + data.push(...threads); + + if (threads.length === perPage) { + const newOptions: FetchThreadsOptions = {perPage, deleted, unread}; + if (direction === Direction.Down) { + const last = threads[threads.length - 1]; + newOptions.before = last.id; + } else { + const first = threads[0]; + newOptions.after = first.id; + } + if (pages != null && page < pages) { + fetchThreadsFunc(newOptions); + } + } + } + }; + + try { + await fetchThreadsFunc(options); + } catch (error) { + if (__DEV__) { + throw error; + } + + return {error, data}; + } + + return {error: false, data}; +} + +export async function fetchNewThreads( + serverUrl: string, + teamId: string, + prepareRecordsOnly = false, +): Promise<{error: unknown; models?: Model[]}> { + const options: FetchThreadsOptions = { + unread: false, + deleted: true, + perPage: 60, + }; + + const operator = DatabaseManager.serverDatabases[serverUrl]?.operator; + + if (!operator) { + return {error: `${serverUrl} database not found`}; + } + + const newestThread = await getNewestThreadInTeam(operator.database, teamId, false); + options.since = newestThread ? newestThread.lastReplyAt : 0; + + let response = { + error: undefined, + data: [], + } as { + error: unknown; + data?: Thread[]; + }; + + let loadedInGlobalThreads = true; + + // if we have no threads in the DB fetch all unread ones + if (options.since === 0) { + // options to fetch all unread threads + options.deleted = false; + options.unread = true; + loadedInGlobalThreads = false; + } + + response = await fetchBatchThreads(serverUrl, teamId, options); + + const {error: nErr, data} = response; + + if (nErr) { + return {error: nErr}; + } + + if (!data?.length) { + return {error: false, models: []}; + } + + const {error, models} = await processReceivedThreads(serverUrl, data, teamId, true, loadedInGlobalThreads); + + if (!error && !prepareRecordsOnly && models?.length) { + try { + await operator.batchRecords(models); + } catch (err) { + if (__DEV__) { + throw err; + } + return {error: true}; + } + } + + return {error: false, models}; +} diff --git a/app/actions/websocket/index.ts b/app/actions/websocket/index.ts index 3c3b7d9d28..850c09a749 100644 --- a/app/actions/websocket/index.ts +++ b/app/actions/websocket/index.ts @@ -7,6 +7,7 @@ import {fetchPostsForUnreadChannels, fetchPostsSince} from '@actions/remote/post import {fetchRoles} from '@actions/remote/role'; import {fetchConfigAndLicense} from '@actions/remote/systems'; import {fetchAllTeams, fetchTeamsChannelsAndUnreadPosts} from '@actions/remote/team'; +import {fetchNewThreads} from '@actions/remote/thread'; import {fetchStatusByIds, updateAllUsersSince} from '@actions/remote/user'; import {WebsocketEvents} from '@constants'; import {SYSTEM_IDENTIFIERS} from '@constants/database'; @@ -17,6 +18,7 @@ import {prepareModels} from '@queries/servers/entry'; import {getCommonSystemValues, getConfig, getCurrentChannelId, getWebSocketLastDisconnected, resetWebSocketLastDisconnected, setCurrentTeamAndChannelId} from '@queries/servers/system'; import {isDMorGM} from '@utils/channel'; import {isTablet} from '@utils/helpers'; +import {isCRTEnabled} from '@utils/thread'; import {handleCategoryCreatedEvent, handleCategoryDeletedEvent, handleCategoryOrderUpdatedEvent, handleCategoryUpdatedEvent} from './category'; import {handleChannelConvertedEvent, handleChannelCreatedEvent, @@ -197,6 +199,21 @@ async function doReconnect(serverUrl: string) { await fetchTeamsChannelsAndUnreadPosts(serverUrl, lastDisconnectedAt, teamData.teams, teamData.memberships, initialTeamId); } + if (prefData.preferences && isCRTEnabled(prefData.preferences, config)) { + if (initialTeamId) { + await fetchNewThreads(serverUrl, initialTeamId, false); + } + + if (teamData.teams?.length) { + for await (const team of teamData.teams) { + if (team.id !== initialTeamId) { + // need to await here since GM/DM threads in different teams overlap + await fetchNewThreads(serverUrl, team.id, false); + } + } + } + } + fetchAllTeams(serverUrl); updateAllUsersSince(serverUrl, lastDisconnectedAt); diff --git a/app/client/rest/threads.ts b/app/client/rest/threads.ts index 87224923e6..a8ad6a7e32 100644 --- a/app/client/rest/threads.ts +++ b/app/client/rest/threads.ts @@ -6,7 +6,7 @@ import {buildQueryString, isMinimumServerVersion} from '@utils/helpers'; import {PER_PAGE_DEFAULT} from './constants'; export interface ClientThreadsMix { - getThreads: (userId: string, teamId: string, before?: string, after?: string, pageSize?: number, deleted?: boolean, unread?: boolean, since?: number, serverVersion?: string) => Promise; + getThreads: (userId: string, teamId: string, before?: string, after?: string, pageSize?: number, deleted?: boolean, unread?: boolean, since?: number, totalsOnly?: boolean, serverVersion?: string) => Promise; getThread: (userId: string, teamId: string, threadId: string, extended?: boolean) => Promise; updateTeamThreadsAsRead: (userId: string, teamId: string) => Promise; updateThreadRead: (userId: string, teamId: string, threadId: string, timestamp: number) => Promise; @@ -14,7 +14,7 @@ export interface ClientThreadsMix { } const ClientThreads = (superclass: any) => class extends superclass { - getThreads = async (userId: string, teamId: string, before = '', after = '', pageSize = PER_PAGE_DEFAULT, deleted = false, unread = false, since = 0, serverVersion = '') => { + getThreads = async (userId: string, teamId: string, before = '', after = '', pageSize = PER_PAGE_DEFAULT, deleted = false, unread = false, since = 0, totalsOnly = false, serverVersion = '') => { const queryStringObj: Record = { extended: 'true', before, @@ -22,6 +22,7 @@ const ClientThreads = (superclass: any) => class extends superclass { deleted, unread, since, + totalsOnly, }; if (serverVersion && isMinimumServerVersion(serverVersion, 6, 0)) { queryStringObj.per_page = pageSize; diff --git a/app/queries/servers/thread.ts b/app/queries/servers/thread.ts index dffc231677..23baea3a82 100644 --- a/app/queries/servers/thread.ts +++ b/app/queries/servers/thread.ts @@ -16,7 +16,7 @@ import type ServerDataOperator from '@database/operator/server_data_operator'; import type Model from '@nozbe/watermelondb/Model'; import type ThreadModel from '@typings/database/models/servers/thread'; -const {SERVER: {CHANNEL, POST, THREAD}} = MM_TABLES; +const {SERVER: {THREADS_IN_TEAM, THREAD, POST, CHANNEL}} = MM_TABLES; export const getIsCRTEnabled = async (database: Database): Promise => { const config = await getConfig(database); @@ -96,10 +96,8 @@ export const prepareThreadsFromReceivedPosts = async (operator: ServerDataOperat return models; }; -export const queryThreadsInTeam = (database: Database, teamId: string, onlyUnreads?: boolean, hasReplies?: boolean, isFollowing?: boolean, sort?: boolean): Query => { - const query: Q.Clause[] = [ - Q.experimentalNestedJoin(POST, CHANNEL), - ]; +export const queryThreadsInTeam = (database: Database, teamId: string, onlyUnreads?: boolean, hasReplies?: boolean, isFollowing?: boolean, sort?: boolean, limit?: number): Query => { + const query: Q.Clause[] = []; if (isFollowing) { query.push(Q.where('is_following', true)); @@ -117,22 +115,39 @@ export const queryThreadsInTeam = (database: Database, teamId: string, onlyUnrea query.push(Q.sortBy('last_reply_at', Q.desc)); } + let joinCondition: Q.Condition = Q.where('team_id', teamId); + + if (!onlyUnreads) { + joinCondition = Q.and( + Q.where('team_id', teamId), + Q.where('loaded_in_global_threads', true), + ); + } + query.push( - Q.on( - POST, - Q.on( - CHANNEL, - Q.or( - Q.where('team_id', teamId), - Q.where('team_id', ''), - ), - ), - ), + Q.on(THREADS_IN_TEAM, joinCondition), ); + if (limit) { + query.push(Q.take(limit)); + } + return database.get(THREAD).query(...query); }; +export async function getNewestThreadInTeam( + database: Database, + teamId: string, + unread: boolean, +): Promise { + try { + const threads = await queryThreadsInTeam(database, teamId, unread, true, true, true, 1).fetch(); + return threads?.[0] || undefined; + } catch (e) { + return undefined; + } +} + export function observeThreadMentionCount(database: Database, teamId?: string, includeDmGm?: boolean): Observable { return observeUnreadsAndMentionsInTeam(database, teamId, includeDmGm).pipe( switchMap(({mentions}) => of$(mentions)), diff --git a/app/utils/thread/index.ts b/app/utils/thread/index.ts index 142bb7a4c7..6946b73f7f 100644 --- a/app/utils/thread/index.ts +++ b/app/utils/thread/index.ts @@ -6,7 +6,7 @@ import {getPreferenceValue} from '@helpers/api/preference'; import type PreferenceModel from '@typings/database/models/servers/preference'; -export function isCRTEnabled(preferences: PreferenceModel[], config?: ClientConfig): boolean { +export function isCRTEnabled(preferences: PreferenceModel[]|PreferenceType[], config?: ClientConfig): boolean { let preferenceDefault = Preferences.COLLAPSED_REPLY_THREADS_OFF; const configValue = config?.CollapsedThreads; if (configValue === Config.DEFAULT_ON) {