123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- import os
- import requests
- from datetime import datetime
- from flask import Flask, request, Response
- import telegram
- from telegrambot.credentials import bot_token, bot_user_name,URL
- from telegrambot.bot import get_response
# Module-level configuration. Names assigned at module scope are already
# global, so no ``global`` declarations are needed here.

### TELEGRAM SETTINGS
use_telegram = False  # switch checked before every Telegram send in this file
TOKEN = bot_token
sven_chat_id = 72665258  # chat id used as the target of notification sends
bot = telegram.Bot(token=TOKEN)

### NTFY SETTINGS
use_ntfy = True  # switch checked before every ntfy send in this file
ntfy_server = "https://ntfy.hinz.casa"
ntfy_topic = "haeusle"
# NOTE(review): this value is sent verbatim after "Basic " in the
# Authorization header, i.e. it is a credential committed to source control.
# Consider loading it from the environment or a secrets file and rotating it.
ntfy_auth = "aGFldXNsZTpzdm9saGkwNQ=="

# Base directory under which uploaded images are stored.
UPLOAD_FOLDER = '/home/sven/haeusle-uploads'

app = Flask(__name__)
@app.route('/{}'.format(TOKEN), methods=['POST'])
def respond():
    """Handle a Telegram webhook update and reply to text messages.

    The request body is parsed as JSON and converted into a
    ``telegram.Update``. For updates carrying a text message, the text is
    passed to ``get_response`` and, when ``use_telegram`` is enabled, the
    result is sent back as a reply to the originating message.

    Returns:
        The string ``'ok'`` in every case, including updates that carry no
        text message (those are ignored rather than raising).
    """
    # retrieve the message in JSON and then transform it to Telegram object
    update = telegram.Update.de_json(request.get_json(force=True), bot)

    # Not every update has a message, and not every message has text;
    # ignore those instead of failing with AttributeError.
    message = update.message if update is not None else None
    if message is None or message.text is None:
        return 'ok'

    chat_id = message.chat.id
    msg_id = message.message_id
    text = message.text
    print("got text message :", text)

    response = get_response(text)
    if use_telegram:
        # Reply with the computed response; the previous code sent the
        # chat id as the message text and discarded the response.
        bot.sendMessage(chat_id=chat_id, text=response, reply_to_message_id=msg_id)
    return 'ok'
@app.route('/setwebhook', methods=['GET', 'POST'])
def set_webhook():
    """Register ``URL`` + ``TOKEN`` as the bot's webhook and report the result.

    Returns:
        ``"webhook setup ok"`` when ``bot.setWebhook`` returns a truthy
        value, otherwise ``"webhook setup failed"``.
    """
    hook_url = '{URL}{HOOK}'.format(URL=URL, HOOK=TOKEN)
    registered = bot.setWebhook(hook_url)
    return "webhook setup ok" if registered else "webhook setup failed"
# https://haeusle.server.name/event?event=eventname
@app.route('/event')
def receive_event():
    """Trigger a text notification for the event named in the query string.

    Reads the ``event`` query parameter and forwards it to
    ``sendEventNotification`` when it is one of the known event names.

    Returns:
        An HTML snippet containing the notification text on success, a note
        when no notification was produced for the event, or an
        ``(html, 400)`` tuple when the event is missing or unknown.
    """
    known_events = ("motion_detection", "doorbell_button", "face_recognition")

    event = request.args.get('event')
    if event not in known_events:
        # Previously an unknown/missing event left ``msg`` unbound and the
        # request failed with an UnboundLocalError.
        return "<h1>Event</h1><br>Unknown event", 400

    msg = sendEventNotification(event=event)
    if not msg:
        # sendEventNotification may return False; concatenating that to a
        # string would raise TypeError.
        return "<h1>Event</h1><br>No notification sent"
    return "<h1>Event</h1><br>" + msg
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER


@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
    """Store an uploaded image and send a notification for it.

    On POST, the first form field present among ``motion_detection``,
    ``doorbell_button``, ``face_recognition`` and ``image_file`` (checked in
    that order) selects the upload type. The file is saved as
    ``<UPLOAD_FOLDER>/<upload_type>/<timestamp>[.<ext>]`` and
    ``sendImageNotification`` is called with the saved path.

    Returns:
        The saved file path on a successful upload,
        ``'Unexpected POST command'`` when none of the expected file fields
        is present, or an HTML upload form for non-POST requests.
    """
    # Upload process
    if request.method == 'POST':
        # Order matters: it is the priority used when several fields are sent.
        upload_types = ('motion_detection', 'doorbell_button',
                        'face_recognition', 'image_file')
        upload_type = next(
            (name for name in upload_types if name in request.files), None)
        if upload_type is None:
            return 'Unexpected POST command'

        imageFile = request.files[upload_type]

        # Only the extension of the client-supplied name is reused; take it
        # from the base name and accept it only if it is purely alphanumeric
        # so it can never introduce path separators.
        ext = os.path.splitext(os.path.basename(imageFile.filename or ""))[1]

        new_filename = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
        path = os.path.join(app.config['UPLOAD_FOLDER'], upload_type, new_filename)
        # add original file extension
        if len(ext) > 1 and ext[1:].isalnum():
            path = path + ext

        # Create upload directory (including missing parents) if needed.
        os.makedirs(os.path.dirname(path), exist_ok=True)

        imageFile.save(path)
        sendImageNotification(image_path=path, upload_type=upload_type)
        return path

    # Url response if nothing is posted
    return '''
    <h1>Upload new File</h1>
    <form method="post" enctype="multipart/form-data">
    <input type="file" name="image_file">
    <input type="submit">
    </form>
    '''
