Finished Prototype
This commit is contained in:
parent
ea54a27796
commit
6de476260d
30 changed files with 2189 additions and 0 deletions
5
entities/collection/collection.go
Normal file
5
entities/collection/collection.go
Normal file
|
@ -0,0 +1,5 @@
|
|||
package collection
|
||||
|
||||
type OutboxResponse struct {
|
||||
Location string `json:"Location,omitempty"`
|
||||
}
|
350
entities/collection/collectionController.go
Normal file
350
entities/collection/collectionController.go
Normal file
|
@ -0,0 +1,350 @@
|
|||
package collection
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.nutfactory.org/hoernschen/ActivityPub/config"
|
||||
"git.nutfactory.org/hoernschen/ActivityPub/entities/activity"
|
||||
"git.nutfactory.org/hoernschen/ActivityPub/entities/actor"
|
||||
"git.nutfactory.org/hoernschen/ActivityPub/entities/object"
|
||||
"git.nutfactory.org/hoernschen/ActivityPub/entities/user"
|
||||
"git.nutfactory.org/hoernschen/ActivityPub/utils"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func PostInboxHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", utils.GetContentTypeString())
|
||||
newActivity := &activity.Activity{}
|
||||
err := utils.CheckRequest(r)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(err); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
token, err := utils.GetAccessToken(r)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(err); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if token == "" {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode("Missing Token"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
vars := mux.Vars(r)
|
||||
actorName := vars["actorName"]
|
||||
if actorName == "" {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode("Missing Actor"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
decoder := json.NewDecoder(r.Body)
|
||||
err = decoder.Decode(&newActivity)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Could not parse JSON: %s", err)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if newActivity.Type == "Follow" {
|
||||
err = CreateCollectionObject(utils.GenerateFollowersUrl(actorName), newActivity.Actor)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Database Error Creating Collection Object: %s", err)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Printf("%s follows %s", newActivity.Actor, newActivity.To)
|
||||
} else if newActivity.Type == "Create" {
|
||||
foundObject, err := object.ReadObject(newActivity.Object.Id)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Database Error Reading Object: %s", err)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if foundObject == nil {
|
||||
_ = object.CreateObject(newActivity.Object)
|
||||
/*
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Database Error Creating Object: %s", err)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
*/
|
||||
}
|
||||
_ = CreateCollectionObject(utils.GenerateInboxUrl(actorName), newActivity.Object.Id)
|
||||
/*
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Database Error Creating Collection Object: %s", err)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
*/
|
||||
log.Printf("%s to %s: %s", newActivity.Actor, newActivity.To, newActivity.Object.Content)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode("Unsupported Activity Type"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func PostOutboxHandler(w http.ResponseWriter, r *http.Request) {
|
||||
packetLossNumber := rand.Intn(100)
|
||||
if packetLossNumber > config.Packetloss {
|
||||
w.Header().Set("Content-Type", utils.GetContentTypeString())
|
||||
newActivity := &activity.Activity{}
|
||||
err := utils.CheckRequest(r)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(err); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
token, err := utils.GetAccessToken(r)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(err); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
foundUser, err := user.ReadUserFromToken(token)
|
||||
if err != nil || foundUser == nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Unknown Token: %s", err)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
buf := new(bytes.Buffer)
|
||||
buf.ReadFrom(r.Body)
|
||||
err = json.Unmarshal(buf.Bytes(), newActivity)
|
||||
if err != nil || newActivity.Type == "Note" {
|
||||
postedObject := &object.Object{}
|
||||
err = json.Unmarshal(buf.Bytes(), postedObject)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Could not parse JSON: %s")); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
var newObject *object.Object
|
||||
if postedObject.Id == "" {
|
||||
err, newObject = object.New(
|
||||
postedObject.Type,
|
||||
postedObject.AttributedTo,
|
||||
postedObject.Content,
|
||||
postedObject.Published,
|
||||
postedObject.To,
|
||||
foundUser.Id,
|
||||
)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Error Creating Object: %s")); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
err = object.CreateObject(newObject)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Database Error Creating Object: %s")); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
} else {
|
||||
newObject = postedObject
|
||||
}
|
||||
newActivity = activity.New(
|
||||
newObject.Id,
|
||||
newObject.AttributedTo,
|
||||
newObject,
|
||||
foundUser.Id,
|
||||
)
|
||||
}
|
||||
|
||||
var recipients []string
|
||||
if newActivity.Type == "Follow" {
|
||||
err = CreateCollectionObject(utils.GenerateFollowingUrl(foundUser.Id), newActivity.To)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Database Error Creating Collection Object: %s")); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
recipients = append(recipients, newActivity.To)
|
||||
} else if newActivity.Type == "Create" {
|
||||
err = CreateCollectionObject(utils.GenerateOutboxUrl(foundUser.Id), newActivity.Id)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
if err := json.NewEncoder(w).Encode(fmt.Sprintf("Database Error Creating Collection Object: %s")); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
err, recipients = GetReciepientsForActivity(newActivity)
|
||||
}
|
||||
|
||||
for _, recipient := range recipients {
|
||||
log.Printf("Send Activity to Recipient %s", recipient)
|
||||
|
||||
operation := func() error {
|
||||
return SendActivity(newActivity, recipient, token)
|
||||
}
|
||||
notify := func(err error, duration time.Duration) {
|
||||
log.Printf("Error Sending Activity, retrying in %ss: %s", duration/1000000000, err)
|
||||
}
|
||||
backoff.RetryNotify(operation, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 16), notify)
|
||||
|
||||
//go retryActivity(newActivity, recipient, token)
|
||||
}
|
||||
|
||||
response := OutboxResponse{
|
||||
Location: newActivity.Id,
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
if err := json.NewEncoder(w).Encode(response); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func retryActivity(activityToSend *activity.Activity, recipient string, token string) (err error) {
|
||||
b, cancel := config.BackoffPolicy.Start(context.Background())
|
||||
defer cancel()
|
||||
|
||||
for backoff.Continue(b) {
|
||||
err := SendActivity(activityToSend, recipient, token)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
err = errors.New("Not able to send activity")
|
||||
return
|
||||
}
|
||||
*/
|
||||
|
||||
func GetReciepientsForActivity(activityToSend *activity.Activity) (err error, recipientsWithoutDuplicates []string) {
|
||||
recipients := []string{}
|
||||
if strings.Contains(activityToSend.To, config.Homeserver) {
|
||||
var foundActor *actor.Actor
|
||||
foundActor, err = actor.ReadActor(activityToSend.To)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if foundActor == nil {
|
||||
var foundCollectionObjects []string
|
||||
foundCollectionObjects, err = ReadCollectionObjects(activityToSend.To)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if len(foundCollectionObjects) <= 0 {
|
||||
err = errors.New("No Recipients")
|
||||
return
|
||||
}
|
||||
recipients = append(recipients, foundCollectionObjects...)
|
||||
} else {
|
||||
recipients = append(recipients, foundActor.Id)
|
||||
}
|
||||
} else {
|
||||
// Not Implemented
|
||||
}
|
||||
recipientsWithoutDuplicates = utils.RemoveDuplicates(recipients)
|
||||
return
|
||||
}
|
||||
|
||||
func SendActivity(activityToSend *activity.Activity, recipient string, token string) (err error) {
|
||||
if strings.Contains(recipient, config.Homeserver) {
|
||||
if activityToSend.Type == "Follow" {
|
||||
followers := fmt.Sprintf("%sfollowers/", activityToSend.To)
|
||||
err = CreateCollectionObject(followers, activityToSend.Actor)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
log.Printf("%s follows %s", activityToSend.Actor, activityToSend.To)
|
||||
} else if activityToSend.Type == "Create" {
|
||||
id := activityToSend.Id
|
||||
if activityToSend.Object != nil {
|
||||
id = activityToSend.Object.Id
|
||||
}
|
||||
inbox := fmt.Sprintf("%sinbox", recipient)
|
||||
err = CreateCollectionObject(inbox, id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
log.Printf("%s to %s: %s", activityToSend.Actor, recipient, activityToSend.Object.Content)
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
activityToSend.To = recipient
|
||||
var profile *actor.Actor
|
||||
err, profile = actor.GetProfile(recipient)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var reqBody []byte
|
||||
reqBody, err = json.Marshal(activityToSend)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
client := &http.Client{Timeout: 2 * time.Second}
|
||||
var req *http.Request
|
||||
log.Printf("Inbox: %s", profile.Inbox)
|
||||
req, err = http.NewRequest(http.MethodPost, profile.Inbox, bytes.NewBuffer(reqBody))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
req.Header["Content-Type"] = []string{utils.GetContentTypeString()}
|
||||
req.Header["Authorization"] = []string{fmt.Sprintf("Bearer %s")}
|
||||
var res *http.Response
|
||||
res, err = client.Do(req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if res.StatusCode != http.StatusOK {
|
||||
err = utils.HandleHTTPError(res)
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
62
entities/collection/collectionDatabaseConnector.go
Normal file
62
entities/collection/collectionDatabaseConnector.go
Normal file
|
@ -0,0 +1,62 @@
|
|||
package collection
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"git.nutfactory.org/hoernschen/ActivityPub/utils/database"
|
||||
)
|
||||
|
||||
func CreateCollectionObject(collectionId string, objectId string) (err error) {
|
||||
sqlStmt := fmt.Sprintf(`INSERT INTO collectionObject
|
||||
(collectionId, objectId)
|
||||
VALUES
|
||||
(?, ?)`)
|
||||
|
||||
tx, err := database.DB.Begin()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare(sqlStmt)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
_, err = stmt.Exec(
|
||||
collectionId,
|
||||
objectId,
|
||||
)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return
|
||||
}
|
||||
tx.Commit()
|
||||
return
|
||||
}
|
||||
|
||||
func ReadCollectionObjects(collectionId string) (collectionObjects []string, err error) {
|
||||
queryStmt := fmt.Sprintf(`SELECT objectId
|
||||
FROM collectionObject
|
||||
WHERE collectionId = '%s'`, collectionId)
|
||||
|
||||
rows, err := database.DB.Query(queryStmt)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var collectionObject string
|
||||
err = rows.Scan(
|
||||
&collectionObject,
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
collectionObjects = append(collectionObjects, collectionObject)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue