gophernotes/messages.go

151 lines
4.1 KiB
Go
Raw Normal View History

package main
import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"log"

	zmq "github.com/alecthomas/gozmq"
	uuid "github.com/nu7hatch/gouuid"
	"github.com/pkg/errors"
)
2016-07-29 16:52:27 -05:00
// MsgHeader encodes header info for ZMQ messages.
//
// The struct tags fix the JSON key used for each field when a header is
// marshalled to, or unmarshalled from, a wire message.
type MsgHeader struct {
	// MsgID identifies the message; NewMsg fills it with a freshly generated UUID.
	MsgID string `json:"msg_id"`
	// Username is copied from the parent message's header when replying.
	Username string `json:"username"`
	// Session is copied from the parent message's header when replying.
	Session string `json:"session"`
	// MsgType names the kind of message being sent.
	MsgType string `json:"msg_type"`
}
// ComposedMsg represents an entire message in a high-level structure.
type ComposedMsg struct {
2016-01-24 00:16:23 -06:00
Header MsgHeader
ParentHeader MsgHeader
Metadata map[string]interface{}
Content interface{}
}
// InvalidSignatureError is returned when the signature on a received message does not
// validate.
type InvalidSignatureError struct{}

// Error implements the error interface with a fixed description of the failure.
func (e *InvalidSignatureError) Error() string {
	return "A message had an invalid signature"
}
// WireMsgToComposedMsg translates a multipart ZMQ messages received from a socket into
// a ComposedMsg struct and a slice of return identities. This includes verifying the
// message signature.
2016-07-29 16:52:27 -05:00
func WireMsgToComposedMsg(msgparts [][]byte, signkey []byte) (ComposedMsg, [][]byte, error) {
2016-01-23 13:48:33 -06:00
i := 0
for string(msgparts[i]) != "<IDS|MSG>" {
i++
}
2016-07-29 16:52:27 -05:00
identities := msgparts[:i]
2016-01-23 13:48:33 -06:00
// Validate signature
2016-07-29 16:52:27 -05:00
var msg ComposedMsg
2016-01-23 13:48:33 -06:00
if len(signkey) != 0 {
mac := hmac.New(sha256.New, signkey)
for _, msgpart := range msgparts[i+2 : i+6] {
mac.Write(msgpart)
}
signature := make([]byte, hex.DecodedLen(len(msgparts[i+1])))
hex.Decode(signature, msgparts[i+1])
if !hmac.Equal(mac.Sum(nil), signature) {
return msg, nil, &InvalidSignatureError{}
}
}
json.Unmarshal(msgparts[i+2], &msg.Header)
2016-01-24 00:16:23 -06:00
json.Unmarshal(msgparts[i+3], &msg.ParentHeader)
2016-01-23 13:48:33 -06:00
json.Unmarshal(msgparts[i+4], &msg.Metadata)
json.Unmarshal(msgparts[i+5], &msg.Content)
2016-07-29 16:52:27 -05:00
return msg, identities, nil
}
// ToWireMsg translates a ComposedMsg into a multipart ZMQ message ready to send, and
// signs it. This does not add the return identities or the delimiter.
2016-07-29 07:21:46 -05:00
func (msg ComposedMsg) ToWireMsg(signkey []byte) ([][]byte, error) {
msgparts := make([][]byte, 5)
header, err := json.Marshal(msg.Header)
if err != nil {
return msgparts, errors.Wrap(err, "Could not marshal message header")
}
2016-01-23 13:48:33 -06:00
msgparts[1] = header
2016-07-29 07:21:46 -05:00
parentHeader, err := json.Marshal(msg.ParentHeader)
if err != nil {
return msgparts, errors.Wrap(err, "Could not marshal parent header")
}
2016-01-24 00:16:23 -06:00
msgparts[2] = parentHeader
2016-07-29 07:21:46 -05:00
2016-01-23 13:48:33 -06:00
if msg.Metadata == nil {
msg.Metadata = make(map[string]interface{})
}
2016-07-29 07:21:46 -05:00
metadata, err := json.Marshal(msg.Metadata)
if err != nil {
return msgparts, errors.Wrap(err, "Could not marshal metadata")
}
2016-01-23 13:48:33 -06:00
msgparts[3] = metadata
2016-07-29 07:21:46 -05:00
content, err := json.Marshal(msg.Content)
if err != nil {
return msgparts, errors.Wrap(err, "Could not marshal content")
}
2016-01-23 13:48:33 -06:00
msgparts[4] = content
2016-07-29 16:52:27 -05:00
// Sign the message.
2016-01-23 13:48:33 -06:00
if len(signkey) != 0 {
mac := hmac.New(sha256.New, signkey)
for _, msgpart := range msgparts[1:] {
mac.Write(msgpart)
}
msgparts[0] = make([]byte, hex.EncodedLen(mac.Size()))
hex.Encode(msgparts[0], mac.Sum(nil))
}
2016-07-29 07:21:46 -05:00
return msgparts, nil
}
// MsgReceipt represents a received message, its return identities, and the sockets for
// communication.
type MsgReceipt struct {
2016-01-23 13:48:33 -06:00
Msg ComposedMsg
Identities [][]byte
Sockets SocketGroup
}
// SendResponse sends a message back to return identites of the received message.
func (receipt *MsgReceipt) SendResponse(socket *zmq.Socket, msg ComposedMsg) {
2016-07-29 07:21:46 -05:00
2016-01-23 13:48:33 -06:00
socket.SendMultipart(receipt.Identities, zmq.SNDMORE)
socket.Send([]byte("<IDS|MSG>"), zmq.SNDMORE)
2016-07-29 07:21:46 -05:00
msgParts, err := msg.ToWireMsg(receipt.Sockets.Key)
if err != nil {
log.Fatalln(err)
}
socket.SendMultipart(msgParts, 0)
2016-01-24 00:16:23 -06:00
logger.Println("<--", msg.Header.MsgType)
2016-01-23 13:48:33 -06:00
logger.Printf("%+v\n", msg.Content)
}
// NewMsg creates a new ComposedMsg to respond to a parent message. This includes setting
// up its headers.
2016-01-24 00:16:23 -06:00
func NewMsg(msgType string, parent ComposedMsg) (msg ComposedMsg) {
msg.ParentHeader = parent.Header
2016-01-23 13:48:33 -06:00
msg.Header.Session = parent.Header.Session
msg.Header.Username = parent.Header.Username
2016-01-24 00:16:23 -06:00
msg.Header.MsgType = msgType
2016-07-29 07:21:46 -05:00
u, err := uuid.NewV4()
if err != nil {
log.Fatalln(errors.Wrap(err, "Could not generate UUID"))
}
2016-01-24 00:16:23 -06:00
msg.Header.MsgID = u.String()
2016-01-23 13:48:33 -06:00
return
}