Adding the ability to cache campaigns on maillogs to greatly improve generation time (and, by extension, sending speed).

This is the initial commit that aims to improve the speed of sending emails. Ref #1726.
1726-sending-improvements
Jordan Wright 2020-01-27 22:18:20 -06:00
parent 0620671de6
commit 9e9776bdb3
7 changed files with 247 additions and 63 deletions

View File

@ -155,8 +155,8 @@ func (c *Campaign) UpdateStatus(s string) error {
}
// AddEvent creates a new campaign event in the database
func (c *Campaign) AddEvent(e *Event) error {
e.CampaignId = c.Id
func AddEvent(e *Event, campaignID int64) error {
e.CampaignId = campaignID
e.Time = time.Now().UTC()
whs, err := GetActiveWebhooks()
@ -500,7 +500,7 @@ func PostCampaign(c *Campaign, uid int64) error {
log.Error(err)
return err
}
err = c.AddEvent(&Event{Message: "Campaign Created"})
err = AddEvent(&Event{Message: "Campaign Created"}, c.Id)
if err != nil {
log.Error(err)
}

View File

@ -283,3 +283,55 @@ func BenchmarkCampaign10000(b *testing.B) {
}
tearDownBenchmark(b)
}
// BenchmarkGetCampaign100 measures GetCampaign against the campaign created
// by setupCampaign(b, 100).
func BenchmarkGetCampaign100(b *testing.B) {
	setupBenchmark(b)
	campaign := setupCampaign(b, 100)
	// Exclude the fixture setup above from the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := GetCampaign(campaign.Id, campaign.UserId); err != nil {
			b.Fatalf("error getting campaign: %v", err)
		}
	}
	// Stop the clock before cleaning up so that the teardown is not
	// attributed to GetCampaign.
	b.StopTimer()
	tearDownBenchmark(b)
}
// BenchmarkGetCampaign1000 measures GetCampaign against the campaign created
// by setupCampaign(b, 1000).
func BenchmarkGetCampaign1000(b *testing.B) {
	setupBenchmark(b)
	campaign := setupCampaign(b, 1000)
	// Exclude the fixture setup above from the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := GetCampaign(campaign.Id, campaign.UserId); err != nil {
			b.Fatalf("error getting campaign: %v", err)
		}
	}
	// Stop the clock before cleaning up so that the teardown is not
	// attributed to GetCampaign.
	b.StopTimer()
	tearDownBenchmark(b)
}
// BenchmarkGetCampaign5000 measures GetCampaign against the campaign created
// by setupCampaign(b, 5000).
func BenchmarkGetCampaign5000(b *testing.B) {
	setupBenchmark(b)
	campaign := setupCampaign(b, 5000)
	// Exclude the fixture setup above from the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := GetCampaign(campaign.Id, campaign.UserId); err != nil {
			b.Fatalf("error getting campaign: %v", err)
		}
	}
	// Stop the clock before cleaning up so that the teardown is not
	// attributed to GetCampaign.
	b.StopTimer()
	tearDownBenchmark(b)
}
// BenchmarkGetCampaign10000 measures GetCampaign against the campaign created
// by setupCampaign(b, 10000).
func BenchmarkGetCampaign10000(b *testing.B) {
	setupBenchmark(b)
	campaign := setupCampaign(b, 10000)
	// Exclude the fixture setup above from the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := GetCampaign(campaign.Id, campaign.UserId); err != nil {
			b.Fatalf("error getting campaign: %v", err)
		}
	}
	// Stop the clock before cleaning up so that the teardown is not
	// attributed to GetCampaign.
	b.StopTimer()
	tearDownBenchmark(b)
}

View File

