Finishing Prototype

This commit is contained in:
Hoernschen 2020-10-17 12:07:39 +02:00
parent da9196f389
commit 473dc4a495
25 changed files with 1150 additions and 825 deletions

View file

@ -86,7 +86,7 @@ type Notifications struct {
Room int `json:"room,omitempty"`
}
type sendMessageRequest struct {
type SendMessageRequest struct {
MessageType string `json:"msgtype,omitempty"`
Body string `json:"body,omitempty"`
}

View file

@ -2,9 +2,12 @@ package event
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log"
"math/rand"
"net/http"
"strconv"
"time"
@ -13,8 +16,10 @@ import (
"git.nutfactory.org/hoernschen/Matrix/entities/device"
"git.nutfactory.org/hoernschen/Matrix/entities/user"
"git.nutfactory.org/hoernschen/Matrix/utils"
"github.com/cenkalti/backoff/v4"
//"github.com/cenkalti/backoff/v4"
"github.com/gorilla/mux"
"github.com/lestrrat-go/backoff"
)
func New(
@ -85,7 +90,7 @@ func New(
func SendMessageHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
request := sendMessageRequest{}
request := SendMessageRequest{}
errResponse := utils.CheckRequest(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
@ -130,9 +135,7 @@ func SendMessageHandler(w http.ResponseWriter, r *http.Request) {
}
return
}
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
content := buf.String()
content, _ := json.Marshal(request)
err, newEvent := New(
roomId,
foundUser.Id,
@ -140,7 +143,7 @@ func SendMessageHandler(w http.ResponseWriter, r *http.Request) {
time.Now().Unix(),
eventType,
foundUser.Id,
content,
string(content),
txnId,
)
if err != nil {
@ -173,13 +176,19 @@ func SendMessageHandler(w http.ResponseWriter, r *http.Request) {
return
}
for _, server := range servers {
operation := func() error {
return SendTransaction(transaction, server)
if server != config.Homeserver {
log.Printf("Send Transaction to %s", server)
/*
operation := func() error {
return SendTransaction(transaction, server, config.HttpString, config.AuthentificationCheck)
}
notify := func(err error, duration time.Duration) {
log.Printf("Error Sending Transaction, retrying in %ss: %s", duration/1000000000, err)
}
backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
*/
go retryTransaction(transaction, server, config.HttpString, config.AuthentificationCheck)
}
notify := func(err error, duration time.Duration) {
log.Printf("Error Sending Transaction, retrying in %ss: %s", duration/1000000000, err)
}
go backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
}
response := createEventResponse{
EventId: newEvent.Id,
@ -190,6 +199,20 @@ func SendMessageHandler(w http.ResponseWriter, r *http.Request) {
}
}
func retryTransaction(transactionToSend *Transaction, server string, httpString string, authCheck bool) (err error) {
b, cancel := config.BackoffPolicy.Start(context.Background())
defer cancel()
for backoff.Continue(b) {
err := SendTransaction(transactionToSend, server, httpString, authCheck)
if err == nil {
return nil
}
}
err = errors.New("Not able to send transaction")
return
}
func CreateStateEventHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
errResponse := utils.CheckRequest(r)
@ -271,13 +294,14 @@ func CreateStateEventHandler(w http.ResponseWriter, r *http.Request) {
return
}
for _, server := range servers {
operation := func() error {
return SendTransaction(transaction, server)
/*operation := func() error {
return SendTransaction(transaction, server, config.HttpString, config.AuthentificationCheck)
}
notify := func(err error, duration time.Duration) {
log.Printf("Error Sending Transaction, retrying in %ss: %s", duration/1000000000, err)
}
go backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
go backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)*/
go retryTransaction(transaction, server, config.HttpString, config.AuthentificationCheck)
}
response := createEventResponse{
@ -405,106 +429,98 @@ func GetStateEventHandler(w http.ResponseWriter, r *http.Request) {
}
func SyncEventsServerHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
request := syncEventsServerRequest{}
errResponse := utils.CheckRequest(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
errResponse = utils.CheckAuthHeader(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&request)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("Could not parse JSON: %s", err)}); err != nil {
panic(err)
}
return
}
vars := mux.Vars(r)
txnId := vars["txnId"]
if txnId == "" {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Missing Parameter"}); err != nil {
panic(err)
}
return
}
response := syncEventsServerResponse{
PDUs: make(map[string]pduProcessingResult),
}
missingEventIds := make(map[string][]string)
for _, pdu := range request.PDUs {
signatureValid := CheckSignature(*pdu)
if !signatureValid {
log.Printf("Wrong Signature for Event %s", pdu.Id)
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: "Signature not valid"}
continue
}
authEventsValid, err := CheckAuthEvents(pdu)
if !authEventsValid || err != nil {
log.Printf("Wrong Auth Events for Event %s", pdu.Id)
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Error in Auth Check: %s", err)}
//continue
}
missingParentIds, err := CheckParents(pdu)
if len(missingParentIds) > 0 || err != nil {
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Error in Parents Check: %s", err)}
for _, parentId := range missingParentIds {
missingEventIds[pdu.RoomId] = append(missingEventIds[pdu.RoomId], parentId)
packetLossNumber := rand.Intn(100)
if packetLossNumber > config.Packetloss {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
request := syncEventsServerRequest{}
errResponse := utils.CheckRequest(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
foundEvent, err := ReadEvent(pdu.Id)
if foundEvent == nil && err == nil {
err = CreateEvent(pdu, txnId)
}
_ = utils.CheckAuthHeader(r, buf.String())
/*if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}*/
err := json.Unmarshal(buf.Bytes(), &request)
if err != nil {
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Database Error: %s", err)}
continue
}
err = HandleEvent(pdu)
if err != nil {
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Error in Event-Handling: %s", err)}
continue
}
response.PDUs[pdu.Id] = pduProcessingResult{}
}
if len(missingEventIds) > 0 {
for roomId, eventIds := range missingEventIds {
operation := func() error {
return Backfill(eventIds, roomId, request.Origin)
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("SyncEventsServerHandler Could not parse JSON Request: %s", err)}); err != nil {
panic(err)
}
notify := func(err error, duration time.Duration) {
log.Printf("Error Backfill, retrying in %s: %s", duration/1000000000, err)
}
go backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
return
}
}
vars := mux.Vars(r)
txnId := vars["txnId"]
if txnId == "" {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "SyncEventsServerHandler Missing Parameter"}); err != nil {
panic(err)
}
return
}
response := syncEventsServerResponse{
PDUs: make(map[string]pduProcessingResult),
}
missingEventIds := make(map[string][]string)
for _, pdu := range request.PDUs {
signatureValid := CheckSignature(*pdu)
if !signatureValid {
log.Printf("Wrong Signature for Event %s", pdu.Id)
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: "Signature not valid"}
//continue
}
authEventsValid, err := CheckAuthEvents(pdu)
if !authEventsValid || err != nil {
log.Printf("Wrong Auth Events for Event %s", pdu.Id)
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Error in Auth Check: %s", err)}
//continue
}
missingParentIds, err := CheckParents(pdu)
if len(missingParentIds) > 0 || err != nil {
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Error in Parents Check: %s", err)}
for _, parentId := range missingParentIds {
missingEventIds[pdu.RoomId] = append(missingEventIds[pdu.RoomId], parentId)
}
}
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(response); err != nil {
panic(err)
err = HandleEvent(pdu, txnId)
if err != nil {
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Error in Event-Handling: %s", err)}
continue
}
response.PDUs[pdu.Id] = pduProcessingResult{}
}
if len(missingEventIds) > 0 {
for roomId, eventIds := range missingEventIds {
Backfill(eventIds, roomId, request.Origin, config.HttpString, config.AuthentificationCheck)
}
}
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(response); err != nil {
panic(err)
}
} else {
log.Println("Blocking Response")
}
}
func BackfillHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
errResponse := utils.CheckAuthHeader(r)
errResponse := utils.CheckAuthHeader(r, "")
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
@ -526,7 +542,6 @@ func BackfillHandler(w http.ResponseWriter, r *http.Request) {
limit := 50
eventIds, ok := r.URL.Query()["v"]
log.Printf("%s", eventIds)
if !ok || eventIds[0] == "" {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Missing Parameter"}); err != nil {
@ -544,11 +559,7 @@ func BackfillHandler(w http.ResponseWriter, r *http.Request) {
for _, eventId := range eventIds {
foundEvent, err := ReadEvent(eventId)
if err != nil || foundEvent == nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Event not found"}); err != nil {
panic(err)
}
return
continue
}
for newEventId, _ := range foundEvent.PrevEventHashes {
@ -569,8 +580,8 @@ func BackfillHandler(w http.ResponseWriter, r *http.Request) {
}
}
func SendTransaction(transaction *Transaction, homeserver string) (err error) {
requestUrl := fmt.Sprintf("https://%s/_matrix/federation/v1/send/%s?", homeserver, transaction.Id)
func SendTransaction(transaction *Transaction, homeserver string, httpString string, authentification bool) (err error) {
requestUrl := fmt.Sprintf("%s://%s/_matrix/federation/v1/send/%s?", httpString, homeserver, transaction.Id)
request := syncEventsServerRequest{
Origin: transaction.Origin,
Timestamp: transaction.Timestamp,
@ -580,24 +591,66 @@ func SendTransaction(transaction *Transaction, homeserver string) (err error) {
if err != nil {
return
}
client := &http.Client{}
client := &http.Client{Timeout: 2 * time.Second}
req, err := http.NewRequest(http.MethodPut, requestUrl, bytes.NewBuffer(reqBody))
if err != nil {
return
}
_, err = client.Do(req)
if authentification {
var authHeader string
authHeader, err = utils.CreateAuthHeader(http.MethodPut, requestUrl, homeserver, string(reqBody))
if err != nil {
return
}
req.Header["Authorization"] = []string{authHeader}
}
req.Header["Content-Type"] = []string{"application/json"}
r, err := client.Do(req)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
utils.HandleHTTPError(r)
} else {
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
content := buf.String()
if content == "" {
err = errors.New("Missing Response")
return
}
}
return
}
func Backfill(eventIds []string, roomId string, homeserver string) (err error) {
requestUrl := fmt.Sprintf("https://%s/_matrix/federation/v1/backfill/%s?", homeserver, roomId)
func Backfill(eventIds []string, roomId string, homeserver string, httpString string, authentification bool) (err error) {
requestUrl := fmt.Sprintf("%s://%s/_matrix/federation/v1/backfill/%s?", httpString, homeserver, roomId)
for _, eventId := range eventIds {
requestUrl = fmt.Sprintf("%sv=%s&", requestUrl, eventId)
}
r, err := http.Get(requestUrl)
client := &http.Client{Timeout: 2 * time.Second}
var req *http.Request
req, err = http.NewRequest(http.MethodGet, requestUrl, bytes.NewBuffer(nil))
if err != nil {
return
}
if authentification {
var authHeader string
authHeader, err = utils.CreateAuthHeader(http.MethodGet, requestUrl, homeserver, "")
if err != nil {
return
}
req.Header["Authorization"] = []string{authHeader}
}
req.Header["Content-Type"] = []string{"application/json"}
r, err := client.Do(req)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errResponse := utils.HandleHTTPError(r)
log.Fatalf("%s (%s)", errResponse.ErrorMessage, errResponse.ErrorCode)
}
response := backfillResponse{}
defer r.Body.Close()
decoder := json.NewDecoder(r.Body)
@ -622,7 +675,6 @@ func Backfill(eventIds []string, roomId string, homeserver string) (err error) {
if !authEventsValid || err != nil {
log.Printf("Wrong Auth Events for Event %s", pdu.Id)
missingEventIds = append(missingEventIds, pdu.Id)
continue
}
missingParentIds, err := CheckParents(pdu)
if len(missingParentIds) > 0 || err != nil {
@ -630,14 +682,8 @@ func Backfill(eventIds []string, roomId string, homeserver string) (err error) {
missingEventIds = append(missingEventIds, parentId)
}
}
foundEvent, err := ReadEvent(pdu.Id)
if foundEvent == nil && err == nil {
err = CreateEvent(pdu, "")
}
if err != nil {
continue
}
err = HandleEvent(pdu)
err = HandleEvent(pdu, "")
if err != nil {
log.Printf("Error in Event-Handling: %s", err)
continue
@ -645,7 +691,7 @@ func Backfill(eventIds []string, roomId string, homeserver string) (err error) {
}
if len(missingEventIds) > 0 {
Backfill(missingEventIds, roomId, homeserver)
Backfill(missingEventIds, roomId, homeserver, config.HttpString, config.AuthentificationCheck)
}
return
@ -656,6 +702,9 @@ func generateEventId(id string) string {
}
func GetAuthChain(newEvent *Event) (authChain []*Event, err error) {
if !config.AuthentificationCheck {
return
}
createEvent, err := ReadStateEvent(newEvent.RoomId, "m.room.create", "")
if err != nil {
return
@ -698,7 +747,6 @@ func GetAuthChain(newEvent *Event) (authChain []*Event, err error) {
func GetAuthEvents(newEvent *Event) (authEventHashes map[string]EventHash, err error) {
authEventHashes = make(map[string]EventHash)
authChain, err := GetAuthChain(newEvent)
if err != nil {
return
@ -730,10 +778,12 @@ func CheckEventHash(id string, hash EventHash) (correct bool, eventFound bool, e
}
func CheckParents(eventToCheck *Event) (missingParentIds []string, err error) {
for key, hash := range eventToCheck.PrevEventHashes {
correctHash, foundEvent, err := CheckEventHash(key, hash)
if !correctHash || !foundEvent || err != nil {
missingParentIds = append(missingParentIds, key)
if config.Consensus {
for key, hash := range eventToCheck.PrevEventHashes {
correctHash, foundEvent, err := CheckEventHash(key, hash)
if !correctHash || !foundEvent || err != nil {
missingParentIds = append(missingParentIds, key)
}
}
}
return
@ -741,13 +791,16 @@ func CheckParents(eventToCheck *Event) (missingParentIds []string, err error) {
func CheckAuthEvents(eventToCheck *Event) (correct bool, err error) {
correct = true
if !config.AuthentificationCheck {
return
}
authEvents, err := GetAuthEvents(eventToCheck)
if err != nil {
return
}
for key, hash := range authEvents {
if eventToCheck.AuthEventHashes[key].SHA256 != hash.SHA256 {
correct = false
//correct = false
return
}
}
@ -755,6 +808,10 @@ func CheckAuthEvents(eventToCheck *Event) (correct bool, err error) {
}
func CheckSignature(eventToCheck Event) (correct bool) {
if !config.Signing {
correct = true
return
}
correct = false
signatures := eventToCheck.Signatures
eventToCheck.Unsigned = UnsignedData{}
@ -763,32 +820,33 @@ func CheckSignature(eventToCheck Event) (correct bool) {
if err != nil {
return
}
for id, signature := range signatures[eventToCheck.Sender] {
key, err := device.ReadKey(id)
if err == nil {
correct = utils.VerifySignature([]byte(key.Key), jsonString, []byte(signature))
for id, signature := range signatures[eventToCheck.Origin] {
verifyKey, err := device.GetVerifyKey(eventToCheck.Origin, id)
if err == nil && verifyKey != nil {
correct = utils.VerifySignature(verifyKey, jsonString, signature)
}
}
return
}
func HandleEvents(events []*Event) (err error) {
func HandleEvents(events []*Event, txnId string) (err error) {
for _, eventToHandle := range events {
err = HandleEvent(eventToHandle)
err = HandleEvent(eventToHandle, txnId)
}
return
}
func HandleEvent(eventToHandle *Event) (err error) {
func HandleEvent(eventToHandle *Event, txnId string) (err error) {
log.Printf("EventType: %s", eventToHandle.EventType)
foundEvent, err := ReadEvent(eventToHandle.Id)
if foundEvent == nil && err == nil {
err = CreateEvent(eventToHandle, txnId)
}
if err != nil {
return
}
if eventToHandle.EventType == "m.room.message" {
message := sendMessageRequest{}
err = json.Unmarshal([]byte(eventToHandle.Content), &message)
if err != nil {
return
}
if message.MessageType != "" && message.MessageType == "m.text" {
log.Printf("%s: %s", eventToHandle.Sender, message.Body)
}
log.Printf("%s: %s", eventToHandle.Sender, eventToHandle.Content)
} else if eventToHandle.EventType == "m.room.member" {
message := MemberEventContent{}
err = json.Unmarshal([]byte(eventToHandle.Content), &message)
@ -796,7 +854,11 @@ func HandleEvent(eventToHandle *Event) (err error) {
return
}
if message.Membership == "join" {
CreateRoomMember(eventToHandle.RoomId, eventToHandle.StateKey)
log.Printf("Join %s to Room %s", eventToHandle.StateKey, eventToHandle.RoomId)
}
err = CreateRoomMember(eventToHandle.RoomId, eventToHandle.StateKey, eventToHandle.Origin)
if err != nil {
return
}
}
return

View file

@ -2,12 +2,11 @@ package event
import (
"fmt"
"strings"
"git.nutfactory.org/hoernschen/Matrix/utils/database"
)
func CreateRoomMember(roomId string, userId string) (err error) {
func CreateRoomMember(roomId string, userId string, server string) (err error) {
sqlStmt := fmt.Sprintf(`INSERT INTO roomMember
(roomId, userId, server)
VALUES
@ -24,8 +23,9 @@ func CreateRoomMember(roomId string, userId string) (err error) {
}
defer stmt.Close()
_, err = stmt.Exec(roomId, userId, strings.Split(userId, ":")[1])
_, err = stmt.Exec(roomId, userId, server)
if err != nil {
tx.Rollback()
return
}
tx.Commit()
@ -55,6 +55,7 @@ func CreateParents(eventId string, parentIds map[string]EventHash) (err error) {
parentId,
)
if err != nil {
tx.Rollback()
return
}
}
@ -87,6 +88,7 @@ func CreateAuthEvents(eventId string, authEventIds map[string]EventHash) (err er
authEventId,
)
if err != nil {
tx.Rollback()
return
}
}
@ -133,6 +135,7 @@ func CreateEvent(event *Event, txnId string) (err error) {
signatures,
)
if err != nil {
tx.Rollback()
return
}
tx.Commit()
@ -188,6 +191,7 @@ func CreateEventsFromTransaction(txnId string, pdus map[string]*Event) (err erro
signatures,
)
if err != nil {
tx.Rollback()
return
}
@ -385,7 +389,7 @@ func ReadAuthEvents(eventId string) (authEvents map[string]EventHash, err error)
}
func ReadEvent(id string) (foundEvent *Event, err error) {
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, content, depth, hash, signature
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, stateKey, content, depth, hash, signature
FROM event
WHERE id = '%s'`, id)
@ -406,6 +410,7 @@ func ReadEvent(id string) (foundEvent *Event, err error) {
&foundEvent.Origin,
&foundEvent.Timestamp,
&foundEvent.EventType,
&foundEvent.StateKey,
&foundEvent.Content,
&foundEvent.Depth,
&foundEvent.Hashes.SHA256,
@ -435,14 +440,14 @@ func ReadEvent(id string) (foundEvent *Event, err error) {
}
func ReadStateEvent(roomId string, eventType string, stateKey string) (foundEvent *Event, err error) {
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, content, depth, hash, signature
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, stateKey, content, depth, hash, signature
FROM event
WHERE roomId = '%s'
AND eventType = '%s'
AND stateKey = '%s'`, roomId, eventType, stateKey)
if stateKey == "" {
queryStmt = fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, content, depth, hash, signature
queryStmt = fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, stateKey, content, depth, hash, signature
FROM event
WHERE roomId = '%s'
AND eventType = '%s'`, roomId, eventType)
@ -465,6 +470,7 @@ func ReadStateEvent(roomId string, eventType string, stateKey string) (foundEven
&foundEvent.Origin,
&foundEvent.Timestamp,
&foundEvent.EventType,
&foundEvent.StateKey,
&foundEvent.Content,
&foundEvent.Depth,
&foundEvent.Hashes.SHA256,
@ -494,7 +500,7 @@ func ReadStateEvent(roomId string, eventType string, stateKey string) (foundEven
}
func ReadStateEvents(roomId string, eventType string) (foundEvents []*Event, err error) {
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, content, depth, hash, signature
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, stateKey, content, depth, hash, signature
FROM event
WHERE roomId = '%s'
AND eventType = '%s'`, roomId, eventType)
@ -516,6 +522,7 @@ func ReadStateEvents(roomId string, eventType string) (foundEvents []*Event, err
&foundEvent.Origin,
&foundEvent.Timestamp,
&foundEvent.EventType,
&foundEvent.StateKey,
&foundEvent.Content,
&foundEvent.Depth,
&foundEvent.Hashes.SHA256,
@ -547,7 +554,7 @@ func ReadStateEvents(roomId string, eventType string) (foundEvents []*Event, err
}
func ReadEventsFromRoom(roomId string) (events map[string]*Event, err error) {
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, content, depth, hash, signature
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, stateKey, content, depth, hash, signature
FROM event
WHERE roomId = '%s'`, roomId)
@ -570,6 +577,7 @@ func ReadEventsFromRoom(roomId string) (events map[string]*Event, err error) {
&foundEvent.Origin,
&foundEvent.Timestamp,
&foundEvent.EventType,
&foundEvent.StateKey,
&foundEvent.Content,
&foundEvent.Depth,
&foundEvent.Hashes.SHA256,
@ -601,7 +609,7 @@ func ReadEventsFromRoom(roomId string) (events map[string]*Event, err error) {
}
func ReadStateEventsFromRoom(roomId string) (events []*Event, err error) {
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, content, depth, hash, signature
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, stateKey, content, depth, hash, signature
FROM event
WHERE eventType <> 'm.room.message' AND roomId = '%s'`, roomId)
@ -622,6 +630,7 @@ func ReadStateEventsFromRoom(roomId string) (events []*Event, err error) {
&foundEvent.Origin,
&foundEvent.Timestamp,
&foundEvent.EventType,
&foundEvent.StateKey,
&foundEvent.Content,
&foundEvent.Depth,
&foundEvent.Hashes.SHA256,
@ -653,7 +662,7 @@ func ReadStateEventsFromRoom(roomId string) (events []*Event, err error) {
}
func ReadEventsFromTransaction(txnId string) (events []*Event, err error) {
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, content, depth, hash, signature
queryStmt := fmt.Sprintf(`SELECT id, roomId, txnId, sender, origin, timestamp, eventType, stateKey, content, depth, hash, signature
FROM event
WHERE txnId = '%s'`, txnId)
@ -674,6 +683,7 @@ func ReadEventsFromTransaction(txnId string) (events []*Event, err error) {
&foundEvent.Origin,
&foundEvent.Timestamp,
&foundEvent.EventType,
&foundEvent.StateKey,
&foundEvent.Content,
&foundEvent.Depth,
&foundEvent.Hashes.SHA256,
@ -727,6 +737,7 @@ func UpdateEvent(event *Event) (err error) {
event.Id,
)
if err != nil {
tx.Rollback()
return
}
@ -743,8 +754,9 @@ func DeleteEvent(id string) (err error) {
return
}
_, err = database.DB.Exec(queryStmt)
_, err = tx.Exec(queryStmt)
if err != nil {
tx.Rollback()
return
}
@ -771,8 +783,9 @@ func DeleteParents(eventId string) (err error) {
return
}
_, err = database.DB.Exec(queryStmt)
_, err = tx.Exec(queryStmt)
if err != nil {
tx.Rollback()
return
}
@ -789,8 +802,9 @@ func DeleteAuthEvents(eventId string) (err error) {
return
}
_, err = database.DB.Exec(queryStmt)
_, err = tx.Exec(queryStmt)
if err != nil {
tx.Rollback()
return
}

View file

@ -25,6 +25,7 @@ func CreateTransaction(transaction *Transaction) (err error) {
_, err = stmt.Exec(transaction.Id, transaction.Origin, transaction.Timestamp)
if err != nil {
tx.Rollback()
return
}
tx.Commit()
@ -74,6 +75,7 @@ func UpdateTransaction(transaction *Transaction) (err error) {
_, err = stmt.Exec(transaction.Origin, transaction.Timestamp, transaction.Id)
if err != nil {
tx.Rollback()
return
}
@ -90,8 +92,9 @@ func DeleteTransaction(id string) (err error) {
return
}
_, err = database.DB.Exec(queryStmt)
_, err = tx.Exec(queryStmt)
if err != nil {
tx.Rollback()
return
}