[MM-47548 Gekidou] Data Retention Implementation (#6732)

* Fetch & Save granular data retention policies through REST

* Init Data cleanup

* Run the clean up

* Deleting posts in patches and running across other servers

* fetch on graphql & refactor

* Feedback changes

* Added try catch for deletePosts function

* Feedback changes

* Changed to 'for of' loop

* Misc

* app/actions

* Date cutoff fox

* Prevent showing loading bar when request fails

Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
Co-authored-by: Mattermost Build <build@mattermost.com>
This commit is contained in:
Anurag Shivarathri
2023-01-21 16:04:19 +05:30
committed by GitHub
parent 24ec88096c
commit eb46a6aeff
13 changed files with 381 additions and 34 deletions

View File

@@ -3,6 +3,7 @@
import {fetchPostAuthors} from '@actions/remote/post';
import {ActionType, Post} from '@constants';
import {MM_TABLES} from '@constants/database';
import DatabaseManager from '@database/manager';
import {getPostById, prepareDeletePost, queryPostsById} from '@queries/servers/post';
import {getCurrentUserId} from '@queries/servers/system';
@@ -18,6 +19,8 @@ import type MyChannelModel from '@typings/database/models/servers/my_channel';
import type PostModel from '@typings/database/models/servers/post';
import type UserModel from '@typings/database/models/servers/user';
const {SERVER: {DRAFT, FILE, POST, POSTS_IN_THREAD, REACTION, THREAD, THREAD_PARTICIPANT, THREADS_IN_TEAM}} = MM_TABLES;
export const sendAddToChannelEphemeralPost = async (serverUrl: string, user: UserModel, addedUsernames: string[], messages: string[], channeId: string, postRootId = '') => {
try {
const {operator} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);
@@ -244,3 +247,33 @@ export async function getPosts(serverUrl: string, ids: string[]) {
return [];
}
}
export async function deletePosts(serverUrl: string, postIds: string[]) {
try {
const {database} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);
const postsFormatted = `'${postIds.join("','")}'`;
await database.write(() => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error
return database.adapter.unsafeExecute({
sqls: [
[`DELETE FROM ${POST} where id IN (${postsFormatted})`, []],
[`DELETE FROM ${REACTION} where post_id IN (${postsFormatted})`, []],
[`DELETE FROM ${FILE} where post_id IN (${postsFormatted})`, []],
[`DELETE FROM ${DRAFT} where root_id IN (${postsFormatted})`, []],
[`DELETE FROM ${POSTS_IN_THREAD} where root_id IN (${postsFormatted})`, []],
[`DELETE FROM ${THREAD} where id IN (${postsFormatted})`, []],
[`DELETE FROM ${THREAD_PARTICIPANT} where thread_id IN (${postsFormatted})`, []],
[`DELETE FROM ${THREADS_IN_TEAM} where thread_id IN (${postsFormatted})`, []],
],
});
});
return {error: false};
} catch (error) {
return {error};
}
}

View File

