app.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. import os
  2. import requests
  3. from datetime import datetime
  4. from flask import Flask, request, Response
  5. import telegram
  6. from telegrambot.credentials import bot_token, bot_user_name,URL
  7. from telegrambot.bot import get_response
  8. global bot
  9. global sven_chat_id
  10. global TOKEN
  11. global use_telegram
  12. global use_ntfy
  13. global ntfy_server
  14. global ntfy_topic
  15. ### TELEGRAM SETTINGS
  16. use_telegram = False
  17. TOKEN = bot_token
  18. sven_chat_id = 72665258
  19. bot = telegram.Bot(token=TOKEN)
  20. ### NTFY SETTINGS
  21. use_ntfy = True
  22. ntfy_server = "https://ntfy.hinz.casa"
  23. ntfy_topic = "haeusle"
  24. ntfy_auth = "aGFldXNsZTpzdm9saGkwNQ=="
  25. UPLOAD_FOLDER = '/home/sven/haeusle-uploads'
  26. app = Flask(__name__)
  27. @app.route('/{}'.format(TOKEN), methods=['POST'])
  28. def respond():
  29. # retrieve the message in JSON and then transform it to Telegram object
  30. update = telegram.Update.de_json(request.get_json(force=True), bot)
  31. chat_id = update.message.chat.id
  32. msg_id = update.message.message_id
  33. # Telegram understands UTF-8, so encode text for unicode compatibility
  34. text = update.message.text.encode('utf-8').decode()
  35. print("got text message :", text)
  36. response = get_response(text)
  37. if use_telegram: bot.sendMessage(chat_id=chat_id, text=chat_id, reply_to_message_id=msg_id)
  38. return 'ok'
  39. @app.route('/setwebhook', methods=['GET', 'POST'])
  40. def set_webhook():
  41. s = bot.setWebhook('{URL}{HOOK}'.format(URL=URL, HOOK=TOKEN))
  42. if s:
  43. return "webhook setup ok"
  44. else:
  45. return "webhook setup failed"
  46. # https://haeusle.server.name/event?event=eventname
  47. @app.route('/event')
  48. def receive_event():
  49. event = request.args.get('event')
  50. if event == "motion_detection":
  51. msg = sendEventNotification(event="motion_detection")
  52. elif event == "doorbell_button":
  53. msg = sendEventNotification(event="doorbell_button")
  54. elif event == "face_recognition":
  55. msg = sendEventNotification(event="face_recognition")
  56. return "<h1>Event</h1><br>" + msg
  57. app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
  58. @app.route('/upload', methods=['GET', 'POST'])
  59. def upload_file():
  60. # Upload process
  61. if request.method == 'POST':
  62. # Processing the image file
  63. if 'image_file' in request.files or "doorbell_button" in request.files or "motion_detection" in request.files:
  64. if 'motion_detection' in request.files:
  65. upload_type = 'motion_detection'
  66. elif "doorbell_button" in request.files:
  67. upload_type = 'doorbell_button'
  68. elif "face_recognition" in request.files:
  69. upload_type = 'face_recognition'
  70. else:
  71. upload_type = 'image_file'
  72. imageFile = request.files[upload_type]
  73. ext = imageFile.filename.split(".")[-1]
  74. now = datetime.now()
  75. new_filename = now.strftime("%Y-%m-%d_%H:%M:%S")
  76. path = os.path.join(app.config['UPLOAD_FOLDER'], upload_type, new_filename)
  77. # add original file extension
  78. if len(ext) > 1:
  79. path = path + "." + ext
  80. # Create Upload directory if not exists
  81. try:
  82. os.stat(os.path.dirname(path))
  83. except:
  84. os.mkdir(os.path.dirname(path))
  85. imageFile.save(path)
  86. sendImageNotification(image_path=path, upload_type=upload_type)
  87. return path
  88. else:
  89. return 'Unexpected POST command'
  90. return 'ok'
  91. # Url response if nothing is posted
  92. return '''
  93. <h1>Upload new File</h1>
  94. <form method="post" enctype="multipart/form-data">
  95. <input type="file" name="image_file">
  96. <input type="submit">
  97. </form>
  98. '''
  99. def sendImageNotification(image_path="", upload_type=""):
  100. file_ok = False
  101. file_types = [".jpg", ".gif", ".jpeg", ".png"]
  102. if os.path.isfile(image_path):
  103. for file_type in file_types:
  104. if file_type == "." + image_path.split(".")[-1].lower():
  105. file_ok = True
  106. break
  107. if file_ok:
  108. if upload_type == "doorbell_button":
  109. if use_telegram: bot.sendPhoto(chat_id=sven_chat_id, photo=open(image_path,'rb'), disable_notification=False)
  110. if use_ntfy: ntfyAttachmentMessage(msg="Jemand hat geklingelt!", file_path=image_path)
  111. elif upload_type == "motion_detection":
  112. if use_telegram: bot.sendPhoto(chat_id=sven_chat_id, photo=open(image_path,'rb'), disable_notification=True)
  113. if use_ntfy: ntfyAttachmentMessage(msg="Bewegung erkannt.", file_path=image_path)
  114. elif upload_type == "face_recognition":
  115. if use_ntfy: ntfyAttachmentMessage(msg="Person erkannt.", file_path=image_path)
  116. else:
  117. if use_telegram: bot.sendPhoto(chat_id=sven_chat_id, photo=open(image_path,'rb'), disable_notification=True)
  118. if use_ntfy: ntfyAttachmentMessage(msg="Neues Foto.", file_path=image_path)
  119. return True
  120. return False
  121. def sendEventNotification(event="", location="door"):
  122. no_notification = False
  123. if event == "doorbell_button":
  124. event_msg = "Türklingel: Jemand wartet"
  125. elif event == "motion_detection":
  126. event_msg = "Bewegungsmelder: Jemand ist am Hauseingang"
  127. no_notification = True
  128. if len(event_msg) > 1:
  129. if use_telegram: bot.sendMessage(chat_id=sven_chat_id, text=event_msg, disable_notification=no_notification)
  130. if use_ntfy: ntfyTextMessage(msg=event_msg)
  131. return event_msg
  132. return False
  133. def ntfyAttachmentMessage(msg="", file_path=False, url=False):
  134. requests.put(ntfy_server + "/" + ntfy_topic,
  135. data=open(file_path, 'rb'),
  136. headers={
  137. "Filename": file_path.split("/")[-1],
  138. "Authorization": "Basic " + ntfy_auth
  139. })
  140. def ntfyTextMessage(msg="", url=False):
  141. requests.post(ntfy_server + "/" + ntfy_topic,
  142. data=msg.encode('utf-8'),
  143. headers={
  144. "Authorization": "Basic " + ntfy_auth
  145. })
  146. @app.route('/')
  147. def index():
  148. return("Hello, World")
  149. if __name__ == "__main__":
  150. app.run(debug=True, host='0.0.0.0', threaded=True)