Matrix/entities/event/eventController.go

804 lines
21 KiB
Go
Raw Normal View History

2020-09-28 20:37:02 +00:00
package event
2020-10-11 21:11:30 +00:00
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"time"
2020-10-12 14:16:28 +00:00
"git.nutfactory.org/hoernschen/Matrix/config"
"git.nutfactory.org/hoernschen/Matrix/entities/device"
"git.nutfactory.org/hoernschen/Matrix/entities/user"
"git.nutfactory.org/hoernschen/Matrix/utils"
2020-10-11 21:11:30 +00:00
"github.com/cenkalti/backoff/v4"
"github.com/gorilla/mux"
)
func New(
roomId string,
sender string,
origin string,
timestamp int64,
eventType string,
stateKey string,
content string,
txnId string,
) (err error, newEvent *Event) {
err, eventId := utils.CreateUUID()
if err != nil {
return
}
id := generateEventId(eventId)
newEvent = &Event{
Id: id,
RoomId: roomId,
Sender: sender,
Origin: origin,
Timestamp: timestamp,
EventType: eventType,
StateKey: stateKey,
Content: content,
Unsigned: UnsignedData{
TransactionId: txnId,
},
}
newEvent.AuthEventHashes, err = GetAuthEvents(newEvent)
if err != nil {
return
}
if eventType != "m.room.create" {
var depth int
newEvent.PrevEventHashes, depth, err = ReadEventsWithoutChild(roomId)
if err != nil {
return
}
newEvent.Depth = depth + 1
}
newEvent.AuthEventHashes, err = GetAuthEvents(newEvent)
if err != nil {
return
}
newEventBytesForHash, err := json.Marshal(newEvent)
if err != nil {
return
}
err, newEvent.Hashes.SHA256 = utils.Hash(newEventBytesForHash)
if err != nil {
return
}
newEvent.Unsigned = UnsignedData{}
newEventBytesForSign, err := json.Marshal(newEvent)
if err != nil {
return
}
newEvent.Signatures = utils.SignContent(newEventBytesForSign)
newEvent.Unsigned = UnsignedData{
TransactionId: txnId,
}
return
}
func SendMessageHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
request := sendMessageRequest{}
errResponse := utils.CheckRequest(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
token, errResponse := utils.GetAccessToken(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
foundUser, err := user.ReadUserFromAccessToken(token)
if err != nil || foundUser == nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorCode: "M_UNKNOWN_TOKEN", ErrorMessage: fmt.Sprintf("%s", err)}); err != nil {
panic(err)
}
return
}
decoder := json.NewDecoder(r.Body)
err = decoder.Decode(&request)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("Could not parse JSON: %s", err)}); err != nil {
panic(err)
}
return
}
vars := mux.Vars(r)
roomId := vars["roomId"]
eventType := vars["eventType"]
txnId := vars["txnId"]
if roomId == "" || eventType == "" || txnId == "" {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Missing Parameter"}); err != nil {
panic(err)
}
return
}
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
content := buf.String()
err, newEvent := New(
roomId,
foundUser.Id,
config.Homeserver,
time.Now().Unix(),
eventType,
foundUser.Id,
content,
txnId,
)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("Error Creating Event: %s", err)}); err != nil {
panic(err)
}
return
}
err = CreateEvent(newEvent, txnId)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("Database Error: %s", err)}); err != nil {
panic(err)
}
return
}
transaction := &Transaction{
Id: txnId,
Origin: config.Homeserver,
Timestamp: time.Now().Unix(),
PDUS: []*Event{newEvent},
}
servers, err := ReadServers(roomId)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("Database Error: %s", err)}); err != nil {
panic(err)
}
return
}
for _, server := range servers {
operation := func() error {
return SendTransaction(transaction, server)
}
notify := func(err error, duration time.Duration) {
log.Printf("Error Sending Transaction, retrying in %ss: %s", duration/1000000000, err)
}
go backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
}
response := createEventResponse{
EventId: newEvent.Id,
}
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(response); err != nil {
panic(err)
}
}
func CreateStateEventHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
errResponse := utils.CheckRequest(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
token, errResponse := utils.GetAccessToken(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
foundUser, err := user.ReadUserFromAccessToken(token)
if err != nil || foundUser == nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorCode: "M_UNKNOWN_TOKEN", ErrorMessage: fmt.Sprintf("%s", err)}); err != nil {
panic(err)
}
return
}
vars := mux.Vars(r)
roomId := vars["roomId"]
eventType := vars["eventType"]
stateKey := vars["stateKey"]
if roomId == "" || eventType == "" {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Missing Parameter"}); err != nil {
panic(err)
}
return
}
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
content := buf.String()
err, newEvent := New(
roomId,
foundUser.Id,
config.Homeserver,
time.Now().Unix(),
eventType,
stateKey,
content,
"",
)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("Error Creating Event: %s", err)}); err != nil {
panic(err)
}
return
}
err = CreateEvent(newEvent, "")
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("Database Error: %s", err)}); err != nil {
panic(err)
}
return
}
err, txnId := utils.CreateUUID()
transaction := &Transaction{
Id: txnId,
Origin: config.Homeserver,
Timestamp: time.Now().Unix(),
PDUS: []*Event{newEvent},
}
servers, err := ReadServers(roomId)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("Database Error: %s", err)}); err != nil {
panic(err)
}
return
}
for _, server := range servers {
operation := func() error {
return SendTransaction(transaction, server)
}
notify := func(err error, duration time.Duration) {
log.Printf("Error Sending Transaction, retrying in %ss: %s", duration/1000000000, err)
}
go backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
}
response := createEventResponse{
EventId: newEvent.Id,
}
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(response); err != nil {
panic(err)
}
}
func GetEventUserHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
request := getEventRequest{}
errResponse := utils.CheckRequest(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
token, errResponse := utils.GetAccessToken(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
foundUser, err := user.ReadUserFromAccessToken(token)
if err != nil || foundUser == nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorCode: "M_UNKNOWN_TOKEN", ErrorMessage: fmt.Sprintf("%s", err)}); err != nil {
panic(err)
}
return
}
decoder := json.NewDecoder(r.Body)
err = decoder.Decode(&request)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("Could not parse JSON: %s", err)}); err != nil {
panic(err)
}
return
}
vars := mux.Vars(r)
roomId := vars["roomId"]
eventId := vars["eventId"]
if roomId == "" || eventId == "" {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Missing Parameter"}); err != nil {
panic(err)
}
return
}
foundEvent, err := ReadEvent(eventId)
if err != nil || foundEvent == nil {
w.WriteHeader(http.StatusNotFound)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorCode: "M_NOT_FOUND", ErrorMessage: fmt.Sprintf("Event not found. %s", err)}); err != nil {
panic(err)
}
return
}
response := getEventResponse{
Content: foundEvent.Content,
EventType: foundEvent.EventType,
}
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(response); err != nil {
panic(err)
}
}
func GetStateEventHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
errResponse := utils.CheckRequest(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
token, errResponse := utils.GetAccessToken(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
foundUser, err := user.ReadUserFromAccessToken(token)
if err != nil || foundUser == nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorCode: "M_UNKNOWN_TOKEN", ErrorMessage: fmt.Sprintf("%s", err)}); err != nil {
panic(err)
}
return
}
vars := mux.Vars(r)
roomId := vars["roomId"]
eventType := vars["eventType"]
stateKey := vars["stateKey"]
if roomId == "" || eventType == "" {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Missing Parameter"}); err != nil {
panic(err)
}
return
}
foundEvent, err := ReadStateEvent(roomId, eventType, stateKey)
if err != nil || foundEvent == nil {
w.WriteHeader(http.StatusNotFound)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorCode: "M_NOT_FOUND", ErrorMessage: fmt.Sprintf("Event not found. %s", err)}); err != nil {
panic(err)
}
return
}
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(foundEvent.Content); err != nil {
panic(err)
}
}
func SyncEventsServerHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
request := syncEventsServerRequest{}
errResponse := utils.CheckRequest(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
errResponse = utils.CheckAuthHeader(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&request)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: fmt.Sprintf("Could not parse JSON: %s", err)}); err != nil {
panic(err)
}
return
}
vars := mux.Vars(r)
txnId := vars["txnId"]
if txnId == "" {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Missing Parameter"}); err != nil {
panic(err)
}
return
}
response := syncEventsServerResponse{
PDUs: make(map[string]pduProcessingResult),
}
missingEventIds := make(map[string][]string)
for _, pdu := range request.PDUs {
signatureValid := CheckSignature(*pdu)
if !signatureValid {
log.Printf("Wrong Signature for Event %s", pdu.Id)
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: "Signature not valid"}
continue
}
authEventsValid, err := CheckAuthEvents(pdu)
if !authEventsValid || err != nil {
log.Printf("Wrong Auth Events for Event %s", pdu.Id)
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Error in Auth Check: %s", err)}
//continue
}
missingParentIds, err := CheckParents(pdu)
if len(missingParentIds) > 0 || err != nil {
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Error in Parents Check: %s", err)}
for _, parentId := range missingParentIds {
missingEventIds[pdu.RoomId] = append(missingEventIds[pdu.RoomId], parentId)
}
}
foundEvent, err := ReadEvent(pdu.Id)
if foundEvent == nil && err == nil {
err = CreateEvent(pdu, txnId)
}
if err != nil {
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Database Error: %s", err)}
continue
}
err = HandleEvent(pdu)
if err != nil {
response.PDUs[pdu.Id] = pduProcessingResult{ProcessingError: fmt.Sprintf("Error in Event-Handling: %s", err)}
continue
}
response.PDUs[pdu.Id] = pduProcessingResult{}
}
if len(missingEventIds) > 0 {
for roomId, eventIds := range missingEventIds {
operation := func() error {
return Backfill(eventIds, roomId, request.Origin)
}
notify := func(err error, duration time.Duration) {
log.Printf("Error Backfill, retrying in %s: %s", duration/1000000000, err)
}
go backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
}
}
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(response); err != nil {
panic(err)
}
}
func BackfillHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
errResponse := utils.CheckAuthHeader(r)
if errResponse != nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(errResponse); err != nil {
panic(err)
}
return
}
vars := mux.Vars(r)
roomId := vars["roomId"]
if roomId == "" {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Missing Parameter"}); err != nil {
panic(err)
}
return
}
limit := 50
eventIds, ok := r.URL.Query()["v"]
log.Printf("%s", eventIds)
if !ok || eventIds[0] == "" {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Missing Parameter"}); err != nil {
panic(err)
}
return
}
limitString := r.URL.Query().Get("limit")
if limitString != "" {
limit, _ = strconv.Atoi(limitString)
}
pdus := []*Event{}
for len(pdus) < limit {
newEventIds := []string{}
for _, eventId := range eventIds {
foundEvent, err := ReadEvent(eventId)
if err != nil || foundEvent == nil {
w.WriteHeader(http.StatusBadRequest)
if err := json.NewEncoder(w).Encode(utils.ErrorResponse{ErrorMessage: "Event not found"}); err != nil {
panic(err)
}
return
}
for newEventId, _ := range foundEvent.PrevEventHashes {
newEventIds = append(newEventIds, newEventId)
}
pdus = append(pdus, foundEvent)
}
eventIds = newEventIds
}
response := backfillResponse{
Origin: config.Homeserver,
Timestamp: time.Now().Unix(),
PDUs: pdus,
}
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(response); err != nil {
panic(err)
}
}
func SendTransaction(transaction *Transaction, homeserver string) (err error) {
requestUrl := fmt.Sprintf("https://%s/_matrix/federation/v1/send/%s?", homeserver, transaction.Id)
request := syncEventsServerRequest{
Origin: transaction.Origin,
Timestamp: transaction.Timestamp,
PDUs: transaction.PDUS,
}
reqBody, err := json.Marshal(request)
if err != nil {
return
}
client := &http.Client{}
req, err := http.NewRequest(http.MethodPut, requestUrl, bytes.NewBuffer(reqBody))
if err != nil {
return
}
_, err = client.Do(req)
return
}
func Backfill(eventIds []string, roomId string, homeserver string) (err error) {
requestUrl := fmt.Sprintf("https://%s/_matrix/federation/v1/backfill/%s?", homeserver, roomId)
for _, eventId := range eventIds {
requestUrl = fmt.Sprintf("%sv=%s&", requestUrl, eventId)
}
r, err := http.Get(requestUrl)
if err != nil {
return
}
response := backfillResponse{}
defer r.Body.Close()
decoder := json.NewDecoder(r.Body)
err = decoder.Decode(&response)
if err != nil {
return
}
var missingEventIds []string
for _, pdu := range response.PDUs {
for i, eventId := range missingEventIds {
if pdu.Id == eventId {
missingEventIds = append(missingEventIds[:i], missingEventIds[i+1:]...)
}
}
signatureValid := CheckSignature(*pdu)
if !signatureValid {
log.Printf("Wrong Signature for Event %s", pdu.Id)
missingEventIds = append(missingEventIds, pdu.Id)
continue
}
authEventsValid, err := CheckAuthEvents(pdu)
if !authEventsValid || err != nil {
log.Printf("Wrong Auth Events for Event %s", pdu.Id)
missingEventIds = append(missingEventIds, pdu.Id)
continue
}
missingParentIds, err := CheckParents(pdu)
if len(missingParentIds) > 0 || err != nil {
for _, parentId := range missingParentIds {
missingEventIds = append(missingEventIds, parentId)
}
}
foundEvent, err := ReadEvent(pdu.Id)
if foundEvent == nil && err == nil {
err = CreateEvent(pdu, "")
}
if err != nil {
continue
}
err = HandleEvent(pdu)
if err != nil {
log.Printf("Error in Event-Handling: %s", err)
continue
}
}
if len(missingEventIds) > 0 {
Backfill(missingEventIds, roomId, homeserver)
}
return
}
func generateEventId(id string) string {
return fmt.Sprintf("$%s:%s", id, config.Homeserver)
}
func GetAuthChain(newEvent *Event) (authChain []*Event, err error) {
createEvent, err := ReadStateEvent(newEvent.RoomId, "m.room.create", "")
if err != nil {
return
}
if createEvent != nil {
authChain = append(authChain, createEvent)
}
powerLevelEvent, err := ReadStateEvent(newEvent.RoomId, "m.room.power_levels", "")
if err != nil {
return
}
if powerLevelEvent != nil {
authChain = append(authChain, powerLevelEvent)
}
stateKey := newEvent.Sender
if newEvent.EventType == "m.room.member" {
stateKey = newEvent.StateKey
}
memberEvent, err := ReadStateEvent(newEvent.RoomId, "m.room.member", stateKey)
if err != nil {
return
}
if memberEvent != nil {
authChain = append(authChain, memberEvent)
}
joinRuleEvent, err := ReadStateEvent(newEvent.RoomId, "m.room.join_rules", "")
if err != nil {
return
}
if joinRuleEvent != nil && newEvent.EventType == "m.room.member" {
authChain = append(authChain, joinRuleEvent)
}
return
}
func GetAuthEvents(newEvent *Event) (authEventHashes map[string]EventHash, err error) {
authEventHashes = make(map[string]EventHash)
authChain, err := GetAuthChain(newEvent)
if err != nil {
return
}
for _, authEvent := range authChain {
authEventHashes[authEvent.Id] = authEvent.Hashes
}
return
}
func CheckEventHash(id string, hash EventHash) (correct bool, eventFound bool, err error) {
foundEvent, err := ReadEvent(id)
correct = true
eventFound = true
if err != nil {
return
}
if foundEvent == nil {
eventFound = false
return
}
if hash.SHA256 != foundEvent.Hashes.SHA256 {
correct = false
return
}
return
}
func CheckParents(eventToCheck *Event) (missingParentIds []string, err error) {
for key, hash := range eventToCheck.PrevEventHashes {
correctHash, foundEvent, err := CheckEventHash(key, hash)
if !correctHash || !foundEvent || err != nil {
missingParentIds = append(missingParentIds, key)
}
}
return
}
func CheckAuthEvents(eventToCheck *Event) (correct bool, err error) {
correct = true
authEvents, err := GetAuthEvents(eventToCheck)
if err != nil {
return
}
for key, hash := range authEvents {
if eventToCheck.AuthEventHashes[key].SHA256 != hash.SHA256 {
correct = false
return
}
}
return
}
func CheckSignature(eventToCheck Event) (correct bool) {
correct = false
signatures := eventToCheck.Signatures
eventToCheck.Unsigned = UnsignedData{}
eventToCheck.Signatures = nil
jsonString, err := json.Marshal(eventToCheck)
if err != nil {
return
}
for id, signature := range signatures[eventToCheck.Sender] {
key, err := device.ReadKey(id)
if err == nil {
correct = utils.VerifySignature([]byte(key.Key), jsonString, []byte(signature))
}
}
return
}
func HandleEvents(events []*Event) (err error) {
for _, eventToHandle := range events {
err = HandleEvent(eventToHandle)
}
return
}
func HandleEvent(eventToHandle *Event) (err error) {
if eventToHandle.EventType == "m.room.message" {
message := sendMessageRequest{}
err = json.Unmarshal([]byte(eventToHandle.Content), &message)
if err != nil {
return
}
if message.MessageType != "" && message.MessageType == "m.text" {
log.Printf("%s: %s", eventToHandle.Sender, message.Body)
}
} else if eventToHandle.EventType == "m.room.member" {
message := MemberEventContent{}
err = json.Unmarshal([]byte(eventToHandle.Content), &message)
if err != nil {
return
}
if message.Membership == "join" {
CreateRoomMember(eventToHandle.RoomId, eventToHandle.StateKey)
}
}
2020-10-01 15:45:57 +00:00
return
}