def _send_telegram_photo(image_path, silent):
    """Send the image at *image_path* via Telegram, closing the file afterwards."""
    with open(image_path, 'rb') as photo:
        bot.sendPhoto(chat_id=sven_chat_id, photo=photo, disable_notification=silent)


def sendImageNotification(image_path="", upload_type=""):
    """Send an image notification through the enabled channels.

    Args:
        image_path: Path of the image to send. It must be an existing file
            with a ``.jpg``, ``.gif``, ``.jpeg`` or ``.png`` extension
            (case-insensitive).
        upload_type: ``"doorbell_button"``, ``"motion_detection"`` or
            ``"face_recognition"``; any other value is treated as a generic
            new photo.

    Returns:
        ``True`` when the file passed validation and the enabled channels
        were invoked, ``False`` when the file is missing or has an
        unsupported extension.
    """
    allowed_extensions = (".jpg", ".gif", ".jpeg", ".png")

    if not os.path.isfile(image_path):
        return False
    if os.path.splitext(image_path)[1].lower() not in allowed_extensions:
        return False

    # upload_type -> (ntfy message, Telegram disable_notification flag).
    # A flag of None means no Telegram photo is sent for that type.
    notifications = {
        "doorbell_button": ("Jemand hat geklingelt!", False),
        "motion_detection": ("Bewegung erkannt.", True),
        "face_recognition": ("Person erkannt.", None),
    }
    ntfy_msg, telegram_silent = notifications.get(upload_type, ("Neues Foto.", True))

    if use_telegram and telegram_silent is not None:
        _send_telegram_photo(image_path, telegram_silent)
    if use_ntfy:
        ntfyAttachmentMessage(msg=ntfy_msg, file_path=image_path)
    return True
def sendEventNotification(event="", location="door"):
    """Send a text notification for a named event.

    Args:
        event: ``"doorbell_button"`` or ``"motion_detection"``. No text
            message is defined for any other event name.
        location: Currently unused; kept for interface compatibility.

    Returns:
        The notification text that was sent, or ``False`` when no message
        is defined for *event* (previously this case raised
        ``UnboundLocalError``).
    """
    # event -> (message text, Telegram disable_notification flag)
    messages = {
        "doorbell_button": ("Türklingel: Jemand wartet", False),
        "motion_detection": ("Bewegungsmelder: Jemand ist am Hauseingang", True),
    }
    if event not in messages:
        return False

    event_msg, no_notification = messages[event]
    if use_telegram:
        bot.sendMessage(chat_id=sven_chat_id, text=event_msg,
                        disable_notification=no_notification)
    if use_ntfy:
        ntfyTextMessage(msg=event_msg)
    return event_msg
def ntfyAttachmentMessage(msg="", file_path=False, url=False):
    """Upload a local file to the ntfy topic as an attachment.

    Args:
        msg: Optional notification text. When non-empty it is sent in the
            ``Message`` header alongside the attachment (previously it was
            accepted but never transmitted). It must not contain line
            breaks, since it travels in an HTTP header.
        file_path: Path of the file to upload. A falsy value makes the call
            a no-op; the old default ``False`` would otherwise be passed to
            ``open()`` and be interpreted as file descriptor 0.
        url: Currently unused; kept for interface compatibility.

    Returns:
        None.
    """
    if not file_path:
        return

    headers = {
        "Filename": os.path.basename(file_path),
        "Authorization": "Basic " + ntfy_auth,
    }
    if msg:
        # Pass UTF-8 bytes so non-ASCII text is not re-encoded as latin-1.
        headers["Message"] = msg.encode('utf-8')

    # The context manager closes the file even if the request raises.
    with open(file_path, 'rb') as attachment:
        requests.put(ntfy_server + "/" + ntfy_topic,
                     data=attachment,
                     headers=headers,
                     timeout=30)  # avoid blocking the worker indefinitely
def ntfyTextMessage(msg="", url=False):
    """Publish a plain-text message to the ntfy topic.

    Args:
        msg: Text to publish; sent UTF-8 encoded as the request body.
        url: Currently unused; kept for interface compatibility.

    Returns:
        None.
    """
    requests.post(ntfy_server + "/" + ntfy_topic,
                  data=msg.encode('utf-8'),
                  headers={
                      "Authorization": "Basic " + ntfy_auth,
                  },
                  timeout=10)  # without a timeout a stalled server blocks the caller forever
@app.route('/')
def index():
    """Serve a fixed greeting at the site root."""
    greeting = "Hello, World"
    return greeting
if __name__ == "__main__":
    # The server listens on all interfaces. Flask's debug mode enables the
    # interactive Werkzeug debugger, which allows arbitrary code execution,
    # so it is now off by default and must be enabled explicitly with
    # FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "0") == "1"
    app.run(debug=debug_enabled, host='0.0.0.0', threaded=True)
|