@ -37,6 +37,8 @@ type MailLog struct {
SendDate time.Time `json:"send_date"`
SendAttempt int `json:"send_attempt"`
Processing bool `json:"-"`
cachedCampaign *Campaign
}
// GenerateMailLog creates a new maillog for the given campaign and
@ -128,13 +130,27 @@ func (m *MailLog) Success() error {
// GetDialer returns a dialer based on the maillog campaign's SMTP configuration
func (m *MailLog) GetDialer() (mailer.Dialer, error) {
c, err := GetCampaign(m.CampaignId, m.UserId)
if err != nil {
return nil, err
c := m.cachedCampaign
if c == nil {
campaign, err := GetCampaign(m.CampaignId, m.UserId)
if err != nil {
return nil, err
}
c = &campaign
}
return c.SMTP.GetDialer()
}
// CacheCampaign allows bulk-mail workers to cache the otherwise expensive
// campaign lookup operation by providing a pointer to the campaign here.
//
// The campaign has to be the one this maillog belongs to. An error is
// returned, and nothing is cached, when campaign is nil or when its Id does
// not match the maillog's CampaignId.
func (m *MailLog) CacheCampaign(campaign *Campaign) error {
	// Guard against a nil pointer so that callers get an error instead of a
	// panic on the Id comparison below.
	if campaign == nil {
		return fmt.Errorf("nil campaign provided for caching. expected %d", m.CampaignId)
	}
	if campaign.Id != m.CampaignId {
		return fmt.Errorf("incorrect campaign provided for caching. expected %d got %d", m.CampaignId, campaign.Id)
	}
	m.cachedCampaign = campaign
	return nil
}
// Generate fills in the details of a gomail.Message instance with
// the correct headers and body from the campaign and recipient listed in
// the maillog. We accept the gomail.Message as an argument so that the caller
@ -144,9 +160,13 @@ func (m *MailLog) Generate(msg *gomail.Message) error {
if err != nil {
return err
}
c, err := GetCampaign(m.CampaignId, m.UserId)
if err != nil {
return err
c := m.cachedCampaign
if c == nil {
campaign, err := GetCampaign(m.CampaignId, m.UserId)
if err != nil {
return err
}
c = &campaign
}
f, err := mail.ParseAddress(c.SMTP.FromAddress)
@ -155,7 +175,7 @@ func (m *MailLog) Generate(msg *gomail.Message) error {
}
msg.SetAddressHeader("From", f.Address, f.Name)
ptx, err := NewPhishingTemplateContext(&c, r.BaseRecipient, r.RId)
ptx, err := NewPhishingTemplateContext(c, r.BaseRecipient, r.RId)
if err != nil {
return err
}

View File

@ -331,6 +331,7 @@ func BenchmarkMailLogGenerate100(b *testing.B) {
if err != nil {
b.Fatalf("error getting maillogs for campaign: %v", err)
}
ms[0].CacheCampaign(&campaign)
b.ResetTimer()
for i := 0; i < b.N; i++ {
msg := gomail.NewMessage()
@ -346,6 +347,7 @@ func BenchmarkMailLogGenerate1000(b *testing.B) {
if err != nil {
b.Fatalf("error getting maillogs for campaign: %v", err)
}
ms[0].CacheCampaign(&campaign)
b.ResetTimer()
for i := 0; i < b.N; i++ {
msg := gomail.NewMessage()
@ -361,6 +363,7 @@ func BenchmarkMailLogGenerate5000(b *testing.B) {
if err != nil {
b.Fatalf("error getting maillogs for campaign: %v", err)
}
ms[0].CacheCampaign(&campaign)
b.ResetTimer()
for i := 0; i < b.N; i++ {
msg := gomail.NewMessage()
@ -376,6 +379,7 @@ func BenchmarkMailLogGenerate10000(b *testing.B) {
if err != nil {
b.Fatalf("error getting maillogs for campaign: %v", err)
}
ms[0].CacheCampaign(&campaign)
b.ResetTimer()
for i := 0; i < b.N; i++ {
msg := gomail.NewMessage()

View File

@ -39,10 +39,6 @@ type Result struct {
}
func (r *Result) createEvent(status string, details interface{}) (*Event, error) {
c, err := GetCampaign(r.CampaignId, r.UserId)
if err != nil {
return nil, err
}
e := &Event{Email: r.Email, Message: status}
if details != nil {
dj, err := json.Marshal(details)
@ -51,7 +47,7 @@ func (r *Result) createEvent(status string, details interface{}) (*Event, error)
}
e.Details = string(dj)
}
c.AddEvent(e)
AddEvent(e, r.CampaignId)
return e, nil
}

View File

@ -45,55 +45,71 @@ func WithMailer(m mailer.Mailer) func(*DefaultWorker) error {
}
}
// processCampaigns loads maillogs scheduled to be sent before the provided
// time and sends them to the mailer.
//
// The maillogs are locked, grouped by campaign ID, and every group is handed
// to the mailer from its own goroutine; processCampaigns returns without
// waiting for those goroutines. An error is returned when the maillogs
// cannot be fetched or locked, or when a campaign cannot be loaded. Nothing
// is queued in any of those cases.
func (w *DefaultWorker) processCampaigns(t time.Time) error {
	ms, err := models.GetQueuedMailLogs(t.UTC())
	if err != nil {
		log.Error(err)
		return err
	}
	// Lock the MailLogs (they will be unlocked after processing)
	err = models.LockMailLogs(ms, true)
	if err != nil {
		return err
	}
	campaignCache := make(map[int64]models.Campaign)
	// We'll group the maillogs by campaign ID to (roughly) group
	// them by sending profile. This lets the mailer re-use the Sender
	// instead of having to re-connect to the SMTP server for every
	// email.
	msg := make(map[int64][]mailer.Mail)
	for _, m := range ms {
		// We cache the campaign here to greatly reduce the time it takes to
		// generate the message (ref #1726)
		c, ok := campaignCache[m.CampaignId]
		if !ok {
			c, err = models.GetCampaign(m.CampaignId, m.UserId)
			if err != nil {
				// Nothing has been handed to the mailer yet, so no one
				// else will release these maillogs. Undo the lock taken
				// above before giving up on this batch.
				if unlockErr := models.LockMailLogs(ms, false); unlockErr != nil {
					log.Error(unlockErr)
				}
				return err
			}
			// Store under the same key that is used for the lookup above
			// and by the dispatch loop below.
			campaignCache[m.CampaignId] = c
		}
		// A maillog without a cached campaign falls back to looking the
		// campaign up itself, so a caching failure is reported but does
		// not stop the batch.
		if cacheErr := m.CacheCampaign(&c); cacheErr != nil {
			log.Error(cacheErr)
		}
		msg[m.CampaignId] = append(msg[m.CampaignId], m)
	}
	// Next, we process each group of maillogs in parallel
	for cid, msc := range msg {
		go func(c models.Campaign, msc []mailer.Mail) {
			if c.Status == models.CampaignQueued {
				err := c.UpdateStatus(models.CampaignInProgress)
				if err != nil {
					log.Error(err)
					return
				}
			}
			log.WithFields(logrus.Fields{
				"num_emails": len(msc),
			}).Info("Sending emails to mailer for processing")
			w.mailer.Queue(msc)
		}(campaignCache[cid], msc)
	}
	return nil
}
// Start launches the worker to poll the database every minute for any pending maillogs
// that need to be processed.
func (w *DefaultWorker) Start() {
log.Info("Background Worker Started Successfully - Waiting for Campaigns")
go w.mailer.Start(context.Background())
for t := range time.Tick(1 * time.Minute) {
ms, err := models.GetQueuedMailLogs(t.UTC())
err := w.processCampaigns(t)
if err != nil {
log.Error(err)
continue
}
// Lock the MailLogs (they will be unlocked after processing)
err = models.LockMailLogs(ms, true)
if err != nil {
log.Error(err)
continue
}
// We'll group the maillogs by campaign ID to (sort of) group
// them by sending profile. This lets the mailer re-use the Sender
// instead of having to re-connect to the SMTP server for every
// email.
msg := make(map[int64][]mailer.Mail)
for _, m := range ms {
msg[m.CampaignId] = append(msg[m.CampaignId], m)
}
// Next, we process each group of maillogs in parallel
for cid, msc := range msg {
go func(cid int64, msc []mailer.Mail) {
uid := msc[0].(*models.MailLog).UserId
c, err := models.GetCampaign(cid, uid)
if err != nil {
log.Error(err)
errorMail(err, msc)
return
}
if c.Status == models.CampaignQueued {
err := c.UpdateStatus(models.CampaignInProgress)
if err != nil {
log.Error(err)
return
}
}
log.WithFields(logrus.Fields{
"num_emails": len(msc),
}).Info("Sending emails to mailer for processing")
w.mailer.Queue(msc)
}(cid, msc)
}
}
}
@ -116,6 +132,11 @@ func (w *DefaultWorker) LaunchCampaign(c models.Campaign) {
m.Unlock()
continue
}
err = m.CacheCampaign(&c)
if err != nil {
log.Error(err)
return
}
mailEntries = append(mailEntries, m)
}
w.mailer.Queue(mailEntries)

View File

@ -1,11 +1,29 @@
package worker
import (
"context"
"fmt"
"testing"
"time"
"github.com/gophish/gophish/config"
"github.com/gophish/gophish/mailer"
"github.com/gophish/gophish/models"
"github.com/stretchr/testify/suite"
)
// logMailer is a stand-in mailer used by the tests. It does not send
// anything; every batch passed to Queue is written to its queue channel.
type logMailer struct {
	queue chan []mailer.Mail
}

// Start does nothing and ignores the provided context.
func (lm *logMailer) Start(ctx context.Context) {}

// Queue writes the given batch of mail to the queue channel.
func (lm *logMailer) Queue(ms []mailer.Mail) {
	lm.queue <- ms
}
// WorkerSuite is a suite of tests to cover worker related functions
type WorkerSuite struct {
suite.Suite
@ -24,6 +42,7 @@ func (s *WorkerSuite) SetupSuite() {
}
s.config = conf
s.Nil(err)
s.setupCampaignDependencies()
}
func (s *WorkerSuite) TearDownTest() {
@ -33,13 +52,16 @@ func (s *WorkerSuite) TearDownTest() {
}
}
func (s *WorkerSuite) SetupTest() {
func (s *WorkerSuite) setupCampaignDependencies() {
s.config.TestFlag = true
// Add a group
group := models.Group{Name: "Test Group"}
group.Targets = []models.Target{
models.Target{BaseRecipient: models.BaseRecipient{Email: "test1@example.com", FirstName: "First", LastName: "Example"}},
models.Target{BaseRecipient: models.BaseRecipient{Email: "test2@example.com", FirstName: "Second", LastName: "Example"}},
for i := 0; i < 10; i++ {
group.Targets = append(group.Targets, models.Target{
BaseRecipient: models.BaseRecipient{
Email: fmt.Sprintf("test%d@example.com", i),
FirstName: "First",
LastName: "Example"}})
}
group.UserId = 1
models.PostGroup(&group)
@ -64,15 +86,84 @@ func (s *WorkerSuite) SetupTest() {
smtp.Host = "example.com"
smtp.FromAddress = "test@test.com"
models.PostSMTP(&smtp)
}
func (s *WorkerSuite) setupCampaign(id int) (*models.Campaign, error) {
// Setup and "launch" our campaign
// Set the status such that no emails are attempted
c := models.Campaign{Name: "Test campaign"}
c := models.Campaign{Name: fmt.Sprintf("Test campaign - %d", id)}
c.UserId = 1
c.Template = t
c.Page = p
template, err := models.GetTemplate(1, 1)
if err != nil {
return nil, err
}
c.Template = template
page, err := models.GetPage(1, 1)
if err != nil {
return nil, err
}
c.Page = page
smtp, err := models.GetSMTP(1, 1)
if err != nil {
return nil, err
}
c.SMTP = smtp
group, err := models.GetGroup(1, 1)
if err != nil {
return nil, err
}
c.Groups = []models.Group{group}
models.PostCampaign(&c, c.UserId)
c.UpdateStatus(models.CampaignEmailsSent)
err = models.PostCampaign(&c, c.UserId)
if err != nil {
return nil, err
}
err = c.UpdateStatus(models.CampaignEmailsSent)
return &c, err
}
// TestMailLogGrouping verifies that processCampaigns hands maillogs to the
// mailer in batches whose maillogs all share one campaign ID.
func (s *WorkerSuite) TestMailLogGrouping() {
	const numCampaigns = 10
	// Create the campaigns and unlock the maillogs so that they're picked up
	// by the worker
	for i := 0; i < numCampaigns; i++ {
		campaign, err := s.setupCampaign(i)
		if err != nil {
			// campaign is nil on failure, so stop before dereferencing it.
			s.T().Fatalf("error setting up campaign %d: %v", i, err)
		}
		ms, err := models.GetMailLogsByCampaign(campaign.Id)
		s.Nil(err)
		for _, m := range ms {
			m.Unlock()
		}
	}

	lm := &logMailer{queue: make(chan []mailer.Mail)}
	worker := &DefaultWorker{mailer: lm}

	// Trigger the worker, generating the maillogs and sending them to the
	// mailer. When this fails nothing is sent on the channel, so bail out
	// here instead of blocking on the receive below.
	if err := worker.processCampaigns(time.Now()); err != nil {
		s.T().Fatalf("error processing campaigns: %v", err)
	}

	// Verify that each slice of maillogs received belong to the same campaign
	for i := 0; i < numCampaigns; i++ {
		var ms []mailer.Mail
		select {
		case ms = <-lm.queue:
		case <-time.After(5 * time.Second):
			// Fail instead of hanging when fewer batches than expected
			// are queued.
			s.T().Fatalf("timed out waiting for batch %d of %d", i+1, numCampaigns)
		}
		if len(ms) == 0 {
			s.T().Fatalf("received an empty batch of maillogs")
		}
		first, ok := ms[0].(*models.MailLog)
		if !ok {
			s.T().Fatalf("unable to cast mail to models.MailLog")
		}
		expected := first.CampaignId
		for _, m := range ms {
			maillog, ok := m.(*models.MailLog)
			if !ok {
				s.T().Fatalf("unable to cast mail to models.MailLog")
			}
			got := maillog.CampaignId
			s.Equal(expected, got)
		}
	}
}
func TestMailerSuite(t *testing.T) {
suite.Run(t, new(WorkerSuite))
}