|
| 1 | +import boto3 |
| 2 | +import os |
| 3 | +from Compiler import Compiler |
| 4 | +import epicbox |
| 5 | +import socketio |
| 6 | +import urllib.request |
| 7 | +from rsmq.consumer import RedisSMQConsumer |
| 8 | +import simplejson as json |
| 9 | +import requests |
| 10 | +from dotenv import load_dotenv |
| 11 | +load_dotenv() |
| 12 | +import ast |
| 13 | + |
| 14 | + |
| 15 | +class Worker: |
| 16 | + # To be initailized in constructor |
| 17 | + compiler = None |
| 18 | + inputs = None |
| 19 | + outputs = None |
| 20 | + untrusted_code = None |
| 21 | + code_lang = None |
| 22 | + redis_consumer = None |
| 23 | + s3 = None |
| 24 | + sio = None |
| 25 | + user_id = None |
| 26 | + submission_id = None |
| 27 | + memorylimit = 256 |
| 28 | + timelimit = 5 |
| 29 | + |
| 30 | + # Will be called when new worker in initialized |
| 31 | + def __init__(self): |
| 32 | + self.compiler = Compiler.getInstance() |
| 33 | + self.s3 = boto3.client( |
| 34 | + 's3', |
| 35 | + aws_access_key_id=os.getenv('AWS_IAM_KEY'), |
| 36 | + aws_secret_access_key=os.getenv('AWS_IAM_SECRET') |
| 37 | + ) |
| 38 | + # connect to socket server |
| 39 | + self.sio = socketio.Client() |
| 40 | + self.sio.connect(os.getenv("APP_SERVER")) |
| 41 | + |
| 42 | + # call this to inititalize This Worker |
| 43 | + def boot(self): |
| 44 | + self.redis_consumer = RedisSMQConsumer( |
| 45 | + 'submissions', self.fetchResourcesAndProcess, host=os.getenv("REDISHOST")) |
| 46 | + self.redis_consumer.run() |
| 47 | + |
| 48 | + # Fetch inputs (Array of links), outputs (Array of links), untrusted_code (link to the code) |
| 49 | + def fetchResourcesAndProcess(self, id, message, rc, ts): |
| 50 | + |
| 51 | + # we get only submission ID from queue |
| 52 | + queuedata = message.split("::") |
| 53 | + print(queuedata) |
| 54 | + |
| 55 | + self.user_id = queuedata[0] |
| 56 | + self.submission_id = queuedata[1] |
| 57 | + |
| 58 | + self.sio.emit("joinroom", self.user_id) |
| 59 | + |
| 60 | + # we request api server |
| 61 | + ques_details_response = requests.get( |
| 62 | + os.getenv("APP_SERVER") + '/worker/testdata/' + self.submission_id) |
| 63 | + ques_data = json.loads(ques_details_response.text) |
| 64 | + |
| 65 | + self.inputs = ques_data['inputs'] |
| 66 | + self.outputs = ques_data['outputs'] |
| 67 | + self.code_lang = ques_data['lang'] |
| 68 | + self.untrusted_code = ques_data['code'] |
| 69 | + self.memorylimit = ques_data['memorylimit'] |
| 70 | + self.timelimit = ques_data['timelimit'] |
| 71 | + |
| 72 | + self.compilerInitialSetup() |
| 73 | + |
| 74 | + if(self.code_lang == "c++"): |
| 75 | + self.processCompileAndRun() |
| 76 | + elif(self.code_lang == "python"): |
| 77 | + self.processDirectRun() |
| 78 | + else: |
| 79 | + self.pingMessage({error: "LR", message: "Language Rejected"}) |
| 80 | + |
| 81 | + self.sio.emit("leaveroom", self.user_id) |
| 82 | + return True |
| 83 | + |
| 84 | + # Perform some initial setup for compiler |
| 85 | + def compilerInitialSetup(self): |
| 86 | + self.compiler.setCompilerLang(self.code_lang) |
| 87 | + self.compiler.setCompilerMemoryLimit(self.memorylimit) |
| 88 | + self.compiler.setCompilerTimeLimit(self.timelimit) |
| 89 | + self.compiler.makeProfiles() |
| 90 | + |
| 91 | + |
| 92 | + # Use this when required to compile first and then execute like in C++ and Java |
| 93 | + def processCompileAndRun(self): |
| 94 | + with epicbox.working_directory() as working_dir: |
| 95 | + self.pingMessage( |
| 96 | + {"success": "CMPL", "message": "Compiling your code"}) |
| 97 | + compile_result = self.compiler.compile( |
| 98 | + working_dir, self.untrusted_code) |
| 99 | + if(compile_result["exit_code"] != 0): |
| 100 | + return self.pingMessage({"error": "CE", "message": compile_result["stderr"]}) |
| 101 | + self.pingMessage( |
| 102 | + {"success": "CMPLS", "message": "Compilation Success"}) |
| 103 | + |
| 104 | + testcase_number = 0 |
| 105 | + for input in self.inputs: |
| 106 | + testcase_number = testcase_number + 1 |
| 107 | + |
| 108 | + self.pingMessage( |
| 109 | + {"success": "RUN", "message": "Running on testcase #" + str(testcase_number)}) |
| 110 | + run_result = self.compiler.run(working_dir, input) |
| 111 | + |
| 112 | + if(run_result["oom_killed"] or run_result["timeout"] or run_result["exit_code"]): |
| 113 | + return self.pingRunError(run_result, testcase_number) |
| 114 | + |
| 115 | + print(run_result) |
| 116 | + eval_result = self.matchOutput( |
| 117 | + run_result["stdout"], self.outputs[testcase_number - 1]) |
| 118 | + if(eval_result == False): |
| 119 | + return self.pingMessage({"error": "WA", "message": "Wrong answer on testcase #" + str(testcase_number)}) |
| 120 | + |
| 121 | + self.pingMessage( |
| 122 | + {"success": "ACS", "message": "Correct on testcase #" + str(testcase_number)}) |
| 123 | + |
| 124 | + return self.pingMessage({"success": "AC", "message": "Accepted Solution"}) |
| 125 | + |
| 126 | + # Use this for languages where we can direct run the code like python |
| 127 | + def processDirectRun(self): |
| 128 | + self.pingMessage( |
| 129 | + {"success": "CMPL", "message": "Evaluating Your Code"}) |
| 130 | + |
| 131 | + testcase_number = 0 |
| 132 | + for input in self.inputs: |
| 133 | + testcase_number = testcase_number + 1 |
| 134 | + |
| 135 | + self.pingMessage( |
| 136 | + {"success": "RUN", "message": "Running on testcase #" + str(testcase_number)}) |
| 137 | + run_result = self.compiler.directRun(self.untrusted_code, input) |
| 138 | + |
| 139 | + if(run_result["oom_killed"] or run_result["timeout"] or run_result["exit_code"]): |
| 140 | + return self.pingRunError(run_result, testcase_number) |
| 141 | + |
| 142 | + print(run_result) |
| 143 | + eval_result = self.matchOutput( |
| 144 | + run_result["stdout"], self.outputs[testcase_number - 1]) |
| 145 | + if(eval_result == False): |
| 146 | + return self.pingMessage({"error": "WA", "message": "Wrong answer on testcase #" + str(testcase_number)}) |
| 147 | + |
| 148 | + self.pingMessage( |
| 149 | + {"success": "ACS", "message": "Correct on testcase #" + str(testcase_number)}) |
| 150 | + |
| 151 | + return self.pingMessage({"success": "AC", "message": "Accepted Solution"}) |
| 152 | + |
| 153 | + def matchOutput(self, user_output, expected_output): |
| 154 | + user_output = user_output.strip() |
| 155 | + expected_output = self.readFile(expected_output).strip() |
| 156 | + if(user_output == expected_output): |
| 157 | + return True |
| 158 | + return False |
| 159 | + |
| 160 | + def pingRunError(self, run_result, testcase_number): |
| 161 | + print(run_result) |
| 162 | + if((run_result["timeout"] and run_result["duration"] > self.timelimit + 1) or run_result["oom_killed"]): |
| 163 | + return self.pingMessage({"error": "MLE", "message": "Memory Limit Exceed on testcase #" + str(testcase_number)}) |
| 164 | + elif(run_result["timeout"]): |
| 165 | + return self.pingMessage({"error": "TLE", "message": "Time Limit Exceed on testcase #" + str(testcase_number)}) |
| 166 | + else: |
| 167 | + return self.pingMessage({"error": "RE", "message": "Run Time Error on testcase #" + str(testcase_number)}) |
| 168 | + |
| 169 | + def pingMessage(self, message): |
| 170 | + message["userId"] = self.user_id |
| 171 | + message["submissionId"] = self.submission_id |
| 172 | + self.sio.emit("status", json.dumps(message)) |
| 173 | + # print(message) |
| 174 | + return True |
| 175 | + |
| 176 | + def readFile(self, file_link): |
| 177 | + data = self.s3.get_object( |
| 178 | + Bucket=os.getenv("AWS_S3_BUCKET"), Key=file_link) |
| 179 | + return data["Body"].read() |
0 commit comments