Commit 10663b9a by Julien BROCHARD
fix type formatting

parent 1d48bcec
%% Cell type:code id:885330be-eafa-426c-b731-92d6346fa284 tags:
``` python
from pyspark.sql import SparkSession

# Create a session on the standalone master
spark = SparkSession.builder \
    .master("spark://172.20.53.96:7077") \
    .appName("WDC-complete") \
    .config("spark.executor.memory", "28g") \
    .config("spark.driver.memory", "28g") \
    .getOrCreate()
# spark = SparkSession.builder.master("local").appName("WDC-complete").getOrCreate()

# Worker cleanup and shuffle settings
spark.conf.set("spark.worker.cleanup.enabled", True)
spark.conf.set("spark.worker.cleanup.interval", 1800)
spark.conf.set("spark.worker.cleanup.appDataTtl", 3600)
spark.conf.set("spark.sql.shuffle.partitions", 1000)

# Credentials and endpoint for the S3 storage server that holds the DataCommons files
endpoint_url = 'https://s3.os-bird.glicid.fr/'
aws_access_key_id = 'bbd95ea3c1174caa88345404b84e458f'
aws_secret_access_key = 'eaf2a72ecf9845f583af7f3513c44f25'

hadoopConf = spark._jsc.hadoopConfiguration()
hadoopConf.set('fs.s3a.access.key', aws_access_key_id)
hadoopConf.set('fs.s3a.secret.key', aws_secret_access_key)
hadoopConf.set('fs.s3a.endpoint', endpoint_url)
hadoopConf.set('fs.s3a.path.style.access', 'true')
hadoopConf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
hadoopConf.set('spark.worker.cleanup.enabled', 'true')
hadoopConf.set('fs.s3a.committer.name', 'magic')
```
%% Output
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/14 19:33:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/14 19:33:34 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
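
If the S3A wiring above is correct, a one-line read should succeed before launching the full job. A minimal sanity check, assuming the `wdc-2023` bucket read later in this notebook is reachable from this session:

``` python
# Sketch: fetch a single line from the bucket to confirm credentials and endpoint.
# "wdc-2023" is the bucket passed to completeCS below; adjust to test another one.
sample = spark.sparkContext.textFile("s3a://wdc-2023/**").take(1)
print(sample)
```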
%% Cell type:code id:b893de1a-8fa9-4382-b331-8f59c2a16e95 tags:
``` python
from pyspark.sql.functions import col, count
from pyspark.sql import functions as f
from pyspark.sql import Row
import re
from urllib.parse import urlparse

# Sample N-Quads lines for testing the parser, including truncated/malformed ones
line0 = '_:nb53a244 <http://schema.org/value1> "8,840 ft<> / 2,694 m<>" <https://youtube.com> .'
line1 = '_:nb53a244 <http://schema.org/value2> "8,840 ft<> / 2,694 m<>" <https://youtube.com> .'
line2 = '_:different <http://schema.org/value3> "zfgrredzfzef" <https://youtube.com> .'
line3 = '_:nb53a244 <http://schema.org/value4> "dqqsd" <https://different.com> .'
line4 = '<https://google.com> <http://schema.org/value4> "8,840 ft<> / 2,694 m<>" <https://youtube.com> .'
line5 = '<https://google.com> <http://schema.org/value4> "8,840 ft<> / 2,694 m<>" <https://youtube.com> .'
line6 = '<https://different.com> <http://schema.org/value6> "fsqfqsfdqs" <https://youtube.com> .'
line7 = '<https://google.com> <http://schema.org/type> "Recipe" <https://different.com> .'
line8 = '<https://google.com> <http://schema.org/type> "Recipe"'
line9 = '<https://google.com> <http://schema.org/type>'
line10 = '<https://google.com>'
line11 = ''
line12 = "<https://google.com> <w3.org/1999/02/22-rdf-syntax-ns#type> <schema.org/ListItem> <https://different.com> ."

# Groups: subject, predicate, object, graph, followed by the terminating dot
quad_motif = re.compile(r'([^\s]+)\s([^\s]+)\s(.+)\s([^\s]+)\s+\.')

def parseQ(l, parts):
    result = quad_motif.match(l)
    if result:
        sub = result.group(1).strip()
        pred = result.group(2).strip()
        # Strip the protocol and a leading "www." from the predicate IRI
        pred = re.sub(r'([Hh][Tt][Tt][Pp][Ss]?://)?([Ww]{3}\.)?', '', pred)
        if pred == "<w3.org/1999/02/22-rdf-syntax-ns#type>":
            # Fold rdf:type statements into an "isa:" pseudo-predicate carrying the class IRI
            pred = "isa:" + re.sub(r'([Hh][Tt][Tt][Pp][Ss]?://)?([Ww]{3}\.)?', '', result.group(3).strip())
        hashstring = sub
        if sub.startswith("_:"):
            # Blank-node labels are only unique within one source document,
            # so include the graph IRI in the hash key
            #sub += result.group(4).strip()
            hashstring += result.group(4).strip().strip("<>")
        # print(hashstring)
        return Row(subject=sub, predicate=pred, hashdom=hash(hashstring) % parts)
    else:
        # Handle non-matching lines
        print(f"parsing error: {l}")
        return None

print(parseQ(line0, 100))
print(parseQ(line1, 100))
print(parseQ(line2, 100))
print(parseQ(line3, 100))
print(parseQ(line4, 100))
print(parseQ(line5, 100))
print(parseQ(line6, 100))
print(parseQ(line7, 100))
print(parseQ(line8, 100))
print(parseQ(line9, 100))
print(parseQ(line10, 100))
print(parseQ(line11, 100))
print(parseQ(line12, 100))
```
%% Output
Row(subject='_:nb53a24408607424384c1357880ce1bc7xb1', predicate='<http://schema.org/value>', hashdom=82)
Row(subject='<https://172-20-53-96.os-bird.glicid.fr/ageregaezg>', predicate='<http://schema.org/value>', hashdom=9)
Row(subject='<https://172-20-53-96.os-bird.glicid.fr/fgdqvq>', predicate='<http://schema.org/value>', hashdom=3)
Row(subject='<https://172-20-53-96.os-bird.glicid.fr/fgdqvq>', predicate='<http://schema.org/value>', hashdom=3)
Row(subject='_:nb53a24408607424384c1357880ce1bc7xb1', predicate='<http://schema.org/value>', hashdom=82)
Row(subject='_:geafazef4384c1357880ce1bc7xb1', predicate='<http://schema.org/value>', hashdom=32)
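
In `parseQ`, the graph IRI is appended to the hash key only for blank-node subjects, because a blank-node label is scoped to its source document: `_:b0` in two different pages names two different resources, whereas an IRI subject is globally unique. A small illustration reusing `parseQ` from the cell above (the labels are made up, and Python's `hash()` varies between processes):

``` python
# Same blank-node label, two different graphs: the hash keys differ, so the two
# resources generally land in different hashdom partitions.
row_a = parseQ('_:b0 <http://schema.org/name> "a" <https://one.example> .', 100)
row_b = parseQ('_:b0 <http://schema.org/name> "a" <https://two.example> .', 100)
print(row_a.hashdom, row_b.hashdom)
```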
%% Cell type:code id:a22a0e88-774a-4136-b07f-422954618be5 tags:
``` python
def partCS(hashdom_val, hashdom_max, output=None):
    print(f"part {hashdom_val+1}/{hashdom_max} started")
    file_name = f"hashdom{hashdom_val+1}-{hashdom_max}"
    # Characteristic set of each subject in this partition: the sorted set of its predicates
    cset2 = spark.sql(f"select subject, concat_ws(' ', sort_array(collect_set(predicate))) as pset FROM Super where hashdom={hashdom_val} group by subject").cache()
    #cset2.show(truncate=200)
    #print(cset2.count())
    # Count how many subjects share each characteristic set
    result2 = cset2.groupby("pset").agg(f.count(cset2.subject).alias('count'))
    result2.show(truncate=0)
    if output is not None:
        print("Saving")
        result2.write.option("header", True) \
            .mode("overwrite") \
            .csv(f"s3a://test-out/{output}/{file_name}")
    # Clear variables from memory
    cset2.unpersist()
    result2.unpersist()
    print(f"part {hashdom_val+1}/{hashdom_max} finished")
    del cset2, result2

def completeCS(input, parts, output=None):
    lines = spark.sparkContext.textFile(f"s3a://{input}/**")
    sp = lines.map(lambda l: parseQ(l, parts)).filter(lambda result: result is not None).toDF()
    sp.createOrReplaceTempView("Super")
    for i in range(parts):
        partCS(i, parts, output)
    print("Finished")
```
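
What `partCS` computes for each partition is the distribution of characteristic sets: every subject is reduced to the sorted set of its predicates (`pset`), and subjects sharing the same set are counted together. A self-contained toy run of the same SQL (table name and values are illustrative):

``` python
demo = spark.createDataFrame(
    [("s1", "p1"), ("s1", "p2"), ("s2", "p1"), ("s2", "p2"), ("s3", "p1")],
    ["subject", "predicate"],
)
demo.createOrReplaceTempView("Demo")
spark.sql("""
    SELECT pset, count(*) AS count
    FROM (SELECT subject,
                 concat_ws(' ', sort_array(collect_set(predicate))) AS pset
          FROM Demo
          GROUP BY subject) AS cs
    GROUP BY pset
""").show(truncate=False)
# Expected: pset "p1 p2" with count 2 (s1, s2) and pset "p1" with count 1 (s3)
```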
%% Cell type:code id:76cf2537-9906-4157-891e-701663c6ba3c tags:
``` python
# input: bucket name, e.g. "test", "wdc", or "wdc-2023"
# omitting output means nothing is saved
completeCS("wdc-2023", 20, "cset-wdc-2023-fix2")
```
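
Each part is written to its own folder under `s3a://test-out/cset-wdc-2023-fix2/`, and since a subject falls in exactly one `hashdom` partition, the same `pset` can still appear in several parts. A sketch for merging them back into global counts, reusing `col` and `f` imported earlier (the glob path is assumed to match the layout produced above):

``` python
merged = (spark.read.option("header", True)
          .csv("s3a://test-out/cset-wdc-2023-fix2/*")
          .withColumn("count", col("count").cast("long")))
# Sum the per-part counts so each characteristic set appears once
totals = merged.groupBy("pset").agg(f.sum("count").alias("count"))
totals.orderBy(col("count").desc()).show(20, truncate=False)
```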