2017-04-20 22:40:52 +00:00
|
|
|
// Copyright 2017 Vector Creations Ltd
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2017-03-29 13:05:43 +00:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"database/sql"
|
2017-05-12 15:56:17 +00:00
|
|
|
"encoding/json"
|
2017-03-29 13:05:43 +00:00
|
|
|
// Import the postgres database driver.
|
|
|
|
_ "github.com/lib/pq"
|
2017-05-12 15:56:17 +00:00
|
|
|
"github.com/matrix-org/dendrite/clientapi/events"
|
2017-03-30 14:29:23 +00:00
|
|
|
"github.com/matrix-org/dendrite/common"
|
2017-04-20 16:22:44 +00:00
|
|
|
"github.com/matrix-org/dendrite/syncapi/types"
|
2017-03-30 14:29:23 +00:00
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
2017-03-29 13:05:43 +00:00
|
|
|
)
|
|
|
|
|
2017-05-17 09:25:59 +00:00
|
|
|
type stateDelta struct {
|
|
|
|
roomID string
|
|
|
|
stateEvents []gomatrixserverlib.Event
|
|
|
|
membership string
|
2017-05-17 15:21:27 +00:00
|
|
|
// The stream position of the latest membership event for this user, if applicable.
|
|
|
|
// Can be 0 if there is no membership event in this delta.
|
|
|
|
membershipPos types.StreamPosition
|
|
|
|
}
|
|
|
|
|
|
|
|
// Same as gomatrixserverlib.Event but also has the stream position for this event.
|
|
|
|
type streamEvent struct {
|
|
|
|
gomatrixserverlib.Event
|
|
|
|
streamPosition types.StreamPosition
|
2017-05-17 09:25:59 +00:00
|
|
|
}
|
|
|
|
|
2017-03-29 13:05:43 +00:00
|
|
|
// SyncServerDatabase represents a sync server database
|
|
|
|
type SyncServerDatabase struct {
|
|
|
|
db *sql.DB
|
|
|
|
partitions common.PartitionOffsetStatements
|
2017-03-30 14:29:23 +00:00
|
|
|
events outputRoomEventsStatements
|
2017-04-05 09:30:13 +00:00
|
|
|
roomstate currentRoomStateStatements
|
2017-03-29 13:05:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewSyncServerDatabase creates a new sync server database
|
|
|
|
func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
|
|
|
|
var db *sql.DB
|
|
|
|
var err error
|
|
|
|
if db, err = sql.Open("postgres", dataSourceName); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
partitions := common.PartitionOffsetStatements{}
|
|
|
|
if err = partitions.Prepare(db); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2017-03-30 14:29:23 +00:00
|
|
|
events := outputRoomEventsStatements{}
|
|
|
|
if err = events.prepare(db); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2017-04-05 09:30:13 +00:00
|
|
|
state := currentRoomStateStatements{}
|
|
|
|
if err := state.prepare(db); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return &SyncServerDatabase{db, partitions, events, state}, nil
|
2017-03-30 14:29:23 +00:00
|
|
|
}
|
|
|
|
|
2017-05-17 14:38:24 +00:00
|
|
|
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
|
|
|
|
func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error) {
|
|
|
|
return d.roomstate.JoinedMemberLists()
|
|
|
|
}
|
|
|
|
|
2017-03-30 14:29:23 +00:00
|
|
|
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
2017-04-10 14:12:18 +00:00
|
|
|
// when generating the stream position for this event. Returns the sync stream position for the inserted event.
|
|
|
|
// Returns an error if there was a problem inserting this event.
|
2017-04-12 15:06:26 +00:00
|
|
|
func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos types.StreamPosition, returnErr error) {
|
2017-04-10 14:12:18 +00:00
|
|
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
|
|
|
var err error
|
2017-04-11 10:52:26 +00:00
|
|
|
pos, err := d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs)
|
2017-04-10 14:12:18 +00:00
|
|
|
if err != nil {
|
2017-04-05 09:30:13 +00:00
|
|
|
return err
|
|
|
|
}
|
2017-04-12 15:06:26 +00:00
|
|
|
streamPos = types.StreamPosition(pos)
|
2017-04-05 09:30:13 +00:00
|
|
|
|
|
|
|
if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 {
|
|
|
|
// Nothing to do, the event may have just been a message event.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Update the current room state based on the added/removed state event IDs.
|
|
|
|
// In the common case there is a single added event ID which is the state event itself, assuming `ev` is a state event.
|
|
|
|
// However, conflict resolution may result in there being different events being added, or even some removed.
|
|
|
|
if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() {
|
|
|
|
// common case
|
2017-04-10 14:12:18 +00:00
|
|
|
if err = d.roomstate.UpdateRoomState(txn, []gomatrixserverlib.Event{*ev}, nil); err != nil {
|
2017-04-05 09:30:13 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// uncommon case: we need to fetch the full event for each event ID mentioned, then update room state
|
|
|
|
added, err := d.events.Events(txn, addStateEventIDs)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-05-17 15:21:27 +00:00
|
|
|
return d.roomstate.UpdateRoomState(txn, streamEventsToEvents(added), removeStateEventIDs)
|
2017-04-05 09:30:13 +00:00
|
|
|
})
|
2017-04-10 14:12:18 +00:00
|
|
|
return
|
2017-03-29 13:05:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// PartitionOffsets implements common.PartitionStorer
|
|
|
|
func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
|
|
|
|
return d.partitions.SelectPartitionOffsets(topic)
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetPartitionOffset implements common.PartitionStorer
|
|
|
|
func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
|
|
|
return d.partitions.UpsertPartitionOffset(topic, partition, offset)
|
|
|
|
}
|
2017-04-05 09:30:13 +00:00
|
|
|
|
2017-04-10 14:12:18 +00:00
|
|
|
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
2017-04-12 15:06:26 +00:00
|
|
|
func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) {
|
2017-04-13 15:56:46 +00:00
|
|
|
id, err := d.events.MaxID(nil)
|
2017-04-11 10:52:26 +00:00
|
|
|
if err != nil {
|
2017-04-12 15:06:26 +00:00
|
|
|
return types.StreamPosition(0), err
|
2017-04-11 10:52:26 +00:00
|
|
|
}
|
2017-04-12 15:06:26 +00:00
|
|
|
return types.StreamPosition(id), nil
|
2017-04-10 14:12:18 +00:00
|
|
|
}
|
|
|
|
|
2017-04-19 15:04:01 +00:00
|
|
|
// IncrementalSync returns all the data needed in order to create an incremental sync response.
|
2017-05-15 14:18:08 +00:00
|
|
|
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
|
2017-04-19 15:04:01 +00:00
|
|
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
2017-05-17 09:25:59 +00:00
|
|
|
// Work out which rooms to return in the response. This is done by getting not only the currently
|
|
|
|
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
|
|
|
|
// This works out what the 'state' key should be for each room as well as which membership block
|
|
|
|
// to put the room into.
|
|
|
|
deltas, err := d.getStateDeltas(txn, fromPos, toPos, userID)
|
2017-04-19 15:04:01 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-05-15 14:18:08 +00:00
|
|
|
res = types.NewResponse(toPos)
|
2017-05-17 09:25:59 +00:00
|
|
|
for _, delta := range deltas {
|
2017-05-17 15:21:27 +00:00
|
|
|
endPos := toPos
|
|
|
|
if delta.membershipPos > 0 && delta.membership == "leave" {
|
|
|
|
// make sure we don't leak recent events after the leave event.
|
|
|
|
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
|
|
|
|
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
|
|
|
|
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
|
|
|
|
// in a single /sync request
|
|
|
|
// This is all "okay" assuming history_visibility == "shared" which it is by default.
|
|
|
|
endPos = delta.membershipPos
|
|
|
|
}
|
|
|
|
recentStreamEvents, err := d.events.RecentEventsInRoom(txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom)
|
2017-04-19 15:04:01 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-05-17 15:21:27 +00:00
|
|
|
recentEvents := streamEventsToEvents(recentStreamEvents)
|
2017-05-17 09:25:59 +00:00
|
|
|
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
2017-05-15 14:18:08 +00:00
|
|
|
|
2017-05-17 09:25:59 +00:00
|
|
|
switch delta.membership {
|
|
|
|
case "join":
|
|
|
|
jr := types.NewJoinResponse()
|
|
|
|
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
|
|
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
|
|
|
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
|
|
|
res.Rooms.Join[delta.roomID] = *jr
|
|
|
|
case "leave":
|
|
|
|
fallthrough // transitions to leave are the same as ban
|
|
|
|
case "ban":
|
|
|
|
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
|
|
|
// no longer in the room.
|
|
|
|
lr := types.NewLeaveResponse()
|
|
|
|
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
|
|
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
|
|
|
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
|
|
|
res.Rooms.Leave[delta.roomID] = *lr
|
|
|
|
}
|
2017-04-19 15:04:01 +00:00
|
|
|
}
|
2017-05-15 16:41:54 +00:00
|
|
|
|
2017-05-17 09:25:59 +00:00
|
|
|
// TODO: This should be done in getStateDeltas
|
2017-05-15 16:41:54 +00:00
|
|
|
return d.addInvitesToResponse(txn, userID, res)
|
2017-04-19 15:04:01 +00:00
|
|
|
})
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2017-05-15 14:18:08 +00:00
|
|
|
// CompleteSync a complete /sync API response for the given user.
|
|
|
|
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
|
2017-04-13 15:56:46 +00:00
|
|
|
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
|
|
|
|
// a consistent view of the database throughout. This includes extracting the sync stream position.
|
2017-05-15 14:18:08 +00:00
|
|
|
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
|
|
|
|
// but it's better to not hide the fact that this is being done in a transaction.
|
2017-04-13 15:56:46 +00:00
|
|
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
|
|
|
// Get the current stream position which we will base the sync response on.
|
|
|
|
id, err := d.events.MaxID(txn)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-05-15 14:18:08 +00:00
|
|
|
pos := types.StreamPosition(id)
|
2017-04-13 15:56:46 +00:00
|
|
|
|
|
|
|
// Extract room state and recent events for all rooms the user is joined to.
|
|
|
|
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-05-15 14:18:08 +00:00
|
|
|
|
2017-05-15 16:41:54 +00:00
|
|
|
// Build up a /sync response. Add joined rooms.
|
2017-05-15 14:18:08 +00:00
|
|
|
res = types.NewResponse(pos)
|
2017-04-13 15:56:46 +00:00
|
|
|
for _, roomID := range roomIDs {
|
|
|
|
stateEvents, err := d.roomstate.CurrentState(txn, roomID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// TODO: When filters are added, we may need to call this multiple times to get enough events.
|
|
|
|
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
|
2017-05-17 15:21:27 +00:00
|
|
|
recentStreamEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom)
|
2017-04-13 15:56:46 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-05-17 15:21:27 +00:00
|
|
|
recentEvents := streamEventsToEvents(recentStreamEvents)
|
2017-05-10 16:48:35 +00:00
|
|
|
|
2017-05-11 14:51:35 +00:00
|
|
|
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
2017-05-15 14:18:08 +00:00
|
|
|
jr := types.NewJoinResponse()
|
|
|
|
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
|
|
jr.Timeline.Limited = true
|
|
|
|
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
|
|
|
|
res.Rooms.Join[roomID] = *jr
|
2017-04-13 15:56:46 +00:00
|
|
|
}
|
2017-05-15 16:41:54 +00:00
|
|
|
|
|
|
|
return d.addInvitesToResponse(txn, userID, res)
|
2017-04-13 15:56:46 +00:00
|
|
|
})
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2017-05-15 16:41:54 +00:00
|
|
|
func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error {
|
|
|
|
// Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark.
|
|
|
|
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "invite")
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
for _, roomID := range roomIDs {
|
|
|
|
ir := types.NewInviteResponse()
|
|
|
|
// TODO: invite_state. The state won't be in the current state table in cases where you get invited over federation
|
|
|
|
res.Rooms.Invite[roomID] = *ir
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-05-17 09:25:59 +00:00
|
|
|
func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) {
|
|
|
|
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
|
|
|
// - Get membership list changes for this user in this sync response
|
|
|
|
// - For each room which has membership list changes:
|
|
|
|
// * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
|
|
|
|
// If it is, then we need to send the full room state down (and 'limited' is always true).
|
|
|
|
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
|
|
|
// * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
|
|
|
|
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
|
|
|
|
var deltas []stateDelta
|
|
|
|
|
|
|
|
// get all the state events ever between these two positions
|
|
|
|
state, err := d.events.StateBetween(txn, fromPos, toPos)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2017-05-17 15:21:27 +00:00
|
|
|
for roomID, stateStreamEvents := range state {
|
|
|
|
for _, ev := range stateStreamEvents {
|
2017-05-17 09:25:59 +00:00
|
|
|
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
|
|
|
|
// We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
|
|
|
|
// dupe join events will result in the entire room state coming down to the client again. This is added in
|
|
|
|
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
|
|
|
|
// the timeline.
|
2017-05-17 15:21:27 +00:00
|
|
|
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
|
2017-05-17 09:25:59 +00:00
|
|
|
if membership == "join" {
|
|
|
|
// send full room state down instead of a delta
|
|
|
|
var allState []gomatrixserverlib.Event
|
|
|
|
allState, err = d.roomstate.CurrentState(txn, roomID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2017-05-17 15:21:27 +00:00
|
|
|
s := make([]streamEvent, len(allState))
|
|
|
|
for i := 0; i < len(s); i++ {
|
|
|
|
s[i] = streamEvent{allState[i], types.StreamPosition(0)}
|
|
|
|
}
|
|
|
|
state[roomID] = s
|
2017-05-17 09:25:59 +00:00
|
|
|
continue // we'll add this room in when we do joined rooms
|
|
|
|
}
|
|
|
|
|
|
|
|
deltas = append(deltas, stateDelta{
|
2017-05-17 15:21:27 +00:00
|
|
|
membership: membership,
|
|
|
|
membershipPos: ev.streamPosition,
|
|
|
|
stateEvents: streamEventsToEvents(stateStreamEvents),
|
|
|
|
roomID: roomID,
|
2017-05-17 09:25:59 +00:00
|
|
|
})
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add in currently joined rooms
|
|
|
|
joinedRoomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
for _, joinedRoomID := range joinedRoomIDs {
|
|
|
|
deltas = append(deltas, stateDelta{
|
|
|
|
membership: "join",
|
2017-05-17 15:21:27 +00:00
|
|
|
stateEvents: streamEventsToEvents(state[joinedRoomID]),
|
2017-05-17 09:25:59 +00:00
|
|
|
roomID: joinedRoomID,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
return deltas, nil
|
|
|
|
}
|
|
|
|
|
2017-05-17 15:21:27 +00:00
|
|
|
func streamEventsToEvents(in []streamEvent) []gomatrixserverlib.Event {
|
|
|
|
out := make([]gomatrixserverlib.Event, len(in))
|
|
|
|
for i := 0; i < len(in); i++ {
|
|
|
|
out[i] = in[i].Event
|
|
|
|
}
|
|
|
|
return out
|
|
|
|
}
|
|
|
|
|
2017-05-11 14:51:35 +00:00
|
|
|
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
|
|
|
|
// them out so we don't include them twice in the /sync response. They should be in recentEvents
|
|
|
|
// only, so clients get to the correct state once they have rolled forward.
|
|
|
|
func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gomatrixserverlib.Event {
|
|
|
|
for _, recentEv := range recentEvents {
|
|
|
|
if recentEv.StateKey() == nil {
|
|
|
|
continue // not a state event
|
|
|
|
}
|
|
|
|
// TODO: This is a linear scan over all the current state events in this room. This will
|
|
|
|
// be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
|
|
|
|
// then do a binary search to find matching events, similar to what roomserver does.
|
|
|
|
for j := 0; j < len(stateEvents); j++ {
|
|
|
|
if stateEvents[j].EventID() == recentEv.EventID() {
|
|
|
|
// overwrite the element to remove with the last element then pop the last element.
|
|
|
|
// This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
|
|
|
|
// (we don't care about the order of stateEvents)
|
|
|
|
stateEvents[j] = stateEvents[len(stateEvents)-1]
|
|
|
|
stateEvents = stateEvents[:len(stateEvents)-1]
|
|
|
|
break // there shouldn't be multiple events with the same event ID
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return stateEvents
|
|
|
|
}
|
|
|
|
|
2017-05-17 09:25:59 +00:00
|
|
|
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
|
|
|
|
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
|
|
|
|
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
|
|
|
|
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
|
|
|
|
var memberContent events.MemberContent
|
|
|
|
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
return memberContent.Membership
|
|
|
|
}
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
|
2017-04-05 09:30:13 +00:00
|
|
|
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
|
|
|
txn, err := db.Begin()
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
txn.Rollback()
|
|
|
|
panic(r)
|
|
|
|
} else if err != nil {
|
|
|
|
txn.Rollback()
|
|
|
|
} else {
|
|
|
|
err = txn.Commit()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
err = fn(txn)
|
|
|
|
return
|
|
|
|
}
|