@@ -1,14 +1,23 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
import {Q} from '@nozbe/watermelondb';
import deepEqual from 'deep-equal';
import {SYSTEM_IDENTIFIERS} from '@constants/database';
import {MM_TABLES, SYSTEM_IDENTIFIERS} from '@constants/database';
import DatabaseManager from '@database/manager';
import {getServerCredentials} from '@init/credentials';
import {getConfig, getLicense} from '@queries/servers/system';
import {queryAllChannelsForTeam} from '@queries/servers/channel';
import {getConfig, getLicense, getGlobalDataRetentionPolicy, getGranularDataRetentionPolicies, getLastGlobalDataRetentionRun, getIsDataRetentionEnabled} from '@queries/servers/system';
import {logError} from '@utils/log';
import {deletePosts} from './post';
import type {DataRetentionPoliciesRequest} from '@actions/remote/systems';
import type PostModel from '@typings/database/models/servers/post';
const {SERVER: {POST}} = MM_TABLES;
export async function storeConfigAndLicense(serverUrl: string, config: ClientConfig, license: ClientLicense) {
try {
// If we have credentials for this server then update the values in the database
@@ -74,6 +83,155 @@ export async function storeConfig(serverUrl: string, config: ClientConfig | unde
return [];
}
export async function storeDataRetentionPolicies(serverUrl: string, data: DataRetentionPoliciesRequest, prepareRecordsOnly = false) {
try {
const {globalPolicy, teamPolicies, channelPolicies} = data;
const {operator} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);
const systems: IdValue[] = [{
id: SYSTEM_IDENTIFIERS.DATA_RETENTION_POLICIES,
value: globalPolicy || {},
}, {
id: SYSTEM_IDENTIFIERS.GRANULAR_DATA_RETENTION_POLICIES,
value: {
team: teamPolicies || [],
channel: channelPolicies || [],
},
}];
return operator.handleSystem({
systems,
prepareRecordsOnly,
});
} catch {
return [];
}
}
export async function updateLastDataRetentionRun(serverUrl: string, value?: number, prepareRecordsOnly = false) {
try {
const {operator} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);
const systems: IdValue[] = [{
id: SYSTEM_IDENTIFIERS.LAST_DATA_RETENTION_RUN,
value: value || Date.now(),
}];
return operator.handleSystem({systems, prepareRecordsOnly});
} catch (error) {
logError('Failed updateLastDataRetentionRun', error);
return {error};
}
}
export async function dataRetentionCleanup(serverUrl: string) {
try {
const {database} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);
const isDataRetentionEnabled = await getIsDataRetentionEnabled(database);
if (!isDataRetentionEnabled) {
return {error: undefined};
}
const lastRunAt = await getLastGlobalDataRetentionRun(database);
const lastCleanedToday = new Date(lastRunAt).toDateString() === new Date().toDateString();
// Do not run if clean up is already done today
if (lastRunAt && lastCleanedToday) {
return {error: undefined};
}
const globalPolicy = await getGlobalDataRetentionPolicy(database);
const granularPoliciesData = await getGranularDataRetentionPolicies(database);
// Get global data retention cutoff
let globalRetentionCutoff = 0;
if (globalPolicy?.message_deletion_enabled) {
globalRetentionCutoff = globalPolicy.message_retention_cutoff;
}
// Get Granular data retention policies
let teamPolicies: TeamDataRetentionPolicy[] = [];
let channelPolicies: ChannelDataRetentionPolicy[] = [];
if (granularPoliciesData) {
teamPolicies = granularPoliciesData.team;
channelPolicies = granularPoliciesData.channel;
}
const channelsCutoffs: {[key: string]: number} = {};
// Get channel level cutoff from team policies
for await (const teamPolicy of teamPolicies) {
const {team_id, post_duration} = teamPolicy;
const channelIds = await queryAllChannelsForTeam(database, team_id).fetchIds();
if (channelIds.length) {
const cutoff = getDataRetentionPolicyCutoff(post_duration);
channelIds.forEach((channelId) => {
channelsCutoffs[channelId] = cutoff;
});
}
}
// Get channel level cutoff from channel policies
channelPolicies.forEach(({channel_id, post_duration}) => {
channelsCutoffs[channel_id] = getDataRetentionPolicyCutoff(post_duration);
});
const conditions = [];
const channelIds = Object.keys(channelsCutoffs);
if (channelIds.length) {
// Fetch posts by channel level cutoff
for (const channelId of channelIds) {
const cutoff = channelsCutoffs[channelId];
conditions.push(`(channel_id='${channelId}' AND create_at < ${cutoff})`);
}
// Fetch posts by global cutoff which are not already fetched by channel level cutoff
conditions.push(`(channel_id NOT IN ('${channelIds.join("','")}') AND create_at < ${globalRetentionCutoff})`);
} else {
conditions.push(`create_at < ${globalRetentionCutoff}`);
}
const postIds = await database.get<PostModel>(POST).query(
Q.unsafeSqlQuery(`SELECT * FROM ${POST} where ${conditions.join(' OR ')}`),
).fetchIds();
if (postIds.length) {
const batchSize = 1000;
const deletePromises = [];
for (let i = 0; i < postIds.length; i += batchSize) {
const batch = postIds.slice(i, batchSize);
deletePromises.push(
deletePosts(serverUrl, batch),
);
}
const deleteResult = await Promise.all(deletePromises);
for (const {error} of deleteResult) {
if (error) {
return {error};
}
}
}
await updateLastDataRetentionRun(serverUrl);
return {error: undefined};
} catch (error) {
logError('An error occurred while performing data retention cleanup', error);
return {error};
}
}
// Returns cutoff time based on the policy's post_duration
function getDataRetentionPolicyCutoff(postDuration: number) {
const periodDate = new Date();
periodDate.setDate(periodDate.getDate() - postDuration);
periodDate.setHours(0);
periodDate.setMinutes(0);
periodDate.setSeconds(0);
return periodDate.getTime();
}
export async function setLastServerVersionCheck(serverUrl: string, reset = false) {
try {
const {operator} = DatabaseManager.getServerDatabaseAndOperator(serverUrl);

View File

@@ -1,7 +1,7 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
import {setLastServerVersionCheck} from '@actions/local/systems';
import {dataRetentionCleanup, setLastServerVersionCheck} from '@actions/local/systems';
import {fetchConfigAndLicense} from '@actions/remote/systems';
import DatabaseManager from '@database/manager';
import {prepareCommonSystemValues, getCurrentTeamId, getWebSocketLastDisconnected, getCurrentChannelId, getConfig, getLicense} from '@queries/servers/system';
@@ -27,6 +27,9 @@ export async function appEntry(serverUrl: string, since = 0, isUpgrade = false)
}
}
// Run data retention cleanup
await dataRetentionCleanup(serverUrl);
// clear lastUnreadChannelId
const removeLastUnreadChannelId = await prepareCommonSystemValues(operator, {lastUnreadChannelId: ''});
if (removeLastUnreadChannelId) {

View File

@@ -3,6 +3,7 @@
import {Database, Model} from '@nozbe/watermelondb';
import {dataRetentionCleanup} from '@actions/local/systems';
import {fetchMissingDirectChannelsInfo, fetchMyChannelsForTeam, handleKickFromChannel, MyChannelsRequest} from '@actions/remote/channel';
import {fetchGroupsForMember} from '@actions/remote/groups';
import {fetchPostsForUnreadChannels} from '@actions/remote/post';
@@ -378,7 +379,9 @@ export const syncOtherServers = async (serverUrl: string) => {
for (const server of servers) {
if (server.url !== serverUrl && server.lastActiveAt > 0) {
registerDeviceToken(server.url);
syncAllChannelMembersAndThreads(server.url);
syncAllChannelMembersAndThreads(server.url).then(() => {
dataRetentionCleanup(server.url);
});
autoUpdateTimezone(server.url);
}
}

View File

@@ -7,6 +7,7 @@ import {storeConfigAndLicense} from '@actions/local/systems';
import {MyChannelsRequest} from '@actions/remote/channel';
import {fetchGroupsForMember} from '@actions/remote/groups';
import {fetchPostsForUnreadChannels} from '@actions/remote/post';
import {fetchDataRetentionPolicy} from '@actions/remote/systems';
import {MyTeamsRequest, updateCanJoinTeams} from '@actions/remote/team';
import {syncTeamThreads} from '@actions/remote/thread';
import {autoUpdateTimezone, updateAllUsersSince} from '@actions/remote/user';
@@ -18,7 +19,7 @@ import {selectDefaultTeam} from '@helpers/api/team';
import {queryAllChannels, queryAllChannelsForTeam} from '@queries/servers/channel';
import {prepareModels, truncateCrtRelatedTables} from '@queries/servers/entry';
import {getHasCRTChanged} from '@queries/servers/preference';
import {getConfig} from '@queries/servers/system';
import {getConfig, getIsDataRetentionEnabled} from '@queries/servers/system';
import {filterAndTransformRoles, getMemberChannelsFromGQLQuery, getMemberTeamsFromGQLQuery, gqlToClientChannelMembership, gqlToClientPreference, gqlToClientSidebarCategory, gqlToClientTeamMembership, gqlToClientUser} from '@utils/graphql';
import {logDebug} from '@utils/log';
import {processIsCRTEnabled} from '@utils/thread';
@@ -265,6 +266,12 @@ export const entry = async (serverUrl: string, teamId?: string, channelId?: stri
result = entryRest(serverUrl, teamId, channelId, since);
}
// Fetch data retention policies
const isDataRetentionEnabled = await getIsDataRetentionEnabled(database);
if (isDataRetentionEnabled) {
fetchDataRetentionPolicy(serverUrl);
}
return result;
};

View File

@@ -444,7 +444,7 @@ export async function fetchPostsBefore(serverUrl: string, channelId: string, pos
} catch (error) {
forceLogoutIfNecessary(serverUrl, error as ClientErrorProps);
if (activeServerUrl === serverUrl) {
DeviceEventEmitter.emit(Events.LOADING_CHANNEL_POSTS, true);
DeviceEventEmitter.emit(Events.LOADING_CHANNEL_POSTS, false);
}
return {error};
}

View File

@@ -21,7 +21,6 @@ import {scheduleExpiredNotification} from '@utils/notification';
import {getCSRFFromCookie} from '@utils/security';
import {loginEntry} from './entry';
import {fetchDataRetentionPolicy} from './systems';
import type ClientError from '@client/rest/error';
import type {LoginArgs} from '@typings/database/database';
@@ -42,11 +41,6 @@ export const completeLogin = async (serverUrl: string) => {
return null;
}
// Data retention
if (config?.DataRetentionEnableMessageDeletion === 'true' && license?.IsLicensed === 'true' && license?.DataRetention === 'true') {
fetchDataRetentionPolicy(serverUrl);
}
await DatabaseManager.setActiveServerDatabase(serverUrl);
const systems: IdValue[] = [];

View File

@@ -1,12 +1,11 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
import {storeConfigAndLicense} from '@actions/local/systems';
import {storeConfigAndLicense, storeDataRetentionPolicies} from '@actions/local/systems';
import {forceLogoutIfNecessary} from '@actions/remote/session';
import {SYSTEM_IDENTIFIERS} from '@constants/database';
import DatabaseManager from '@database/manager';
import NetworkManager from '@managers/network_manager';
import {logError} from '@utils/log';
import {getCurrentUserId} from '@queries/servers/system';
import type ClientError from '@client/rest/error';
@@ -16,7 +15,47 @@ export type ConfigAndLicenseRequest = {
error?: unknown;
}
export const fetchDataRetentionPolicy = async (serverUrl: string) => {
export type DataRetentionPoliciesRequest = {
globalPolicy?: GlobalDataRetentionPolicy;
teamPolicies?: TeamDataRetentionPolicy[];
channelPolicies?: ChannelDataRetentionPolicy[];
error?: unknown;
}
export const fetchDataRetentionPolicy = async (serverUrl: string, fetchOnly = false): Promise<DataRetentionPoliciesRequest> => {
const operator = DatabaseManager.serverDatabases[serverUrl]?.operator;
if (!operator) {
return {error: `${serverUrl} database not found`};
}
try {
const {data: globalPolicy, error: globalPolicyError} = await fetchGlobalDataRetentionPolicy(serverUrl);
const {data: teamPolicies, error: teamPoliciesError} = await fetchAllGranularDataRetentionPolicies(serverUrl);
const {data: channelPolicies, error: channelPoliciesError} = await fetchAllGranularDataRetentionPolicies(serverUrl, true);
const hasError = globalPolicyError || teamPoliciesError || channelPoliciesError;
if (hasError) {
return hasError;
}
const data = {
globalPolicy,
teamPolicies: teamPolicies as TeamDataRetentionPolicy[],
channelPolicies: channelPolicies as ChannelDataRetentionPolicy[],
};
if (!fetchOnly) {
await storeDataRetentionPolicies(serverUrl, data);
}
return data;
} catch (error) {
forceLogoutIfNecessary(serverUrl, error as ClientError);
return {error};
}
};
export const fetchGlobalDataRetentionPolicy = async (serverUrl: string): Promise<{data?: GlobalDataRetentionPolicy; error?: unknown}> => {
let client;
try {
client = NetworkManager.getClient(serverUrl);
@@ -24,28 +63,47 @@ export const fetchDataRetentionPolicy = async (serverUrl: string) => {
return {error};
}
let data = {};
try {
data = await client.getDataRetentionPolicy();
const data = await client.getGlobalDataRetentionPolicy();
return {data};
} catch (error) {
forceLogoutIfNecessary(serverUrl, error as ClientError);
return {error};
}
};
const operator = DatabaseManager.serverDatabases[serverUrl]?.operator;
if (operator) {
const systems: IdValue[] = [{
id: SYSTEM_IDENTIFIERS.DATA_RETENTION_POLICIES,
value: JSON.stringify(data),
}];
operator.handleSystem({systems, prepareRecordsOnly: false}).
catch((error) => {
logError('An error occurred while saving data retention policies', error);
});
export const fetchAllGranularDataRetentionPolicies = async (
serverUrl: string,
isChannel = false,
page = 0,
policies: Array<TeamDataRetentionPolicy | ChannelDataRetentionPolicy> = [],
): Promise<{data?: Array<TeamDataRetentionPolicy | ChannelDataRetentionPolicy>; error?: unknown}> => {
let client;
try {
client = NetworkManager.getClient(serverUrl);
} catch (error) {
return {error};
}
return data;
const operator = DatabaseManager.serverDatabases[serverUrl]?.operator;
if (!operator) {
return {error: `${serverUrl} database not found`};
}
const {database} = operator;
const currentUserId = await getCurrentUserId(database);
let data;
if (isChannel) {
data = await client.getChannelDataRetentionPolicies(currentUserId, page);
} else {
data = await client.getTeamDataRetentionPolicies(currentUserId, page);
}
policies.push(...data.policies);
if (policies.length < data.total_count) {
await fetchAllGranularDataRetentionPolicies(serverUrl, isChannel, page + 1, policies);
}
return {data: policies};
};
export const fetchConfigAndLicense = async (serverUrl: string, fetchOnly = false): Promise<ConfigAndLicenseRequest> => {

View File

@@ -177,10 +177,14 @@ export default class ClientBase {
return `${this.getEmojisRoute()}/${emojiId}`;
}
getDataRetentionRoute() {
getGlobalDataRetentionRoute() {
return `${this.urlVersion}/data_retention`;
}
getGranularDataRetentionRoute(userId: string) {
return `${this.getUserRoute(userId)}/data_retention`;
}
getRolesRoute() {
return `${this.urlVersion}/roles`;
}

View File

@@ -3,8 +3,14 @@
import {buildQueryString} from '@utils/helpers';
import {PER_PAGE_DEFAULT} from './constants';
import ClientError from './error';
type PoliciesResponse<T> = {
policies: T[];
total_count: number;
}
export interface ClientGeneralMix {
getOpenGraphMetadata: (url: string) => Promise<any>;
ping: (deviceId?: string, timeoutInterval?: number) => Promise<any>;
@@ -12,7 +18,9 @@ export interface ClientGeneralMix {
getClientConfigOld: () => Promise<ClientConfig>;
getClientLicenseOld: () => Promise<ClientLicense>;
getTimezones: () => Promise<string[]>;
getDataRetentionPolicy: () => Promise<any>;
getGlobalDataRetentionPolicy: () => Promise<GlobalDataRetentionPolicy>;
getTeamDataRetentionPolicies: (userId: string, page?: number, perPage?: number) => Promise<PoliciesResponse<TeamDataRetentionPolicy>>;
getChannelDataRetentionPolicies: (userId: string, page?: number, perPage?: number) => Promise<PoliciesResponse<ChannelDataRetentionPolicy>>;
getRolesByNames: (rolesNames: string[]) => Promise<Role[]>;
getRedirectLocation: (urlParam: string) => Promise<Record<string, string>>;
}
@@ -74,9 +82,23 @@ const ClientGeneral = (superclass: any) => class extends superclass {
);
};
getDataRetentionPolicy = () => {
getGlobalDataRetentionPolicy = () => {
return this.doFetch(
`${this.getDataRetentionRoute()}/policy`,
`${this.getGlobalDataRetentionRoute()}/policy`,
{method: 'get'},
);
};
getTeamDataRetentionPolicies = (userId: string, page = 0, perPage = PER_PAGE_DEFAULT) => {
return this.doFetch(
`${this.getGranularDataRetentionRoute(userId)}/team_policies${buildQueryString({page, per_page: perPage})}`,
{method: 'get'},
);
};
getChannelDataRetentionPolicies = (userId: string, page = 0, perPage = PER_PAGE_DEFAULT) => {
return this.doFetch(
`${this.getGranularDataRetentionRoute(userId)}/channel_policies${buildQueryString({page, per_page: perPage})}`,
{method: 'get'},
);
};

View File

@@ -58,6 +58,8 @@ export const SYSTEM_IDENTIFIERS = {
CURRENT_USER_ID: 'currentUserId',
DATA_RETENTION_POLICIES: 'dataRetentionPolicies',
EXPANDED_LINKS: 'expandedLinks',
GRANULAR_DATA_RETENTION_POLICIES: 'granularDataRetentionPolicies',
LAST_DATA_RETENTION_RUN: 'lastDataRetentionRun',
GLOBAL_THREADS_TAB: 'globalThreadsTab',
LAST_DISMISSED_BANNER: 'lastDismissedBanner',
LAST_SERVER_VERSION_CHECK: 'LastServerVersionCheck',

View File

@@ -158,6 +158,50 @@ export const getConfigValue = async (database: Database, key: keyof ClientConfig
return list.length ? list[0].value : undefined;
};
export const getLastGlobalDataRetentionRun = async (database: Database) => {
try {
const data = await database.get<SystemModel>(SYSTEM).find(SYSTEM_IDENTIFIERS.LAST_DATA_RETENTION_RUN);
return data?.value || 0;
} catch {
return undefined;
}
};
export const getGlobalDataRetentionPolicy = async (database: Database) => {
try {
const data = await database.get<SystemModel>(SYSTEM).find(SYSTEM_IDENTIFIERS.DATA_RETENTION_POLICIES);
return (data?.value || {}) as GlobalDataRetentionPolicy;
} catch {
return undefined;
}
};
export const getGranularDataRetentionPolicies = async (database: Database) => {
try {
const data = await database.get<SystemModel>(SYSTEM).find(SYSTEM_IDENTIFIERS.GRANULAR_DATA_RETENTION_POLICIES);
return (data?.value || {
team: [],
channel: [],
}) as {
team: TeamDataRetentionPolicy[];
channel: ChannelDataRetentionPolicy[];
};
} catch {
return undefined;
}
};
export const getIsDataRetentionEnabled = async (database: Database) => {
const license = await getLicense(database);
if (!license || !Object.keys(license)?.length) {
return null;
}
const dataRetentionEnableMessageDeletion = await getConfigValue(database, 'DataRetentionEnableMessageDeletion');
return dataRetentionEnableMessageDeletion === 'true' && license?.IsLicensed === 'true' && license?.DataRetention === 'true';
};
export const observeConfig = (database: Database): Observable<ClientConfig | undefined> => {
return database.get<ConfigModel>(CONFIG).query().observeWithColumns(['value']).pipe(
switchMap((result) => of$(fromModelToClientConfig(result))),

19
types/global/data_retention.d.ts vendored Normal file
View File

@@ -0,0 +1,19 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
type GlobalDataRetentionPolicy = {
file_deletion_enabled: boolean;
file_retention_cutoff: number;
message_deletion_enabled: boolean;
message_retention_cutoff: number;
}
type TeamDataRetentionPolicy = {
post_duration: number;
team_id: string;
}
type ChannelDataRetentionPolicy = {
post_duration: number;
channel_id: string;
}