import sys
from Bio import SeqIO
from Bio.PDB.MMCIFParser import MMCIFParser
from Bio.PDB.MMCIFIO import MMCIFIO
from Bio.PDB.Polypeptide import is_aa, three_to_one
from collections import defaultdict
def parse_fasta(fasta_file):
"""FASTAファイルを解析してシーケンスリストを取得"""
sequences = []
for record in SeqIO.parse(fasta_file, "fasta"):
sequences.append(str(record.seq))
return sequences
def parse_cif(cif_file):
"""CIFファイルを解析して構造オブジェクトを取得"""
parser = MMCIFParser(QUIET=True)
structure = parser.get_structure('modelangelo', cif_file)
return structure
def get_chain_info(structure, num_groups):
"""
各チェインの情報(シーケンス、長さ、残基番号、グループ)を取得
グループはFASTAファイルのシーケンス順にA, B, C,...と割り当て
"""
chain_info = {}
for model in structure:
for chain in model:
chain_id = chain.get_id()
# グループキーを取得(例: 'A' from 'Aa', 'B' from 'Bb')
if len(chain_id) < 1:
continue # 無効なチェインIDをスキップ
group_key = chain_id[0].upper()
group_index = ord(group_key) - ord('A')
if group_index < 0 or group_index >= num_groups:
print(f"警告: チェインID {chain_id} のグループキー {group_key} がFASTAシーケンス数を超えています。スキップします。")
continue
sequence = ""
res_nums = []
for res in chain:
if is_aa(res, standard=True):
try:
res_name = res.get_resname()
aa = three_to_one(res_name)
sequence += aa
res_nums.append(res.get_id()[1])
except KeyError:
# 不明なアミノ酸残基をスキップ
pass
if sequence:
chain_info[chain_id] = {
'sequence': sequence,
'length': len(sequence),
'res_nums': res_nums,
'start': min(res_nums) if res_nums else None,
'end': max(res_nums) if res_nums else None,
'group': group_key
}
return chain_info
def filter_chains(chain_info, min_length=10):
"""短いチェインの削除"""
filtered = {cid: info for cid, info in chain_info.items() if info['length'] > min_length}
return filtered
def group_chains(filtered_chains):
"""チェインをグループ化(グループキーごとにリストに分ける)"""
groups = defaultdict(list)
for cid, info in filtered_chains.items():
groups[info['group']].append((cid, info))
return groups
def remove_overlapping_chains(groups):
"""
各グループ内で重複するチェインを削除し、最も長いチェインを保持
"""
final_chains = {}
for group, chains in groups.items():
# チェインを長さの降順でソート
chains_sorted = sorted(chains, key=lambda x: x[1]['length'], reverse=True)
kept = []
for cid, info in chains_sorted:
overlap = False
for kept_cid, kept_info in kept:
# 残基番号の範囲が重複するかチェック
if info['start'] <= kept_info['end'] and info['end'] >= kept_info['start']:
overlap = True
break
if not overlap:
kept.append((cid, info))
# グループごとに保持するチェインリストを追加
final_chains[group] = [cid for cid, _ in kept]
return final_chains
def merge_chains(structure, final_chains, group_keys):
"""
各グループの保持されたチェインを1つにマージ
新しいチェインIDはグループキー(例: 'A', 'B', ...)
"""
from Bio.PDB import Model, Chain, Residue, Atom
# 新しいモデルを作成
new_structure = Model.Model(0)
for group_key in group_keys:
chains = final_chains.get(group_key, [])
if not chains:
continue # このグループにチェインがない場合
# 新しいチェインを作成
new_chain = Chain.Chain(group_key)
residue_ids = set()
for cid in chains:
chain = structure[0][cid]
for res in chain:
res_id = res.get_id()
# 重複を避けるために残基IDをユニークにする
if res_id in residue_ids:
continue
residue_ids.add(res_id)
new_chain.add(res.copy())
# 追加されたチェインが有機な長さか確認
if len(new_chain) > 10:
new_structure.add(new_chain)
else:
print(f"グループ {group_key} のマージされたチェインが10残基以下のため、削除されました。")
return new_structure
def write_clean_cif(original_structure, merged_structure, output_file):
"""マージされたチェインのみを含む新しいCIFファイルを作成"""
# 新しい構造を作成
from Bio.PDB import Structure
new_structure_full = Structure.Structure('merged_modelangelo')
new_model = merged_structure
new_structure_full.add(new_model)
io = MMCIFIO()
io.set_structure(new_structure_full)
io.save(output_file)
print(f"クリーンアップされたCIFファイルが {output_file} に保存されました。")
def main():
if len(sys.argv) != 4:
print("使用法: python merge_modelangelo_chains.py input.cif input_sequences.fasta output_cleaned.cif")
sys.exit(1)
input_cif = sys.argv[1]
input_fasta = sys.argv[2]
output_cif = sys.argv[3]
print("FASTAファイルを解析中...")
sequences = parse_fasta(input_fasta)
num_groups = len(sequences)
print(f"FASTAシーケンス数: {num_groups}")
print("CIFファイルを解析中...")
structure = parse_cif(input_cif)
print("チェイン情報を取得中...")
chain_info = get_chain_info(structure, num_groups)
print(f"総チェイン数(フィルタ前): {len(chain_info)}")
print("短いチェインをフィルタリング中...")
filtered_chains = filter_chains(chain_info, min_length=10)
print(f"チェイン数(長さ > 10残基): {len(filtered_chains)}")
print("チェインをグループ化中...")
groups = group_chains(filtered_chains)
for group, chains in groups.items():
print(f"グループ {group}: {', '.join([cid for cid in chains])}")
print("重複するチェインを削除中...")
final_chains = remove_overlapping_chains(groups)
for group, chains in final_chains.items():
print(f"グループ {group} に保持されたチェイン: {', '.join(chains)}")
print("チェインをマージ中...")
merged_structure = merge_chains(structure, final_chains, final_chains.keys())
print("クリーンアップされたCIFファイルを作成中...")
write_clean_cif(structure, merged_structure, output_cif)
print("フィルタリングとマージが完了しました。")
if __name__ == "__main__":
main()
コメント