...
 
Commits (4)
......@@ -10,7 +10,10 @@ systemctl enable imageshow.service
display image:
`curl -XPOST http://host/ --data @file.svg` or `curl -XPOST http://host/ --data-binary @file.jpg`
display collor:
draw over previous image:
`curl -XPOST http://host/update --data @file.svg` or `curl -XPOST http://host/update --data-binary @file.jpg`
display color:
`curl -XPOST http://host/fill --data 'r=255&g=1&b=0&a=255'`
show current image:
......
......@@ -16,16 +16,37 @@ from urllib.parse import parse_qs
FRAMEBUFFER = 'fb0'
RSVGCONVERT = '/usr/bin/rsvg-convert'
RSVGCONVERT_ARGS = ['-u', '--keep-image-data']
RSVGCONVERT_ARGS = ['--unlimited', '--keep-image-data', '--background-color=None', '--format=png']
BLANKINGCMD = '/usr/bin/vcgencmd'
BLANKINGARGS_OFF = ['display_power', '0']
BLANKINGARGS_ON = ['display_power', '1']
class Server(BaseHTTPRequestHandler):
_processing = False
_payloadLength = 0
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'image/png')
self._respondCurrentImage()
def do_POST(self):
self._payloadLength = int(self.headers['Content-Length'])
if self.path == '/' or self.path == '/update':
self._processImage()
elif self.path == '/off':
self._setDisplayPower(False)
elif self.path == '/on':
self._setDisplayPower(True)
elif self.path == '/fill':
self._fillDisplay()
else:
self._sendResponse(404, b'Not Found')
def _sendResponse(self, code, body, contenttype = 'text/plain'):
self.send_response(code)
self.send_header('Content-type', contenttype)
self.end_headers()
self.wfile.write(body)
def _respondCurrentImage(self):
fh = open('/dev/' + FRAMEBUFFER, 'rb')
image = Image.frombytes('RGBA', (buf_width, buf_height), fh.read(), 'raw').convert('RGBA')
fh.close()
......@@ -35,91 +56,86 @@ class Server(BaseHTTPRequestHandler):
pngBytes = BytesIO()
new_image.save(pngBytes, format='PNG')
self.wfile.write(pngBytes.getvalue())
def do_POST(self):
if self.path == '/':
if self.draw():
self.send_response(204)
self.end_headers()
elif self.path == '/off':
length = int(self.headers['content-length'])
if length:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes('no payload allowed', 'utf-8'))
else:
display_off()
self.send_response(204)
self.end_headers()
elif self.path == '/on':
length = int(self.headers['content-length'])
if length:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes('no payload allowed', 'utf-8'))
else:
display_on()
self.send_response(204)
self.end_headers()
elif self.path == '/fill':
display_on()
length = int(self.headers['content-length'])
postvars = parse_qs(self.rfile.read(length))
print(postvars)
buffer = bytearray()
buffer.extend(struct.pack('B', int(postvars[b'b'][0])))
buffer.extend(struct.pack('B', int(postvars[b'g'][0])))
buffer.extend(struct.pack('B', int(postvars[b'r'][0])))
buffer.extend(struct.pack('B', int(postvars[b'a'][0])))
buffer = buffer * (buf_width * buf_height)
with open('/dev/' + FRAMEBUFFER, 'wb') as f:
f.write(buffer)
self.send_response(204)
self.end_headers()
self._sendResponse(200, pngBytes.getvalue(), 'image/png')
def _processImage(self):
if self._processing:
self._sendResponse(503, b'Service unavailable')
return
self._processing = True
post_data = self.rfile.read(self._payloadLength)
if post_data:
isUpdate = (self.path == '/update')
isSvg = (self.headers['Content-Type'] == 'image/svg+xml')
try:
process(post_data, isUpdate, isSvg)
except OSError as e:
print(e)
self._sendResponse(500, b'Internal Server Error')
except AssertionError as e:
self._sendResponse(400, bytes(str(e), 'utf-8'))
else:
self.send_response(404)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes('Not Found', 'utf-8'))
def draw(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
self._sendResponse(400, b'Empty payload')
self._processing = False
self._sendResponse(200, b'OK')
def _fillDisplay(self):
self._setDisplayPower(True)
postvars = parse_qs(self.rfile.read(self._payloadLength))
print(postvars)
buffer = bytearray()
buffer.extend(struct.pack('B', int(postvars[b'b'][0])))
buffer.extend(struct.pack('B', int(postvars[b'g'][0])))
buffer.extend(struct.pack('B', int(postvars[b'r'][0])))
buffer.extend(struct.pack('B', int(postvars[b'a'][0])))
buffer = buffer * (buf_width * buf_height)
if not post_data:
self.send_response(400)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(bytes('Empty payload', 'utf-8'))
return False
return process(post_data, self)
def process(payload, http):
is_svg = http == False or http.headers['content-type'] == 'image/svg+xml'
try:
image = render(payload, is_svg)
except OSError as e:
print(e)
http.send_response(500)
http.send_header('Content-type', 'text/plain')
http.end_headers()
http.wfile.write(bytes('Internal Server Eror', 'utf-8'))
return False
if not check_size(image, http):
return False
image.save(last_image, format='PNG')
with open('/dev/' + FRAMEBUFFER, 'wb') as f:
f.write(buffer)
self._sendResponse(200, b'OK')
def _setDisplayPower(self, turnOn):
if self._payloadLength:
self._sendResponse(400, b'no payload allowed')
return
try:
setDisplayPower(turnOn)
except Exception as e:
print(e)
self._sendResponse(200, b'OK')
buf_width = 0
buf_height = 0
lastImage = None
def process(payload, isUpdate, isSvg):
global lastImage
image = render(payload, isSvg)
if not sizeIsValid(image):
print('Invalid size')
w, h = image.size
raise AssertionError('Size mismatch. got %dX%d. should be %dX%d' % (w,h,buf_width,buf_height))
print('size ok')
image = convert(image)
draw(image)
if isUpdate:
print('drawing over last image')
lastImage.paste(image, (0, 0), image)
else:
lastImage = image
writeToFramebuffer(lastImage)
print('done')
return True
def render(payload, is_svg):
if is_svg:
def sizeIsValid(image):
w, h = image.size
if w != buf_width or h != buf_height:
return False
return True
def render(payload, isSvg):
if isSvg:
print('detected SVG')
print('rendering')
args = [RSVGCONVERT] + RSVGCONVERT_ARGS
......@@ -130,63 +146,41 @@ def render(payload, is_svg):
imageData = payload
return Image.open(BytesIO(imageData)).convert('RGBA')
def check_size(image, http):
w, h = image.size
if w != buf_width or h != buf_height:
print('Invalid size')
if http:
http.send_response(400)
http.send_header('Content-type', 'text/plain')
http.end_headers()
http.wfile.write(bytes('Size mismatch. got %dX%d. should be %dX%d' % (w,h,buf_width,buf_height), 'utf-8'))
return False
print('size ok')
return True
def convert(image):
print('converting')
r,g,b,a = image.split()
return Image.merge("RGBA", (b,g,r,a))
def draw(image):
def writeToFramebuffer(image):
print('drawing')
display_on()
setDisplayPower(True)
with open('/dev/' + FRAMEBUFFER, 'wb') as f:
f.write(image.tobytes())
def display_off():
args = [BLANKINGCMD] + BLANKINGARGS_OFF
try:
Popen(args, stdout=PIPE, stdin=PIPE, stderr=PIPE)
except Exception as e:
print(e)
def display_on():
args = [BLANKINGCMD] + BLANKINGARGS_ON
try:
Popen(args, stdout=PIPE, stdin=PIPE, stderr=PIPE)
except Exception as e:
print(e)
def setDisplayPower(turnOn):
args = [BLANKINGCMD]
if turnOn:
args += BLANKINGARGS_ON
else:
args += BLANKINGARGS_OFF
Popen(args, stdout=PIPE, stdin=PIPE, stderr=PIPE)
def init():
initfile = '%dX%d.svg' % (buf_width, buf_height)
if path.isfile(initfile):
print('found init file')
svg = open(initfile, 'rb')
process(svg.read(), False)
process(svg.read(), False, True)
else:
print('no init file found. skipping')
buf_width = 0
buf_height = 0
last_image = BytesIO()
def run(server_class=HTTPServer, handler_class=Server, port=80):
global buf_width, buf_height
global buf_width, buf_height, lastImage
fbsize = open('/sys/class/graphics/' + FRAMEBUFFER + '/virtual_size', 'r')
w, h = fbsize.read().strip().split(',')
buf_width = int(w)
buf_height = int(h)
lastImage = Image.new('RGBA', (buf_width, buf_height))
print('detected frame buffer size %dX%d' % (buf_width, buf_height))
chdir(path.dirname(path.realpath(__file__)))
......