|
| 1 | +import os |
| 2 | +import mock |
| 3 | +import pytest |
| 4 | +from blockfrost import SignatureVerificationError, verify_webhook_signature |
| 5 | + |
| 6 | +request_body = b'{"id":"47668401-c3a4-42d4-bac1-ad46515924a3","webhook_id":"cf68eb9c-635f-415e-a5a8-6233638f28d7","created":1650013856,"type":"block","payload":{"time":1650013853,"height":7126256,"hash":"f49521b67b440e5030adf124aee8f88881b7682ba07acf06c2781405b0f806a4","slot":58447562,"epoch":332,"epoch_slot":386762,"slot_leader":"pool1njjr0zn7uvydjy8067nprgwlyxqnznp9wgllfnag24nycgkda25","size":34617,"tx_count":13,"output":"13403118309871","fees":"4986390","block_vrf":"vrf_vk197w95j9alkwt8l4g7xkccknhn4pqwx65c5saxnn5ej3cpmps72msgpw69d","previous_block":"9e3f5bfc9f0be44cf6e14db9ed5f1efb6b637baff0ea1740bb6711786c724915","next_block":null,"confirmations":0}}' |
| 7 | +success_fixtures_list = [ |
| 8 | + { |
| 9 | + 'description': 'valid signature', |
| 10 | + 'request_body': request_body, |
| 11 | + 'signature_header': 't=1650013856,v1=f4c3bb2a8b0c8e21fa7d5fdada2ee87c9c6f6b0b159cc22e483146917e195c3e', |
| 12 | + 'secret': '59a1eb46-96f4-4f0b-8a03-b4d26e70593a', |
| 13 | + 'current_timestamp_mock': 1650013856 + 1, |
| 14 | + 'result': True |
| 15 | + }, |
| 16 | + { |
| 17 | + 'description': '2 signatures, one valid and one invalid', |
| 18 | + 'request_body': request_body, |
| 19 | + 'signature_header': 't=1650013856,v1=abc,v1=f4c3bb2a8b0c8e21fa7d5fdada2ee87c9c6f6b0b159cc22e483146917e195c3e', |
| 20 | + 'secret': '59a1eb46-96f4-4f0b-8a03-b4d26e70593a', |
| 21 | + 'current_timestamp_mock': 1650013856 + 1, |
| 22 | + 'result': True |
| 23 | + } |
| 24 | +] |
| 25 | + |
| 26 | +error_fixtures_list = [ |
| 27 | + { |
| 28 | + 'description': 'throws due to invalid header fromat', |
| 29 | + 'request_body': request_body, |
| 30 | + 'signature_header': 'v1=f4c3bb2a8b0c8e21fa7d5fdada2ee87c9c6f6b0b159cc22e483146917e195c3e', |
| 31 | + 'secret': '59a1eb46-96f4-4f0b-8a03-b4d26e70593a', |
| 32 | + 'current_timestamp_mock': 1650013856 + 1, |
| 33 | + 'result_error': 'Invalid signature header format.' |
| 34 | + }, |
| 35 | + { |
| 36 | + 'description': 'throws due to sig version not supported by this sdk', |
| 37 | + 'request_body': request_body, |
| 38 | + 'signature_header': 't=1650013856,v42=abc', |
| 39 | + 'secret': '59a1eb46-96f4-4f0b-8a03-b4d26e70593a', |
| 40 | + 'current_timestamp_mock': 1650013856 + 1, |
| 41 | + 'result_error': 'No signatures with supported version scheme.' |
| 42 | + }, |
| 43 | + { |
| 44 | + 'description': 'throws due to no signature match', |
| 45 | + 'request_body': request_body, |
| 46 | + 'signature_header': 't=1650013856,v1=abc', |
| 47 | + 'secret': '59a1eb46-96f4-4f0b-8a03-b4d26e70593a', |
| 48 | + 'current_timestamp_mock': 1650013856 + 1, |
| 49 | + 'result_error': 'No signature matches the expected signature for the payload.' |
| 50 | + }, |
| 51 | + { |
| 52 | + 'description': 'throws due to timestamp out of tolerance zone', |
| 53 | + 'request_body': request_body, |
| 54 | + 'signature_header': 't=1650013856,v1=f4c3bb2a8b0c8e21fa7d5fdada2ee87c9c6f6b0b159cc22e483146917e195c3e', |
| 55 | + 'secret': '59a1eb46-96f4-4f0b-8a03-b4d26e70593a', |
| 56 | + 'current_timestamp_mock': 1650013856 + 7200, |
| 57 | + 'result_error': 'Signature\'s timestamp is outside of the time tolerance.' |
| 58 | + } |
| 59 | +] |
| 60 | + |
| 61 | + |
| 62 | +@pytest.mark.parametrize("fixture", success_fixtures_list) |
| 63 | +def test_verify_webhook_signature(fixture): |
| 64 | + with mock.patch('blockfrost.helpers.get_unix_timestamp', return_value=fixture['current_timestamp_mock']): |
| 65 | + res = verify_webhook_signature( |
| 66 | + fixture['request_body'], fixture['signature_header'], fixture['secret']) |
| 67 | + assert res == fixture['result'] |
| 68 | + |
| 69 | + |
| 70 | +@pytest.mark.parametrize("fixture", error_fixtures_list) |
| 71 | +def test_verify_webhook_signature_fails(fixture): |
| 72 | + with mock.patch('blockfrost.helpers.get_unix_timestamp', return_value=fixture['current_timestamp_mock']): |
| 73 | + with pytest.raises(SignatureVerificationError) as e_info: |
| 74 | + verify_webhook_signature( |
| 75 | + fixture['request_body'], fixture['signature_header'], fixture['secret']) |
| 76 | + assert str(e_info.value) == fixture['result_error'] |
| 77 | + assert e_info.value.header == fixture['signature_header'] |
| 78 | + assert e_info.value.request_body == fixture['request_body'] |
0 commit comments