1
+ using Gremlin . Net . Driver ;
2
+ using Gremlin . Net . Structure . IO . GraphSON ;
3
+ using Microsoft . AspNetCore . Mvc ;
4
+ using Microsoft . Azure . Documents ;
5
+ using Microsoft . Azure . Documents . Client ;
6
+ using Microsoft . Azure . WebJobs ;
7
+ using Microsoft . Azure . WebJobs . Extensions . Http ;
8
+ using Microsoft . Azure . WebJobs . Extensions . SignalRService ;
9
+ using Microsoft . Azure . WebJobs . Host ;
10
+ using Microsoft . ProjectOxford . Vision ;
1
11
using System ;
12
+ using System . Collections . Generic ;
2
13
using System . IO ;
3
14
using System . Linq ;
4
15
using System . Net . Http ;
5
16
using System . Threading . Tasks ;
6
- using Microsoft . Azure . WebJobs ;
7
- using Microsoft . Azure . Documents ;
8
- using System . Collections . Generic ;
9
- using Microsoft . Azure . WebJobs . Host ;
10
- using Microsoft . ProjectOxford . Vision ;
11
- using Microsoft . Azure . Documents . Client ;
17
+
12
18
13
19
namespace PetCheckerFunction
14
20
{
15
21
public static class PetChecker
16
22
{
17
23
[ FunctionName ( "PetChecker" ) ]
18
- public static async Task RunPetChecker ( [ CosmosDBTrigger ( "pets" , "checks" , ConnectionStringSetting = "constr" , CreateLeaseCollectionIfNotExists = true ) ] IReadOnlyList < Document > document , TraceWriter log )
24
+ public static async Task RunPetChecker (
25
+ [ CosmosDBTrigger ( "pets" , "checks" , ConnectionStringSetting = "constr" ,
26
+ CreateLeaseCollectionIfNotExists = true ) ] IReadOnlyList < Document > document ,
27
+ [ SignalR ( HubName = "petcheckin" , ConnectionStringSetting = "AzureSignalRConnectionString" ) ] IAsyncCollector < SignalRMessage > sender ,
28
+ TraceWriter log )
19
29
{
20
- var httpClient = new HttpClient ( ) ;
30
+ var sendingResponse = false ;
21
31
try
22
32
{
23
33
foreach ( dynamic doc in document )
24
34
{
35
+ sendingResponse = false ;
25
36
var isProcessed = doc . IsApproved != null ;
26
37
if ( isProcessed )
27
38
{
28
39
continue ;
29
40
}
30
- var url = doc . MediaUrl ;
41
+
42
+ var url = doc . MediaUrl . ToString ( ) ;
31
43
var uploaded = ( DateTime ) doc . Created ;
32
44
log . Info ( $ ">>> Processing image in { url } upladed at { uploaded . ToString ( ) } ") ;
33
- var res = await httpClient . GetAsync ( url ) ;
34
- using ( var stream = await res . Content . ReadAsStreamAsync ( ) as Stream )
45
+
46
+ using ( var httpClient = new HttpClient ( ) )
35
47
{
48
+
49
+ var res = await httpClient . GetAsync ( url ) ;
50
+ var stream = await res . Content . ReadAsStreamAsync ( ) as Stream ;
36
51
log . Info ( $ "--- Image succesfully downloaded from storage") ;
37
- ( bool allowed , string message ) = await PassesImageModerationAsync ( stream , log ) ;
52
+ var ( allowed , message , tags ) = await PassesImageModerationAsync ( stream , log ) ;
38
53
log . Info ( $ "--- Image analyzed. It was { ( allowed ? string . Empty : "NOT" ) } approved") ;
39
54
doc . IsApproved = allowed ;
40
55
doc . Message = message ;
41
56
log . Info ( $ "--- Updating CosmosDb document to have historical data") ;
42
57
await UpsertDocument ( doc , log ) ;
43
- log . Info ( $ "<<< Image in { url } processed!") ;
58
+ log . Info ( $ "--- Updating Graph") ;
59
+ await InsertInGraph ( tags , doc , log ) ;
60
+ log . Info ( "--- Sending SignalR response." ) ;
61
+ sendingResponse = true ;
62
+ await SendSignalRResponse ( sender , allowed , message ) ;
63
+ log . Info ( $ "<<< Done! Image in { url } processed!") ;
44
64
}
45
65
}
46
66
}
47
- finally
67
+ catch ( Exception ex )
48
68
{
49
- httpClient ? . Dispose ( ) ;
69
+ var msg = $ "Error { ex . Message } ({ ex . GetType ( ) . Name } )";
70
+ log . Info ( "!!! " + msg ) ;
71
+
72
+ if ( ex is AggregateException aggex )
73
+ {
74
+ foreach ( var innex in aggex . InnerExceptions )
75
+ {
76
+ log . Info ( $ "!!! (inner) Error { innex . Message } ({ innex . GetType ( ) . Name } )") ;
77
+ }
78
+ }
79
+
80
+ if ( ! sendingResponse )
81
+ {
82
+ await SendSignalRResponse ( sender , false , msg ) ;
83
+ }
84
+ throw ex ;
50
85
}
51
86
}
52
- private static async Task UpsertDocument ( dynamic doc , TraceWriter log )
87
+
88
+ private static Task SendSignalRResponse ( IAsyncCollector < SignalRMessage > sender , bool isOk , string message )
89
+ {
90
+ return sender . AddAsync ( new SignalRMessage ( )
91
+ {
92
+ Target = "ProcessDone" ,
93
+ Arguments = new [ ] { new {
94
+ processedAt = DateTime . UtcNow ,
95
+ accepted = isOk ,
96
+ message
97
+ } }
98
+ } ) ;
99
+
100
+ }
101
+
102
+ private static async Task InsertInGraph ( IEnumerable < string > tags , dynamic doc , TraceWriter log )
103
+ {
104
+ var hostname = await GetSecret ( "gremlin_endpoint" ) ;
105
+ var port = await GetSecret ( "gremlin_port" ) ;
106
+ var database = "pets" ;
107
+ var collection = "checks" ;
108
+ var authKey = Environment . GetEnvironmentVariable ( "gremlin_key" ) ;
109
+ var portToUse = 443 ;
110
+ portToUse = int . TryParse ( port , out portToUse ) ? portToUse : 443 ;
111
+
112
+ var gremlinServer = new GremlinServer ( hostname , portToUse , enableSsl : true ,
113
+ username : "/dbs/" + database + "/colls/" + collection ,
114
+ password : authKey ) ;
115
+ var gremlinClient = new GremlinClient ( gremlinServer , new GraphSON2Reader ( ) , new GraphSON2Writer ( ) , GremlinClient . GraphSON2MimeType ) ;
116
+ foreach ( var tag in tags )
117
+ {
118
+ log . Info ( "--- --- Checking vertex for tag " + tag ) ;
119
+ await TryAddTag ( gremlinClient , tag , log ) ;
120
+ }
121
+
122
+ var queries = AddPetToGraphQueries ( doc , tags ) ;
123
+ log . Info ( "--- --- Adding vertex for pet checkin " ) ;
124
+ foreach ( string query in queries )
125
+ {
126
+ await gremlinClient . SubmitAsync < dynamic > ( query ) ;
127
+ }
128
+ }
129
+
130
+ private static async Task TryAddTag ( GremlinClient gremlinClient , string tag , TraceWriter log )
53
131
{
54
- var endpoint = Environment . GetEnvironmentVariable ( "cosmos_uri" ) ;
55
- var auth = Environment . GetEnvironmentVariable ( "cosmos_key" ) ;
56
- using ( var client = new DocumentClient ( new Uri ( endpoint ) , auth ) )
132
+ var query = $ "g.V('{ tag } ')";
133
+ var response = await gremlinClient . SubmitAsync < dynamic > ( query ) ;
134
+
135
+ if ( ! response . Any ( ) )
57
136
{
58
- var dbName = "pets" ;
59
- var colName = "checks" ;
60
- doc . Analyzed = DateTime . UtcNow ;
61
- await client . UpsertDocumentAsync (
62
- UriFactory . CreateDocumentCollectionUri ( dbName , colName ) , doc ) ;
63
- log . Info ( $ "--- CosmosDb document updated.") ;
137
+ log . Info ( "--- --- Adding vertex for tag " + tag ) ;
138
+ await gremlinClient . SubmitAsync < dynamic > ( AddTagToGraphQuery ( tag ) ) ;
64
139
}
65
140
}
66
- public static async Task < ( bool , string ) > PassesImageModerationAsync ( Stream image , TraceWriter log )
141
+
142
+ private static IEnumerable < string > AddPetToGraphQueries ( dynamic doc , IEnumerable < string > tags )
143
+ {
144
+ var id = doc . id . ToString ( ) ;
145
+
146
+ var msg = ( doc . Message ? . ToString ( ) ?? "" ) . Replace ( "'" , "\' " ) ;
147
+
148
+ yield return $ "g.addV('checkin').property('id','{ id } ').property('description','{ msg } ')";
149
+ foreach ( var tag in tags )
150
+ {
151
+ yield return $ "g.V('{ id } ').addE('seems').to(g.V('{ tag } '))";
152
+ }
153
+ }
154
+
155
+ private static string AddTagToGraphQuery ( string tag ) => $ "g.addV('tag').property('id', '{ tag } ').property('value', '{ tag } ')";
156
+
157
+ private static async Task UpsertDocument ( dynamic doc , TraceWriter log )
158
+ {
159
+ var endpoint = await GetSecret ( "cosmos_uri" ) ;
160
+ var auth = await GetSecret ( "cosmos_key" ) ;
161
+
162
+ var client = new DocumentClient ( new Uri ( endpoint ) , auth ) ;
163
+ var dbName = "pets" ;
164
+ var colName = "checks" ;
165
+ doc . Analyzed = DateTime . UtcNow ;
166
+ await client . UpsertDocumentAsync (
167
+ UriFactory . CreateDocumentCollectionUri ( dbName , colName ) , doc ) ;
168
+ log . Info ( $ "--- CosmosDb document updated.") ;
169
+ }
170
+
171
+ private static async Task < string > GetSecret ( string secretName )
67
172
{
68
- log . Info ( "--- Creating VisionApi client and analyzing image" ) ;
69
- var key = Environment . GetEnvironmentVariable ( "MicrosoftVisionApiKey" ) ;
70
- var endpoint = Environment . GetEnvironmentVariable ( "MicrosoftVisionApiEndpoint" ) ;
71
- var client = new VisionServiceClient ( key , endpoint ) ;
72
- var features = new VisualFeature [ ] { VisualFeature . Description } ;
73
- var result = await client . AnalyzeImageAsync ( image , features ) ;
74
- log . Info ( $ "--- Image analyzed with tags: { String . Join ( "," , result . Description . Tags ) } ") ;
75
- if ( ! int . TryParse ( Environment . GetEnvironmentVariable ( "MicrosoftVisionNumTags" ) , out var tagsToFetch ) )
173
+
174
+ return Environment . GetEnvironmentVariable ( secretName ) ;
175
+ }
176
+
177
+ public static async Task < ( bool allowd , string message , string [ ] tags ) > PassesImageModerationAsync ( Stream image , TraceWriter log )
178
+ {
179
+ try
76
180
{
77
- tagsToFetch = 5 ;
181
+ log . Info ( "--- Creating VisionApi client and analyzing image" ) ;
182
+
183
+ var key = await GetSecret ( "MicrosoftVisionApiKey" ) ;
184
+ var endpoint = await GetSecret ( "MicrosoftVisionApiEndpoint" ) ;
185
+ var numTags = await GetSecret ( "MicrosoftVisionNumTags" ) ;
186
+ var client = new VisionServiceClient ( key , endpoint ) ;
187
+ var features = new VisualFeature [ ] { VisualFeature . Description } ;
188
+ var result = await client . AnalyzeImageAsync ( image , features ) ;
189
+
190
+ log . Info ( $ "--- Image analyzed with tags: { String . Join ( "," , result . Description . Tags ) } ") ;
191
+ if ( ! int . TryParse ( numTags , out var tagsToFetch ) )
192
+ {
193
+ tagsToFetch = 5 ;
194
+ }
195
+ var fetchedTags = result ? . Description ? . Tags . Take ( tagsToFetch ) . ToArray ( ) ?? new string [ 0 ] ;
196
+ bool isAllowed = fetchedTags . Contains ( "dog" ) ;
197
+ string message = result ? . Description ? . Captions . FirstOrDefault ( ) ? . Text ;
198
+ return ( isAllowed , message , fetchedTags ) ;
199
+ }
200
+ catch ( Exception ex )
201
+ {
202
+ log . Info ( "Vision API error! " + ex . Message ) ;
203
+ return ( false , "error " + ex . Message , new string [ 0 ] ) ;
78
204
}
79
- bool isAllowed = result . Description . Tags . Take ( tagsToFetch ) . Contains ( "dog" ) ;
80
- string message = result ? . Description ? . Captions . FirstOrDefault ( ) ? . Text ;
81
- return ( isAllowed , message ) ;
82
205
}
206
+
207
+ [ FunctionName ( nameof ( SignalRInfo ) ) ]
208
+ public static IActionResult SignalRInfo (
209
+ [ HttpTrigger ( AuthorizationLevel . Anonymous , "post" ) ] HttpRequestMessage req ,
210
+ [ SignalRConnectionInfo ( HubName = "petcheckin" ) ] SignalRConnectionInfo info )
211
+ {
212
+ return info != null
213
+ ? ( ActionResult ) new OkObjectResult ( info )
214
+ : new NotFoundObjectResult ( "Failed to load SignalR Info." ) ;
215
+ }
216
+
83
217
}
84
218
}
0 commit comments