-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathcarplay.py
124 lines (118 loc) · 4.5 KB
/
carplay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/python3
# "Autobox" dongle driver for HTML 'streaming' - test application
# Created by Colin Munro, December 2019
# See README.md for more information
"""Implementation of electric-monk's pycarplay for use with head-units"""
import decoder
import audiodecoder
import link
import protocol
from threading import Thread
import time
import queue
import os
import struct
class CarPlayReceiver:
    """Drive the USB dongle link and feed received media to the decoders.

    Background threads used by the receiver (all daemon threads, so they do
    not keep the process alive once the main thread exits):

    * a heartbeat thread that periodically sends ``protocol.Heartbeat``;
    * one worker thread per connection that dispatches received
      ``protocol.Open`` / ``protocol.VideoData`` / ``protocol.AudioData``
      messages;
    * a key listener thread (started by :meth:`run`) that reads integers from
      stdin and sends them as ``protocol.CarPlay`` messages.

    Attributes:
        connection: The active ``_Connection``, or ``None`` while no device
            is attached.
        started: ``True`` once a ``protocol.Open`` message has been handled
            for the current connection.
        decoder: Current video decoder instance.
        audio_decoder: Current audio decoder instance.
        av_queue: Message queue of the most recently created connection.
    """

    class _Decoder(decoder.Decoder):
        """Video decoder that forwards its key events to the owner's link."""

        def __init__(self, owner):
            super().__init__()
            self._owner = owner

        def on_key_event(self, event):
            """Forward *event* to the current connection, if there is one."""
            print(f'Got a key event: {event}')
            connection = self._owner.connection
            if connection is None:
                # No device attached right now, so there is nowhere to send it.
                return
            connection.send_key_event(event)
            # A select-down is immediately followed by a synthetic select-up
            # (presumably the event source never reports the release).
            if event == decoder.KeyEvent.BUTTON_SELECT_DOWN:
                connection.send_key_event(decoder.KeyEvent.BUTTON_SELECT_UP)

    class _AudioDecoder(audiodecoder.AudioDecoder):
        """Audio decoder that remembers which receiver owns it."""

        def __init__(self, owner):
            super().__init__()
            self._owner = owner

    class _Connection(link.Connection):
        """Link that queues incoming messages for a dedicated worker thread."""

        # Queue marker telling the worker thread to exit.
        _PUT_THREAD_STOP = object()

        def __init__(self, owner):
            # Set up our own state before the base constructor runs, so the
            # callbacks below are usable even if the base class invokes them
            # during construction.
            self._owner = owner
            self._av_queue = queue.Queue()
            super().__init__()
            # Only publish the queue / start the worker once the base
            # constructor succeeded (it raising means "no device").
            owner.av_queue = self._av_queue
            self.put_thread = Thread(
                target=self._put_thread, args=(owner,), daemon=True
            )
            self.put_thread.start()

        def _put_thread(self, owner):
            """Drain this connection's queue until asked to stop.

            Blocks on the queue instead of polling ``qsize()``, and only ever
            reads this connection's own queue so a worker left over from a
            previous connection cannot consume messages of a newer one.
            """
            self._owner = owner
            pending = self._av_queue
            while True:
                message = pending.get()
                if message is self._PUT_THREAD_STOP:
                    return
                try:
                    self._handle_av_message(message)
                except link.Error:
                    # Same reaction as the heartbeat thread: the link is gone.
                    owner._disconnect()
                except Exception as e:
                    # Keep the worker alive; one bad message must not end
                    # message processing for the whole connection.
                    print(f"exception: {e}")

        def _handle_av_message(self, message):
            """Dispatch one received message by its protocol type."""
            owner = self._owner
            if isinstance(message, protocol.Open):
                if not owner.started:
                    owner._connected()
                    self.send_multiple(protocol.opened_info)
            elif isinstance(message, protocol.VideoData):
                owner.decoder.send(message.data)
            elif isinstance(message, protocol.AudioData):
                try:
                    owner.audio_decoder.send(message.data)
                except Exception as e:
                    print(f"exception: {e}")

        def _stop_put_thread(self):
            """Ask the worker thread to exit (safe to call more than once)."""
            self._av_queue.put(self._PUT_THREAD_STOP)

        def on_message(self, message):
            """Queue a received message for the worker thread."""
            self._av_queue.put(message)

        def on_error(self, error):
            """Stop the worker and tell the owner the link failed."""
            self._stop_put_thread()
            self._owner._disconnect()

    def __init__(self):
        # Initialises connection/started/_frame to the "no device" state.
        self._disconnect()
        self.decoder = self._Decoder(self)
        self.audio_decoder = self._AudioDecoder(self)
        self.heartbeat = Thread(target=self._heartbeat_thread, daemon=True)
        self.heartbeat.start()

    def _connected(self):
        """Mark the session as started and replace both decoders."""
        print("Connected!")
        self.started = True
        # Stop the old decoders and create fresh instances for this session.
        self.decoder.stop()
        self.audio_decoder.stop()
        self.decoder = self._Decoder(self)
        self.audio_decoder = self._AudioDecoder(self)

    def _disconnect(self):
        """Reset to the "no device" state.

        Also used by ``__init__`` to create the state attributes; in that
        case (and when already disconnected) nothing is reported.
        """
        if hasattr(self, "connection"):
            lost = self.connection
            if lost is None:
                return
            print("Lost USB device")
            # Let the dropped connection's worker thread finish.
            lost._stop_put_thread()
        self._frame = b''
        self.connection = None
        self.started = False

    def _heartbeat_thread(self):
        """Send a heartbeat every ``protocol.Heartbeat.lifecycle`` seconds."""
        while True:
            # Snapshot: another thread may clear self.connection at any time.
            connection = self.connection
            if connection is not None:
                try:
                    connection.send_message(protocol.Heartbeat())
                except link.Error:
                    self._disconnect()
                except Exception as e:
                    print(f"Heartbeat failed: {e}")
            time.sleep(protocol.Heartbeat.lifecycle)

    def _keylistener_thread(self, caller):
        """Read integers from stdin and send each as a ``protocol.CarPlay``.

        Invalid lines are reported and skipped; the thread ends when stdin
        is closed.
        """
        while True:
            try:
                line = input()
            except EOFError:
                print("stdin closed; key listener stopped")
                return
            try:
                input1 = int(line)
                # "<L" is an unsigned 32-bit little-endian value; negative or
                # too-large numbers raise struct.error.
                mcVal = struct.pack("<L", input1)
            except (ValueError, struct.error):
                print(f'Ignoring invalid key value: {line!r}')
                continue
            print(f'you entered {input1}')
            connection = caller.connection
            if connection is None:
                print("No USB device connected; key not sent")
                continue
            keys = protocol.CarPlay()
            keys._setdata(mcVal)
            try:
                connection.send_message(keys)
            except link.Error:
                caller._disconnect()
            except Exception as e:
                print(f"exception: {e}")

    def run(self):
        """Run the connect / start-up / idle cycle forever."""
        self.keylistener = Thread(
            target=self._keylistener_thread, args=(self,), daemon=True
        )
        self.keylistener.start()
        while True:
            # First task: look for USB device
            while self.connection is None:
                try:
                    self.connection = self._Connection(self)
                except Exception:
                    # Device not available yet; wait instead of spinning.
                    time.sleep(1)
            print("Found USB device...")
            # Second task: transmit startup info
            try:
                while not self.started:
                    self.connection.send_multiple(protocol.startup_info)
                    time.sleep(1)
            except Exception:
                # Includes the connection having been cleared by another
                # thread in the meantime; start over with device discovery.
                self._disconnect()
                continue
            print("Connection started!")
            # Third task: idle while connected
            while self.started:
                time.sleep(1)
if __name__ == "__main__":
    # Script entry point: build the receiver, then hand control to its loop.
    receiver = CarPlayReceiver()
    receiver.run()