diff --git a/.travis.yml b/.travis.yml index c9ad822c..0d6a22b7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,26 @@ language: go go: - 1.7 + +sudo: false + +# Use trusty for postgres 9.5 support +dist: trusty + +addons: + postgresql: "9.5" + +services: + - postgresql + install: - go get github.com/constabulary/gb/... - go get github.com/golang/lint/golint - go get github.com/fzipp/gocyclo + - ./travis-install-kafka.sh script: - - gb build github.com/matrix-org/dendrite/roomserver/roomserver && ./hooks/pre-commit + - ./travis-test.sh notifications: webhooks: diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go new file mode 100644 index 00000000..06f22a2a --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests/main.go @@ -0,0 +1,340 @@ +package main + +import ( + "fmt" + "github.com/matrix-org/gomatrixserverlib" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +var ( + // Path to where kafka is installed. + kafkaDir = defaulting(os.Getenv("KAFKA_DIR"), "kafka") + // The URI the kafka zookeeper is listening on. + zookeeperURI = defaulting(os.Getenv("ZOOKEEPER_URI"), "localhost:2181") + // The URI the kafka server is listening on. + kafkaURI = defaulting(os.Getenv("KAFKA_URIS"), "localhost:9092") + // How long to wait for the roomserver to write the expected output messages. + timeoutString = defaulting(os.Getenv("TIMEOUT"), "10s") + // The name of maintenance database to connect to in order to create the test database. + postgresDatabase = defaulting(os.Getenv("POSTGRES_DATABASE"), "postgres") + // The name of the test database to create. + testDatabaseName = defaulting(os.Getenv("DATABASE_NAME"), "roomserver_test") + // The postgres connection config for connecting to the test database. + testDatabase = defaulting(os.Getenv("DATABASE"), fmt.Sprintf("dbname=%s binary_parameters=yes", testDatabaseName)) +) + +func defaulting(value, defaultValue string) string { + if value == "" { + value = defaultValue + } + return value +} + +var timeout time.Duration + +func init() { + var err error + timeout, err = time.ParseDuration(timeoutString) + if err != nil { + panic(err) + } +} + +func createDatabase(database string) error { + cmd := exec.Command("psql", postgresDatabase) + cmd.Stdin = strings.NewReader( + fmt.Sprintf("DROP DATABASE IF EXISTS %s; CREATE DATABASE %s;", database, database), + ) + // Send stdout and stderr to our stderr so that we see error messages from + // the psql process + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + return cmd.Run() +} + +func createTopic(topic string) error { + cmd := exec.Command( + filepath.Join(kafkaDir, "bin", "kafka-topics.sh"), + "--create", + "--zookeeper", zookeeperURI, + "--replication-factor", "1", + "--partitions", "1", + "--topic", topic, + ) + // Send stdout and stderr to our stderr so that we see error messages from + // the kafka process. + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + return cmd.Run() +} + +func writeToTopic(topic string, data []string) error { + cmd := exec.Command( + filepath.Join(kafkaDir, "bin", "kafka-console-producer.sh"), + "--broker-list", kafkaURI, + "--topic", topic, + ) + // Send stdout and stderr to our stderr so that we see error messages from + // the kafka process. + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + cmd.Stdin = strings.NewReader(strings.Join(data, "\n")) + return cmd.Run() +} + +// runAndReadFromTopic runs a command and waits for a number of messages to be +// written to a kafka topic. It returns if the command exits, the number of +// messages is reached or after a timeout. It kills the command before it returns. +// It returns a list of the messages read from the command on success or an error +// on failure. +func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int) ([]string, error) { + type result struct { + // data holds all of stdout on success. + data []byte + // err is set on failure. + err error + } + done := make(chan result) + readCmd := exec.Command( + filepath.Join(kafkaDir, "bin", "kafka-console-consumer.sh"), + "--bootstrap-server", kafkaURI, + "--topic", topic, + "--from-beginning", + "--max-messages", fmt.Sprintf("%d", count), + ) + // Send stderr to our stderr so the user can see any error messages. + readCmd.Stderr = os.Stderr + // Run the command, read the messages and wait for a timeout in parallel. + go func() { + // Read all of stdout. + data, err := readCmd.Output() + done <- result{data, err} + }() + go func() { + err := runCmd.Run() + done <- result{nil, err} + }() + go func() { + time.Sleep(timeout) + done <- result{nil, fmt.Errorf("Timeout reading %d messages from topic %q", count, topic)} + }() + // Wait for one of the tasks to finsh. + r := <-done + + // Kill both processes. We don't check if the processes are running and + // we ignore failures since we are just trying to clean up before returning. + runCmd.Process.Kill() + readCmd.Process.Kill() + + if r.err != nil { + return nil, r.err + } + + // The kafka console consumer writes a newline character after each message. + // So we split on newline characters + lines := strings.Split(string(r.data), "\n") + if len(lines) > 0 { + // Remove the blank line at the end of the data. + lines = lines[:len(lines)-1] + } + return lines, nil +} + +func deleteTopic(topic string) error { + cmd := exec.Command( + filepath.Join(kafkaDir, "bin", "kafka-topics.sh"), + "--delete", + "--if-exists", + "--zookeeper", zookeeperURI, + "--topic", topic, + ) + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stderr + return cmd.Run() +} + +func testRoomServer(input []string, wantOutput []string) { + const ( + inputTopic = "roomserverInput" + outputTopic = "roomserverOutput" + ) + deleteTopic(inputTopic) + if err := createTopic(inputTopic); err != nil { + panic(err) + } + deleteTopic(outputTopic) + if err := createTopic(outputTopic); err != nil { + panic(err) + } + + if err := writeToTopic(inputTopic, canonicalJSONInput(input)); err != nil { + panic(err) + } + + if err := createDatabase(testDatabaseName); err != nil { + panic(err) + } + + cmd := exec.Command(filepath.Join(filepath.Dir(os.Args[0]), "roomserver")) + + // Append the roomserver config to the existing environment. + // We append to the environment rather than replacing so that any additional + // postgres and golang environment variables such as PGHOST are passed to + // the roomserver process. + cmd.Env = append( + os.Environ(), + fmt.Sprintf("DATABASE=%s", testDatabase), + fmt.Sprintf("KAFKA_URIS=%s", kafkaURI), + fmt.Sprintf("TOPIC_INPUT_ROOM_EVENT=%s", inputTopic), + fmt.Sprintf("TOPIC_OUTPUT_ROOM_EVENT=%s", outputTopic), + ) + cmd.Stderr = os.Stderr + + gotOutput, err := runAndReadFromTopic(cmd, outputTopic, 1) + if err != nil { + panic(err) + } + + if len(wantOutput) != len(gotOutput) { + panic(fmt.Errorf("Wanted %d lines of output got %d lines", len(wantOutput), len(gotOutput))) + } + + for i := range wantOutput { + if !equalJSON(wantOutput[i], gotOutput[i]) { + panic(fmt.Errorf("Wanted %q at index %d got %q", wantOutput[i], i, gotOutput[i])) + } + } +} + +func canonicalJSONInput(jsonData []string) []string { + for i := range jsonData { + jsonBytes, err := gomatrixserverlib.CanonicalJSON([]byte(jsonData[i])) + if err != nil { + panic(err) + } + jsonData[i] = string(jsonBytes) + } + return jsonData +} + +func equalJSON(a, b string) bool { + canonicalA, err := gomatrixserverlib.CanonicalJSON([]byte(a)) + if err != nil { + panic(err) + } + canonicalB, err := gomatrixserverlib.CanonicalJSON([]byte(b)) + if err != nil { + panic(err) + } + return string(canonicalA) == string(canonicalB) +} + +func main() { + fmt.Println("==TESTING==", os.Args[0]) + + input := []string{ + `{ + "AuthEventIDs": [], + "Kind": 1, + "Event": { + "origin": "matrix.org", + "signatures": { + "matrix.org": { + "ed25519:auto": "3kXGwNtdj+zqEXlI8PWLiB76xtrQ7SxcvPuXAEVCTo+QPoBoUvLi1RkHs6O5mDz7UzIowK5bi1seAN4vOh0OBA" + } + }, + "origin_server_ts": 1463671337837, + "sender": "@richvdh:matrix.org", + "event_id": "$1463671337126266wrSBX:matrix.org", + "prev_events": [], + "state_key": "", + "content": {"creator": "@richvdh:matrix.org"}, + "depth": 1, + "prev_state": [], + "room_id": "!HCXfdvrfksxuYnIFiJ:matrix.org", + "auth_events": [], + "hashes": {"sha256": "Q05VLC8nztN2tguy+KnHxxhitI95wK9NelnsDaXRqeo"}, + "type": "m.room.create"} + }`, `{ + "AuthEventIDs": ["$1463671337126266wrSBX:matrix.org"], + "Kind": 2, + "StateEventIDs": ["$1463671337126266wrSBX:matrix.org"], + "Event": { + "origin": "matrix.org", + "signatures": { + "matrix.org": { + "ed25519:auto": "a2b3xXYVPPFeG1sHCU3hmZnAaKqZFgzGZozijRGblG5Y//ewRPAn1A2mCrI2UM5I+0zqr70cNpHgF8bmNFu4BA" + } + }, + "origin_server_ts": 1463671339844, + "sender": "@richvdh:matrix.org", + "event_id": "$1463671339126270PnVwC:matrix.org", + "prev_events": [[ + "$1463671337126266wrSBX:matrix.org", {"sha256": "h/VS07u8KlMwT3Ee8JhpkC7sa1WUs0Srgs+l3iBv6c0"} + ]], + "membership": "join", + "state_key": "@richvdh:matrix.org", + "content": { + "membership": "join", + "avatar_url": "mxc://matrix.org/ZafPzsxMJtLaSaJXloBEKiws", + "displayname": "richvdh" + }, + "depth": 2, + "prev_state": [], + "room_id": "!HCXfdvrfksxuYnIFiJ:matrix.org", + "auth_events": [[ + "$1463671337126266wrSBX:matrix.org", {"sha256": "h/VS07u8KlMwT3Ee8JhpkC7sa1WUs0Srgs+l3iBv6c0"} + ]], + "hashes": {"sha256": "t9t3sZV1Eu0P9Jyrs7pge6UTa1zuTbRdVxeUHnrQVH0"}, + "type": "m.room.member"}, + "HasState": true + }`, + } + + want := []string{ + `{ + "Event":{ + "auth_events":[[ + "$1463671337126266wrSBX:matrix.org",{"sha256":"h/VS07u8KlMwT3Ee8JhpkC7sa1WUs0Srgs+l3iBv6c0"} + ]], + "content":{ + "avatar_url":"mxc://matrix.org/ZafPzsxMJtLaSaJXloBEKiws", + "displayname":"richvdh", + "membership":"join" + }, + "depth": 2, + "event_id": "$1463671339126270PnVwC:matrix.org", + "hashes": {"sha256":"t9t3sZV1Eu0P9Jyrs7pge6UTa1zuTbRdVxeUHnrQVH0"}, + "membership": "join", + "origin": "matrix.org", + "origin_server_ts": 1463671339844, + "prev_events": [[ + "$1463671337126266wrSBX:matrix.org",{"sha256":"h/VS07u8KlMwT3Ee8JhpkC7sa1WUs0Srgs+l3iBv6c0"} + ]], + "prev_state":[], + "room_id":"!HCXfdvrfksxuYnIFiJ:matrix.org", + "sender":"@richvdh:matrix.org", + "signatures":{ + "matrix.org":{ + "ed25519:auto":"a2b3xXYVPPFeG1sHCU3hmZnAaKqZFgzGZozijRGblG5Y//ewRPAn1A2mCrI2UM5I+0zqr70cNpHgF8bmNFu4BA" + } + }, + "state_key":"@richvdh:matrix.org", + "type":"m.room.member" + }, + "VisibilityEventIDs":null, + "LatestEventIDs":["$1463671339126270PnVwC:matrix.org"], + "AddsStateEventIDs":null, + "RemovesStateEventIDs":null, + "LastSentEventID":"" + }`, + } + + testRoomServer(input, want) + + fmt.Println("==PASSED==", os.Args[0]) +} diff --git a/travis-install-kafka.sh b/travis-install-kafka.sh new file mode 100755 index 00000000..20855fcc --- /dev/null +++ b/travis-install-kafka.sh @@ -0,0 +1,22 @@ +# /bin/bash + +set -eu + +# The mirror to download kafka from is picked from the list of mirrors at +# https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz +# TODO: Check the signature since we are downloading over HTTP. +MIRROR=http://mirror.ox.ac.uk/sites/rsync.apache.org/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz + +# Only download the kafka if it isn't already downloaded. +test -f kafka.tgz || wget $MIRROR -O kafka.tgz +# Unpack the kafka over the top of any existing installation +mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1 +# Start the zookeeper running in the background. +# By default the zookeeper listens on localhost:2181 +kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties +# Enable topic deletion so that the integration tests can create a fresh topic +# for each test run. +echo "delete.topic.enable=true" >> kafka/config/server.properties +# Start the kafka server running in the background. +# By default the kafka listens on localhost:9092 +kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties diff --git a/travis-test.sh b/travis-test.sh new file mode 100755 index 00000000..b3ed18cf --- /dev/null +++ b/travis-test.sh @@ -0,0 +1,13 @@ +#! /bin/bash + +set -eu + +# Check that the servers build +gb build github.com/matrix-org/dendrite/roomserver/roomserver +gb build github.com/matrix-org/dendrite/roomserver/roomserver-integration-tests + +# Run the pre commit hooks +./hooks/pre-commit + +# Run the integration tests +bin/roomserver-integration-tests