-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtests.py
113 lines (95 loc) · 4.59 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# -* coding: utf-8 -*-
"""
We use db "test" and collection "people" for testing purposes
"""
from unittest import TestCase
from mongoprofile import parse_record, MongoProfiler, DummyMongoProfiler
class ParserTest(TestCase):
command_cmd = "query test.$cmd ntoreturn:1 command: { drop: \"people\" } reslen:134 bytes:118"
insert_cmd = "insert test.people"
update_cmd = "update test.people query: { name: \"John\" } nscanned:1 fastmod "
remove_cmd = "remove test.people query: { name: \"Mary\" }"
marker_cmd = 'query test.phony_mongoprofile_marker reslen:0 nscanned:1 \nquery: { $query: { text: "hello world" } } nreturned:1 bytes:70'
query_commands = [
(
'query test.people reslen:86 nscanned:1 \nquery: { age: { $gt: 20.0 } } nreturned:1 bytes:70',
'test> db.people.find({ age: { $gt: 20.0 } })'
),
(
'query test.people ntoreturn:1 reslen:199 nscanned:4 \nquery: { $query: { age: { $gt: 20.0 } }, $orderby: { age: -1 } } nreturned:1 bytes:70',
'test> db.people.find({ $query: { age: { $gt: 20.0 } }, $orderby: { age: -1 } })'
),
]
getmore_cmd = 'getmore test.people cid:5236062738003527185 getMore: { $query: { age: { $gt: 20.0 } } } bytes:128950 nreturned:1750'
def testCommandCmd(self):
record_source = dict(info=self.command_cmd)
record = parse_record(record_source)
self.assertEquals(str(record), 'test> db.runCommand({ drop: "people" })')
self.assertEquals(record['ntoreturn'], 1)
self.assertEquals(record['reslen'], 134)
self.assertEquals(record['bytes'], 118)
def testInsertCmd(self):
record_source = dict(info=self.insert_cmd)
record = parse_record(record_source)
self.assertEquals(str(record), 'test> db.people.insert({...})')
def testUpdateCmd(self):
record_source = dict(info=self.update_cmd)
record = parse_record(record_source)
self.assertEquals(str(record), 'test> db.people.update({ name: "John" }, {...})')
self.assertEquals(record['nscanned'], 1)
self.assertEquals(record['fastmod'], True)
def testRemoveCmd(self):
record_source = dict(info=self.remove_cmd)
record = parse_record(record_source)
self.assertEquals(str(record), 'test> db.people.remove({ name: "Mary" })')
def testMarkerCmd(self):
record_source = dict(info=self.marker_cmd)
record = parse_record(record_source)
self.assertEquals(str(record), '==== hello world ====')
def testQueryCmd(self):
for cmd, result in self.query_commands:
record_source = dict(info=cmd)
record = parse_record(record_source)
self.assertEquals(str(record), result)
def testGetMoreCmd(self):
record_source = dict(info=self.getmore_cmd)
record = parse_record(record_source)
self.assertEquals(str(record), 'test> db.people.find({ $query: { age: { $gt: 20.0 } } }) *getmore')
from pymongo import Connection
class MongoProfile(TestCase):
expected_records = [
'==== insert ====',
'test> db.people.insert({...})',
'test> db.people.insert({...})',
'==== modification ====',
'test> db.people.update({ name: "John" }, {...})',
'test> db.people.remove({ name: "Mary" })',
'==== search ====',
'test> db.people.find({ $query: { age: { $gt: 20.0 } } })',
'test> db.runCommand({ count: "people", query: { age: { $gt: 20.0 } }, fields: null })',
]
def setUp(self):
self.db = Connection().test
def testMongoProfile(self):
profiler = MongoProfiler(self.db)
self._doQueries(profiler)
record_info = [str(record) for record in profiler.get_records()]
for expected, received in zip(self.expected_records, record_info):
self.assertEquals(expected, received)
def testDummyMongoProfile(self):
profiler = DummyMongoProfiler(self.db)
self._doQueries(profiler)
self.assertEquals(profiler.get_records(), [])
def _doQueries(self, profiler):
with profiler:
profiler.mark('insert')
self.db.people.insert(dict(name='John', age=20))
self.db.people.insert(dict(name='Mary', age=30))
profiler.mark('modification')
self.db.people.update({'name': 'John'}, {'age': 21})
self.db.people.remove({'name': 'Mary'})
profiler.mark('search')
list(self.db.people.find({'age': {'$gt': 20.0}}))
self.db.people.find({'age': {'$gt': 20.0}}).count()
def tearDown(self):
self.db.drop_collection('people')