-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #14 from austin1237/cache
dynamo cache
- Loading branch information
Showing
19 changed files
with
465 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= | ||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | ||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package cache | ||
|
||
import ( | ||
"scraper/job" | ||
) | ||
|
||
type Table interface { | ||
ReadItem(company string) (string, error) | ||
WriteItems(companies []string) | ||
} | ||
|
||
type Cache struct { | ||
table Table | ||
} | ||
|
||
func NewCache(table Table) *Cache { | ||
return &Cache{table: table} | ||
} | ||
|
||
func (c *Cache) FilterCachedCompanies(jobs []job.Job) ([]job.Job, error) { | ||
notInCache := make([]job.Job, 0) | ||
errChan := make(chan error, len(jobs)) | ||
notFoundChan := make(chan job.Job, len(jobs)) | ||
foundChan := make(chan job.Job, len(jobs)) | ||
|
||
for _, newJob := range jobs { | ||
go func(newJob job.Job) { | ||
result, err := c.table.ReadItem(newJob.Company) | ||
if result == "" { | ||
// company is not in the cache | ||
notFoundChan <- newJob | ||
} else { | ||
foundChan <- newJob | ||
} | ||
|
||
if err != nil { | ||
errChan <- err | ||
} | ||
|
||
}(newJob) | ||
} | ||
|
||
// Collect results from the goroutines | ||
for range jobs { | ||
select { | ||
case job := <-notFoundChan: | ||
notInCache = append(notInCache, job) | ||
case <-foundChan: | ||
// do nothing | ||
case err := <-errChan: | ||
return nil, err | ||
} | ||
|
||
} | ||
|
||
return notInCache, nil | ||
} | ||
|
||
func (c *Cache) WriteCompaniesToCache(jobs []job.Job) { | ||
companies := make([]string, 0, len(jobs)) | ||
for _, job := range jobs { | ||
companies = append(companies, job.Company) | ||
} | ||
c.table.WriteItems(companies) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
package cache | ||
|
||
import ( | ||
"scraper/job" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/mock" | ||
) | ||
|
||
type MockTable struct { | ||
mock.Mock | ||
} | ||
|
||
func (m *MockTable) ReadItem(company string) (string, error) { | ||
args := m.Called(company) | ||
return args.String(0), args.Error(1) | ||
} | ||
|
||
func (m *MockTable) WriteItems(companies []string) { | ||
m.Called(companies) | ||
} | ||
|
||
func TestFilterCachedCompanies(t *testing.T) { | ||
mockTable := new(MockTable) | ||
mockTable.On("ReadItem", "Acme Corp").Return("Acme Corp", nil) | ||
mockTable.On("ReadItem", "Globex Corporation").Return("", nil) | ||
|
||
cache := &Cache{ | ||
table: mockTable, | ||
} | ||
|
||
// Test the FilterCachedCompanies method | ||
jobs := []job.Job{ | ||
{Company: "Acme Corp"}, | ||
{Company: "Globex Corporation"}, | ||
} | ||
notInCache, err := cache.FilterCachedCompanies(jobs) | ||
|
||
assert.NoError(t, err) | ||
assert.Len(t, notInCache, 1) | ||
assert.Equal(t, "Globex Corporation", notInCache[0].Company) | ||
|
||
mockTable.AssertExpectations(t) | ||
} | ||
|
||
func TestWriteCompaniesToCache(t *testing.T) { | ||
mockTable := new(MockTable) | ||
mockTable.On("WriteItems", []string{"Acme Corp", "Globex Corporation"}).Return() | ||
|
||
cache := &Cache{ | ||
table: mockTable, | ||
} | ||
|
||
// Test the WriteCompaniesToCache method | ||
jobs := []job.Job{ | ||
{Company: "Acme Corp"}, | ||
{Company: "Globex Corporation"}, | ||
} | ||
cache.WriteCompaniesToCache(jobs) | ||
|
||
mockTable.AssertExpectations(t) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package dynamo | ||
|
||
import ( | ||
"log" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/aws/aws-sdk-go/service/dynamodb" | ||
) | ||
|
||
type DynamoDBAPI interface { | ||
UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) | ||
GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) | ||
} | ||
|
||
type Table struct { | ||
Name string | ||
svc DynamoDBAPI | ||
} | ||
|
||
func NewTable(name string, region string) (*Table, error) { | ||
sess, err := session.NewSession(&aws.Config{ | ||
Region: aws.String(region), // replace with your region | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
svc := dynamodb.New(sess) | ||
|
||
return &Table{Name: name, svc: svc}, nil | ||
} | ||
|
||
func (t *Table) ReadItem(company string) (string, error) { | ||
input := &dynamodb.GetItemInput{ | ||
TableName: aws.String(t.Name), | ||
Key: map[string]*dynamodb.AttributeValue{ | ||
"company": { | ||
S: aws.String(strings.ToLower(company)), | ||
}, | ||
}, | ||
} | ||
|
||
result, err := t.svc.GetItem(input) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
if result.Item == nil { | ||
return "", nil | ||
} | ||
|
||
return *result.Item["company"].S, nil | ||
} | ||
|
||
func (t *Table) WriteItems(companies []string) { | ||
// Set the ttl time to 30 days from now | ||
expirationTime := time.Now().AddDate(0, 1, 0).Unix() | ||
|
||
// Create a wait group | ||
var wg sync.WaitGroup | ||
|
||
// Write each company to the table in a separate goroutine | ||
for _, company := range companies { | ||
wg.Add(1) | ||
go func(company string) { | ||
defer wg.Done() | ||
|
||
input := &dynamodb.UpdateItemInput{ | ||
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{ | ||
":expirationTime": { | ||
N: aws.String(strconv.FormatInt(expirationTime, 10)), | ||
}, | ||
}, | ||
TableName: aws.String(t.Name), | ||
Key: map[string]*dynamodb.AttributeValue{ | ||
"company": { | ||
S: aws.String(strings.ToLower(company)), | ||
}, | ||
}, | ||
ReturnValues: aws.String("UPDATED_NEW"), | ||
UpdateExpression: aws.String("set ExpirationTime = :expirationTime"), | ||
} | ||
|
||
_, err := t.svc.UpdateItem(input) | ||
if err != nil { | ||
log.Println("Error writing company to cache", err) | ||
} | ||
}(company) | ||
} | ||
|
||
// Wait for all goroutines to finish | ||
wg.Wait() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package dynamo | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/aws/aws-sdk-go/service/dynamodb" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/mock" | ||
) | ||
|
||
type MockDynamoDB struct { | ||
mock.Mock | ||
} | ||
|
||
func (m *MockDynamoDB) UpdateItem(input *dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error) { | ||
args := m.Called(input) | ||
return args.Get(0).(*dynamodb.UpdateItemOutput), args.Error(1) | ||
} | ||
|
||
func (m *MockDynamoDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) { | ||
args := m.Called(input) | ||
return args.Get(0).(*dynamodb.GetItemOutput), args.Error(1) | ||
} | ||
|
||
func TestNewTable(t *testing.T) { | ||
table, err := NewTable("test", "us-west-2") | ||
assert.NoError(t, err) | ||
assert.NotNil(t, table) | ||
} | ||
|
||
func TestReadItem(t *testing.T) { | ||
mockSvc := new(MockDynamoDB) | ||
table := &Table{Name: "test", svc: mockSvc} | ||
|
||
mockSvc.On("GetItem", mock.Anything).Return(&dynamodb.GetItemOutput{}, nil) | ||
|
||
_, err := table.ReadItem("Acme Corp") | ||
assert.NoError(t, err) | ||
|
||
mockSvc.AssertExpectations(t) | ||
} | ||
|
||
func TestWriteItems(t *testing.T) { | ||
mockSvc := new(MockDynamoDB) | ||
table := &Table{Name: "test", svc: mockSvc} | ||
|
||
mockSvc.On("UpdateItem", mock.Anything).Return(&dynamodb.UpdateItemOutput{}, nil) | ||
|
||
table.WriteItems([]string{"Acme Corp", "Globex Corporation"}) | ||
|
||
mockSvc.AssertExpectations(t) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.